nmsg  0.9.0
input_frag.c
1 /*
2  * Copyright (c) 2009, 2011, 2012 by Farsight Security, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /* Import. */
18 
19 #include "private.h"
20 #include "libmy/tree.h"
21 
22 /* Forward. */
23 
24 static nmsg_res reassemble_frags(nmsg_input_t, Nmsg__Nmsg **, struct nmsg_frag *);
25 
26 /* Red-black nmsg_frag glue. */
27 
28 static int
29 frag_cmp(struct nmsg_frag *e1, struct nmsg_frag *e2) {
30  return (memcmp(&e1->key, &e2->key, sizeof(struct nmsg_frag_key)));
31 }
32 
33 RB_PROTOTYPE(frag_ent, nmsg_frag, link, frag_cmp);
34 RB_GENERATE(frag_ent, nmsg_frag, link, frag_cmp);
35 
36 /* Convenience macros. */
37 
38 #define FRAG_INSERT(stream, fent) do { \
39  RB_INSERT(frag_ent, &((stream)->nft.head), fent); \
40 } while(0)
41 
42 #define FRAG_FIND(stream, fent, find) do { \
43  fent = RB_FIND(frag_ent, &((stream)->nft.head), find); \
44 } while(0)
45 
46 #define FRAG_REMOVE(stream, fent) do { \
47  RB_REMOVE(frag_ent, &((stream)->nft.head), fent); \
48 } while(0)
49 
50 #define FRAG_NEXT(stream, fent, fent_next) do { \
51  fent_next = RB_NEXT(frag_ent, &((stream)->nft.head), fent); \
52 } while(0)
53 
54 /* Internal functions. */
55 
57 _input_frag_read(nmsg_input_t input, Nmsg__Nmsg **nmsg, uint8_t *buf, size_t buf_len) {
58  Nmsg__NmsgFragment *nfrag;
59  nmsg_res res;
60  struct nmsg_frag *fent, find;
61 
62  res = nmsg_res_again;
63 
64  nfrag = nmsg__nmsg_fragment__unpack(NULL, buf_len, buf);
65  if (nfrag == NULL)
66  return (nmsg_res_parse_error);
67 
68  /* find the fragment, else allocate a node and insert into the tree */
69  memset(&find, 0, sizeof(find));
70  find.key.id = nfrag->id;
71  find.key.crc = nfrag->crc;
72  memcpy(&find.key.addr_ss, &input->stream->addr_ss, sizeof(input->stream->addr_ss));
73 
74  FRAG_FIND(input->stream, fent, &find);
75  if (fent == NULL) {
76  fent = calloc(1, sizeof(*fent));
77  if (fent == NULL) {
78  res = nmsg_res_memfail;
79  goto read_input_frag_out;
80  }
81  fent->key.id = nfrag->id;
82  fent->key.crc = nfrag->crc;
83  memcpy(&fent->key.addr_ss, &input->stream->addr_ss, sizeof(input->stream->addr_ss));
84  fent->last = nfrag->last;
85  fent->rem = nfrag->last + 1;
86  fent->ts = input->stream->now;
87  fent->frags = calloc(1, sizeof(ProtobufCBinaryData) *
88  (fent->last + 1));
89  if (fent->frags == NULL) {
90  free(fent);
91  res = nmsg_res_memfail;
92  goto read_input_frag_out;
93  }
94  FRAG_INSERT(input->stream, fent);
95  input->stream->nfrags += 1;
96  } else {
97  assert(fent->last == nfrag->last);
98  }
99 
100  if (fent->frags[nfrag->current].data != NULL) {
101  /* fragment has already been received, network problem? */
102  goto read_input_frag_out;
103  }
104 
105  /* attach the fragment payload to the tree node */
106  fent->frags[nfrag->current] = nfrag->fragment;
107 
108  /* decrement number of remaining fragments */
109  fent->rem -= 1;
110 
111  /* detach the fragment payload from the NmsgFragment */
112  nfrag->fragment.len = 0;
113  nfrag->fragment.data = NULL;
114 
115  /* reassemble if all the fragments have been gathered */
116  if (fent->rem == 0)
117  res = reassemble_frags(input, nmsg, fent);
118 
119 read_input_frag_out:
120  nmsg__nmsg_fragment__free_unpacked(nfrag, NULL);
121  return (res);
122 }
123 
124 void
125 _input_frag_destroy(struct nmsg_stream_input *stream) {
126  struct nmsg_frag *fent, *fent_next;
127  unsigned i;
128 
129  for (fent = RB_MIN(frag_ent, &(stream->nft.head));
130  fent != NULL;
131  fent = fent_next)
132  {
133  FRAG_NEXT(stream, fent, fent_next);
134  for (i = 0; i <= fent->last; i++)
135  free(fent->frags[i].data);
136  free(fent->frags);
137  FRAG_REMOVE(stream, fent);
138  free(fent);
139  }
140 }
141 
142 void
143 _input_frag_gc(struct nmsg_stream_input *stream) {
144  struct nmsg_frag *fent, *fent_next;
145  unsigned i;
146 
147  if (!(stream->nfrags > 0 &&
148  stream->now.tv_sec - stream->lastgc.tv_sec >= NMSG_FRAG_GC_INTERVAL))
149  {
150  return;
151  }
152 
153  for (fent = RB_MIN(frag_ent, &(stream->nft.head));
154  fent != NULL;
155  fent = fent_next)
156  {
157  FRAG_NEXT(stream, fent, fent_next);
158  if (stream->now.tv_sec - fent->ts.tv_sec >=
159  NMSG_FRAG_GC_INTERVAL)
160  {
161  FRAG_NEXT(stream, fent, fent_next);
162  for (i = 0; i <= fent->last; i++)
163  free(fent->frags[i].data);
164  free(fent->frags);
165  FRAG_REMOVE(stream, fent);
166  free(fent);
167  stream->nfrags -= 1;
168  }
169  }
170 
171  stream->lastgc = stream->now;
172 }
173 
174 /* Private functions. */
175 
176 static nmsg_res
177 reassemble_frags(nmsg_input_t input, Nmsg__Nmsg **nmsg, struct nmsg_frag *fent) {
178  nmsg_res res;
179  size_t len, padded_len;
180  uint8_t *payload, *ptr;
181  unsigned i;
182 
183  res = nmsg_res_again;
184 
185  /* obtain total length of reassembled payload */
186  len = 0;
187  for (i = 0; i <= fent->last; i++) {
188  assert(fent->frags[i].data != NULL);
189  len += fent->frags[i].len;
190  }
191 
192  /* round total length up to nearest kilobyte */
193  padded_len = len;
194  if (len % 1024 != 0)
195  padded_len += 1024 - (len % 1024);
196 
197  ptr = payload = malloc(padded_len);
198  if (payload == NULL) {
199  return (nmsg_res_memfail);
200  }
201 
202  /* copy into the payload buffer and deallocate frags */
203  for (i = 0; i <= fent->last; i++) {
204  memcpy(ptr, fent->frags[i].data, fent->frags[i].len);
205  free(fent->frags[i].data);
206  ptr += fent->frags[i].len;
207  }
208  free(fent->frags);
209 
210  /* decompress */
211  if (input->stream->flags & NMSG_FLAG_ZLIB) {
212  size_t u_len;
213  u_char *u_buf, *z_buf;
214 
215  z_buf = (u_char *) payload;
216  res = nmsg_zbuf_inflate(input->stream->zb, len, z_buf,
217  &u_len, &u_buf);
218  if (res != nmsg_res_success) {
219  free(payload);
220  goto reassemble_frags_out;
221  }
222  payload = u_buf;
223  len = u_len;
224  free(z_buf);
225  }
226 
227  /* unpack the defragmented payload */
228  *nmsg = nmsg__nmsg__unpack(NULL, len, payload);
229  if (nmsg != NULL)
230  res = nmsg_res_success;
231  else
232  res = nmsg_res_parse_error;
233  free(payload);
234 
235 reassemble_frags_out:
236  /* deallocate from tree */
237  input->stream->nfrags -= 1;
238  FRAG_REMOVE(input->stream, fent);
239  free(fent);
240 
241  return (res);
242 }
243 
nmsg_res
nmsg result code
Definition: res.h:25
success
Definition: res.h:26
out of memory
Definition: res.h:29
caller should try again
Definition: res.h:35
#define NMSG_FLAG_ZLIB
NMSG container is zlib compressed.
Definition: constants.h:109
unable to parse input
Definition: res.h:36
nmsg_res nmsg_zbuf_inflate(nmsg_zbuf_t zb, size_t z_len, u_char *z_buf, size_t *u_len, u_char **u_buf)
Inflate a buffer.
Definition: zbuf.c:120