20 #include "libmy/tree.h"
/*
 * Forward declaration of the file-local reassembly routine.
 * It is needed because reassemble_frags() is called earlier in this
 * file than the point where it is defined.
 */
24 static nmsg_res reassemble_frags(nmsg_input_t, Nmsg__Nmsg **,
struct nmsg_frag *);
30 return (memcmp(&e1->key, &e2->key,
sizeof(
struct nmsg_frag_key)));
/*
 * Instantiate the red-black tree named "frag_ent" from libmy/tree.h.
 * Nodes are struct nmsg_frag, linked through their "link" member and
 * ordered by frag_cmp().  RB_PROTOTYPE emits the function declarations
 * and RB_GENERATE emits the matching definitions used by the
 * RB_INSERT / RB_FIND / RB_REMOVE / RB_NEXT / RB_MIN calls in this file.
 */
33 RB_PROTOTYPE(frag_ent,
nmsg_frag, link, frag_cmp);
34 RB_GENERATE(frag_ent,
nmsg_frag, link, frag_cmp);
38 #define FRAG_INSERT(stream, fent) do { \
39 RB_INSERT(frag_ent, &((stream)->nft.head), fent); \
42 #define FRAG_FIND(stream, fent, find) do { \
43 fent = RB_FIND(frag_ent, &((stream)->nft.head), find); \
46 #define FRAG_REMOVE(stream, fent) do { \
47 RB_REMOVE(frag_ent, &((stream)->nft.head), fent); \
50 #define FRAG_NEXT(stream, fent, fent_next) do { \
51 fent_next = RB_NEXT(frag_ent, &((stream)->nft.head), fent); \
57 _input_frag_read(nmsg_input_t input, Nmsg__Nmsg **nmsg, uint8_t *buf,
size_t buf_len) {
58 Nmsg__NmsgFragment *nfrag;
64 nfrag = nmsg__nmsg_fragment__unpack(NULL, buf_len, buf);
69 memset(&find, 0,
sizeof(find));
70 find.key.id = nfrag->id;
71 find.key.crc = nfrag->crc;
72 memcpy(&find.key.addr_ss, &input->stream->addr_ss,
sizeof(input->stream->addr_ss));
74 FRAG_FIND(input->stream, fent, &find);
76 fent = calloc(1,
sizeof(*fent));
79 goto read_input_frag_out;
81 fent->key.id = nfrag->id;
82 fent->key.crc = nfrag->crc;
83 memcpy(&fent->key.addr_ss, &input->stream->addr_ss,
sizeof(input->stream->addr_ss));
84 fent->last = nfrag->last;
85 fent->rem = nfrag->last + 1;
86 fent->ts = input->stream->now;
87 fent->frags = calloc(1,
sizeof(ProtobufCBinaryData) *
89 if (fent->frags == NULL) {
92 goto read_input_frag_out;
94 FRAG_INSERT(input->stream, fent);
95 input->stream->nfrags += 1;
97 assert(fent->last == nfrag->last);
100 if (fent->frags[nfrag->current].data != NULL) {
102 goto read_input_frag_out;
106 fent->frags[nfrag->current] = nfrag->fragment;
112 nfrag->fragment.len = 0;
113 nfrag->fragment.data = NULL;
117 res = reassemble_frags(input, nmsg, fent);
120 nmsg__nmsg_fragment__free_unpacked(nfrag, NULL);
129 for (fent = RB_MIN(frag_ent, &(stream->nft.head));
133 FRAG_NEXT(stream, fent, fent_next);
134 for (i = 0; i <= fent->last; i++)
135 free(fent->frags[i].data);
137 FRAG_REMOVE(stream, fent);
147 if (!(stream->nfrags > 0 &&
148 stream->now.tv_sec - stream->lastgc.tv_sec >= NMSG_FRAG_GC_INTERVAL))
153 for (fent = RB_MIN(frag_ent, &(stream->nft.head));
157 FRAG_NEXT(stream, fent, fent_next);
158 if (stream->now.tv_sec - fent->ts.tv_sec >=
159 NMSG_FRAG_GC_INTERVAL)
161 FRAG_NEXT(stream, fent, fent_next);
162 for (i = 0; i <= fent->last; i++)
163 free(fent->frags[i].data);
165 FRAG_REMOVE(stream, fent);
171 stream->lastgc = stream->now;
177 reassemble_frags(nmsg_input_t input, Nmsg__Nmsg **nmsg,
struct nmsg_frag *fent) {
179 size_t len, padded_len;
180 uint8_t *payload, *ptr;
187 for (i = 0; i <= fent->last; i++) {
188 assert(fent->frags[i].data != NULL);
189 len += fent->frags[i].len;
195 padded_len += 1024 - (len % 1024);
197 ptr = payload = malloc(padded_len);
198 if (payload == NULL) {
203 for (i = 0; i <= fent->last; i++) {
204 memcpy(ptr, fent->frags[i].data, fent->frags[i].len);
205 free(fent->frags[i].data);
206 ptr += fent->frags[i].len;
213 u_char *u_buf, *z_buf;
215 z_buf = (u_char *) payload;
220 goto reassemble_frags_out;
228 *nmsg = nmsg__nmsg__unpack(NULL, len, payload);
235 reassemble_frags_out:
237 input->stream->nfrags -= 1;
238 FRAG_REMOVE(input->stream, fent);
#define NMSG_FLAG_ZLIB
NMSG container is zlib compressed.
nmsg_res nmsg_zbuf_inflate(nmsg_zbuf_t zb, size_t z_len, u_char *z_buf, size_t *u_len, u_char **u_buf)
Inflate a buffer.