nmsg  0.9.0
input_nmsg.c
1 /*
2  * Copyright (c) 2009-2013 by Farsight Security, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /* Import. */
18 
19 #include "private.h"
20 
21 /* Forward. */
22 
23 static nmsg_res read_file(nmsg_input_t, ssize_t *);
24 static nmsg_res do_read_file(nmsg_input_t, ssize_t, ssize_t);
25 static nmsg_res do_read_sock(nmsg_input_t, ssize_t);
26 
27 /* Internal functions. */
28 
30 _input_nmsg_read(nmsg_input_t input, nmsg_message_t *msg) {
31  Nmsg__NmsgPayload *np;
32  nmsg_res res;
33 
34  if (input->stream->nmsg != NULL &&
35  input->stream->np_index >= input->stream->nmsg->n_payloads - 1)
36  {
37  input->stream->nmsg->n_payloads = 0;
38  nmsg__nmsg__free_unpacked(input->stream->nmsg, NULL);
39  input->stream->nmsg = NULL;
40  } else {
41  input->stream->np_index += 1;
42  }
43 
44  if (input->stream->nmsg == NULL) {
45  res = input->stream->stream_read_fp(input, &input->stream->nmsg);
46  if (res != nmsg_res_success)
47  return (res);
48  input->stream->np_index = 0;
49  }
50 
51  /* detach the payload from the original nmsg container */
52  np = input->stream->nmsg->payloads[input->stream->np_index];
53  input->stream->nmsg->payloads[input->stream->np_index] = NULL;
54 
55  /* filter payload */
56  if (_input_nmsg_filter(input, input->stream->np_index, np) == false) {
57  _nmsg_payload_free(&np);
58  return (nmsg_res_again);
59  }
60 
61  /* pass a pointer to the payload to the caller */
62  *msg = _nmsg_message_from_payload(np);
63  if (msg == NULL)
64  return (nmsg_res_memfail);
65 
66  /* possibly sleep a bit if ingress rate control is enabled */
67  if (input->stream->brate != NULL)
68  _nmsg_brate_sleep(input->stream->brate,
69  input->stream->nc_size,
70  input->stream->nmsg->n_payloads,
71  input->stream->np_index);
72 
73  return (nmsg_res_success);
74 }
75 
77 _input_nmsg_loop(nmsg_input_t input, int cnt, nmsg_cb_message cb, void *user) {
78  unsigned n;
79  nmsg_res res;
80  Nmsg__Nmsg *nmsg;
81  Nmsg__NmsgPayload *np;
82  nmsg_message_t msg;
83 
84  if (cnt < 0) {
85  /* loop indefinitely */
86  for (;;) {
87  if (input->stop)
88  break;
89  res = input->stream->stream_read_fp(input, &input->stream->nmsg);
90  if (res == nmsg_res_again)
91  continue;
92  if (res != nmsg_res_success)
93  return (res);
94 
95  nmsg = input->stream->nmsg;
96  for (n = 0; n < nmsg->n_payloads; n++) {
97  np = nmsg->payloads[n];
98  if (_input_nmsg_filter(input, n, np)) {
99  msg = _nmsg_message_from_payload(np);
100  cb(msg, user);
101  }
102  }
103  nmsg->n_payloads = 0;
104  free(nmsg->payloads);
105  nmsg->payloads = NULL;
106  nmsg__nmsg__free_unpacked(nmsg, NULL);
107  input->stream->nmsg = NULL;
108  }
109  } else {
110  /* loop until (n_payloads == cnt) */
111  int n_payloads = 0;
112 
113  for (;;) {
114  if (input->stop)
115  break;
116  res = input->stream->stream_read_fp(input, &input->stream->nmsg);
117  if (res == nmsg_res_again)
118  continue;
119  if (res != nmsg_res_success)
120  return (res);
121 
122  nmsg = input->stream->nmsg;
123  for (n = 0; n < nmsg->n_payloads; n++) {
124  np = nmsg->payloads[n];
125  if (_input_nmsg_filter(input, n, np)) {
126  if (n_payloads == cnt)
127  break;
128  n_payloads += 1;
129  msg = _nmsg_message_from_payload(np);
130  cb(msg, user);
131  }
132  }
133  nmsg->n_payloads = 0;
134  free(nmsg->payloads);
135  nmsg->payloads = NULL;
136  nmsg__nmsg__free_unpacked(nmsg, NULL);
137  input->stream->nmsg = NULL;
138  if (n_payloads == cnt)
139  break;
140  }
141  }
142 
143  return (nmsg_res_success);
144 }
145 
146 bool
147 _input_nmsg_filter(nmsg_input_t input, unsigned idx, Nmsg__NmsgPayload *np) {
148  assert(input->stream->nmsg != NULL);
149 
150  /* payload crc */
151  if (input->stream->nmsg->n_payload_crcs >= (idx + 1)) {
152  uint32_t wire_crc = input->stream->nmsg->payload_crcs[idx];
153  uint32_t calc_crc = my_crc32c(np->payload.data, np->payload.len);
154  if (ntohl(wire_crc) != calc_crc) {
155  _nmsg_dprintf(1, "libnmsg: WARNING: crc mismatch (%x != %x) [%s]\n",
156  calc_crc, wire_crc, __func__);
157  return (false);
158  }
159  }
160 
161  /* (vid, msgtype) */
162  if (input->do_filter == true &&
163  (input->filter_vid != np->vid ||
164  input->filter_msgtype != np->msgtype))
165  {
166  return (false);
167  }
168 
169  /* source */
170  if (input->stream->source > 0 &&
171  input->stream->source != np->source)
172  {
173  return (false);
174  }
175 
176  /* operator */
177  if (input->stream->operator > 0 &&
178  input->stream->operator != np->operator_)
179  {
180  return (false);
181  }
182 
183  /* group */
184  if (input->stream->group > 0 &&
185  input->stream->group != np->group)
186  {
187  return (false);
188  }
189 
190  /* all passed */
191  return (true);
192 }
193 
194 nmsg_res
195 _input_nmsg_unpack_container(nmsg_input_t input, Nmsg__Nmsg **nmsg,
196  uint8_t *buf, size_t buf_len)
197 {
199 
200  input->stream->nc_size = buf_len + NMSG_HDRLSZ_V2;
201  _nmsg_dprintf(6, "%s: unpacking container len= %zd\n", __func__, buf_len);
202 
203  if (input->stream->flags & NMSG_FLAG_FRAGMENT) {
204  res = _input_frag_read(input, nmsg, buf, buf_len);
205  } else if (input->stream->flags & NMSG_FLAG_ZLIB) {
206  size_t u_len;
207  u_char *u_buf;
208 
209  res = nmsg_zbuf_inflate(input->stream->zb, buf_len, buf, &u_len, &u_buf);
210  if (res != nmsg_res_success)
211  return (res);
212  *nmsg = nmsg__nmsg__unpack(NULL, u_len, u_buf);
213  free(u_buf);
214  if (*nmsg == NULL)
215  return (nmsg_res_parse_error);
216  } else {
217  *nmsg = nmsg__nmsg__unpack(NULL, buf_len, buf);
218  if (*nmsg == NULL)
219  return (nmsg_res_parse_error);
220  }
221 
222  return (res);
223 }
224 
225 nmsg_res
226 _input_nmsg_unpack_container2(const uint8_t *buf, size_t buf_len,
227  unsigned flags, Nmsg__Nmsg **nmsg)
228 {
229  nmsg_res res;
230 
231  /* fragmented containers aren't handled by this function */
232  if (flags & NMSG_FLAG_FRAGMENT)
233  return (nmsg_res_failure);
234 
235  if (flags & NMSG_FLAG_ZLIB) {
236  size_t u_len;
237  u_char *u_buf;
238  nmsg_zbuf_t zb;
239 
240  zb = nmsg_zbuf_inflate_init();
241  if (zb == NULL)
242  return (nmsg_res_memfail);
243  res = nmsg_zbuf_inflate(zb, buf_len, (uint8_t *) buf, &u_len, &u_buf);
244  nmsg_zbuf_destroy(&zb);
245  if (res != nmsg_res_success)
246  return (res);
247  *nmsg = nmsg__nmsg__unpack(NULL, u_len, u_buf);
248  free(u_buf);
249  if (*nmsg == NULL)
250  return (nmsg_res_failure);
251  } else {
252  *nmsg = nmsg__nmsg__unpack(NULL, buf_len, buf);
253  if (*nmsg == NULL)
254  return (nmsg_res_failure);
255  }
256 
257  return (nmsg_res_success);
258 }
259 
260 nmsg_res
261 _input_nmsg_read_container_file(nmsg_input_t input, Nmsg__Nmsg **nmsg) {
262  nmsg_res res;
263  ssize_t bytes_avail, msgsize = 0;
264 
265  assert(input->stream->type == nmsg_stream_type_file);
266 
267  /* read */
268  res = read_file(input, &msgsize);
269  if (res != nmsg_res_success)
270  return (res);
271 
272  /* ensure that the full NMSG container is available */
273  bytes_avail = _nmsg_buf_avail(input->stream->buf);
274  if (bytes_avail < msgsize) {
275  ssize_t bytes_to_read = msgsize - bytes_avail;
276 
277  res = do_read_file(input, bytes_to_read, bytes_to_read);
278  if (res != nmsg_res_success)
279  return (res);
280  }
281 
282  /* unpack message */
283  res = _input_nmsg_unpack_container(input, nmsg, input->stream->buf->pos, msgsize);
284  input->stream->buf->pos += msgsize;
285 
286  return (res);
287 }
288 
289 nmsg_res
290 _input_nmsg_read_container_sock(nmsg_input_t input, Nmsg__Nmsg **nmsg) {
291  nmsg_res res;
292  ssize_t msgsize;
293  struct nmsg_buf *buf = input->stream->buf;
294 
295  assert(input->stream->type == nmsg_stream_type_sock);
296 
297  /* read the NMSG container */
298  _nmsg_buf_reset(buf);
299  res = do_read_sock(input, buf->bufsz);
300  if (res != nmsg_res_success) {
301  if (res == nmsg_res_read_failure)
302  return (res);
303  else
304  /* forward compatibility */
305  return (nmsg_res_again);
306  }
307  if (_nmsg_buf_avail(buf) < NMSG_HDRLSZ_V2)
308  return (nmsg_res_failure);
309 
310  /* deserialize the NMSG header */
311  res = _input_nmsg_deserialize_header(buf->pos,
312  _nmsg_buf_avail(buf),
313  &msgsize,
314  &input->stream->flags);
315  if (res != nmsg_res_success)
316  return (res);
317  buf->pos += NMSG_HDRLSZ_V2;
318 
319  /* since the input stream is a sock stream, the entire message must
320  * have been read by the call to do_read_sock() */
321  if (_nmsg_buf_avail(buf) != msgsize)
322  return (nmsg_res_parse_error);
323 
324  /* unpack message */
325  res = _input_nmsg_unpack_container(input, nmsg, buf->pos, msgsize);
326  input->stream->buf->pos += msgsize;
327 
328  /* update counters */
329  if (*nmsg != NULL) {
330  input->stream->count_recv += 1;
331 
332  if (input->stream->verify_seqsrc) {
333  struct nmsg_seqsrc *seqsrc;
334 
335  seqsrc = _input_seqsrc_get(input, *nmsg);
336  if (seqsrc != NULL) {
337  size_t drop;
338  drop = _input_seqsrc_update(input, seqsrc, *nmsg);
339  input->stream->count_drop += drop;
340  }
341  }
342  }
343 
344  /* expire old outstanding fragments */
345  _input_frag_gc(input->stream);
346 
347  return (res);
348 }
349 
350 #ifdef HAVE_LIBXS
351 nmsg_res
352 _input_nmsg_read_container_xs(nmsg_input_t input, Nmsg__Nmsg **nmsg) {
353  int ret;
354  nmsg_res res;
355  uint8_t *buf;
356  size_t buf_len;
357  ssize_t msgsize = 0;
358  xs_msg_t xmsg;
359  xs_pollitem_t xitems[1];
360 
361  /* poll */
362  xitems[0].socket = input->stream->xs;
363  xitems[0].events = XS_POLLIN;
364  ret = xs_poll(xitems, 1, NMSG_RBUF_TIMEOUT);
365  if (ret == 0 || (ret == -1 && errno == EINTR))
366  return (nmsg_res_again);
367  else if (ret == -1)
368  return (nmsg_res_read_failure);
369 
370  /* initialize XS message object */
371  if (xs_msg_init(&xmsg))
372  return (nmsg_res_failure);
373 
374  /* read the NMSG container */
375  if (xs_recvmsg(input->stream->xs, &xmsg, 0) == -1) {
376  res = nmsg_res_failure;
377  goto out;
378  }
379  nmsg_timespec_get(&input->stream->now);
380 
381  /* get buffer from the XS message */
382  buf = xs_msg_data(&xmsg);
383  buf_len = xs_msg_size(&xmsg);
384  if (buf_len < NMSG_HDRLSZ_V2) {
385  res = nmsg_res_failure;
386  goto out;
387  }
388 
389  /* deserialize the NMSG header */
390  res = _input_nmsg_deserialize_header(buf, buf_len, &msgsize, &input->stream->flags);
391  if (res != nmsg_res_success)
392  goto out;
393  buf += NMSG_HDRLSZ_V2;
394 
395  /* the entire message must have been read by xs_recvmsg() */
396  assert((size_t) msgsize == buf_len - NMSG_HDRLSZ_V2);
397 
398  /* unpack message */
399  res = _input_nmsg_unpack_container(input, nmsg, buf, msgsize);
400 
401  /* update seqsrc counts */
402  if (input->stream->verify_seqsrc && *nmsg != NULL) {
403  struct nmsg_seqsrc *seqsrc = _input_seqsrc_get(input, *nmsg);
404  if (seqsrc != NULL)
405  _input_seqsrc_update(input, seqsrc, *nmsg);
406  }
407 
408  /* expire old outstanding fragments */
409  _input_frag_gc(input->stream);
410 
411 out:
412  xs_msg_close(&xmsg);
413  return (res);
414 }
415 #endif /* HAVE_LIBXS */
416 
417 nmsg_res
418 _input_nmsg_deserialize_header(const uint8_t *buf, size_t buf_len,
419  ssize_t *msgsize, unsigned *flags)
420 {
421  static const char magic[] = NMSG_MAGIC;
422  uint16_t version;
423 
424  if (buf_len < NMSG_LENHDRSZ_V2)
425  return (nmsg_res_failure);
426 
427  /* check magic */
428  if (memcmp(buf, magic, sizeof(magic)) != 0)
429  return (nmsg_res_magic_mismatch);
430  buf += sizeof(magic);
431 
432  /* check version */
433  load_net16(buf, &version);
434  if ((version & 0xFF) != 2U)
435  return (nmsg_res_version_mismatch);
436  *flags = version >> 8;
437  buf += sizeof(version);
438 
439  /* load message (container) size */
440  load_net32(buf, msgsize);
441 
442  return (nmsg_res_success);
443 }
444 
445 
446 /* Private functions. */
447 
448 static nmsg_res
449 read_file(nmsg_input_t input, ssize_t *msgsize) {
450  static const char magic[] = NMSG_MAGIC;
451 
452  bool reset_buf = false;
453  ssize_t bytes_avail, bytes_needed, lenhdrsz;
455  uint16_t version;
456  struct nmsg_buf *buf = input->stream->buf;
457 
458  /* ensure we have the (magic, version) header fields */
459  bytes_avail = _nmsg_buf_avail(buf);
460  if (bytes_avail < NMSG_HDRSZ) {
461  assert(bytes_avail >= 0);
462  bytes_needed = NMSG_HDRSZ - bytes_avail;
463  if (bytes_avail == 0) {
464  _nmsg_buf_reset(buf);
465  res = do_read_file(input, bytes_needed, buf->bufsz);
466  } else {
467  /* the (magic, version) header fields were split */
468  res = do_read_file(input, bytes_needed, bytes_needed);
469  reset_buf = true;
470  }
471  if (res != nmsg_res_success)
472  return (res);
473  }
474  bytes_avail = _nmsg_buf_avail(buf);
475  assert(bytes_avail >= NMSG_HDRSZ);
476 
477  /* check magic */
478  if (memcmp(buf->pos, magic, sizeof(magic)) != 0)
479  return (nmsg_res_magic_mismatch);
480  buf->pos += sizeof(magic);
481 
482  /* check version */
483  load_net16(buf->pos, &version);
484  buf->pos += 2;
485  if (version == 1U) {
486  lenhdrsz = NMSG_LENHDRSZ_V1;
487  } else if ((version & 0xFF) == 2U) {
488  input->stream->flags = version >> 8;
489  version &= 0xFF;
490  lenhdrsz = NMSG_LENHDRSZ_V2;
491  } else {
493  goto read_header_out;
494  }
495 
496  /* if reset_buf was set, then reading the (magic, version) header
497  * required two read()s. at this point we've consumed all the split
498  * header data, so reset the buffer to avoid overflow.
499  */
500  if (reset_buf == true) {
501  _nmsg_buf_reset(buf);
502  reset_buf = false;
503  }
504 
505  /* ensure we have the length header field */
506  bytes_avail = _nmsg_buf_avail(buf);
507  if (bytes_avail < lenhdrsz) {
508  if (bytes_avail == 0)
509  _nmsg_buf_reset(buf);
510  bytes_needed = lenhdrsz - bytes_avail;
511  if (bytes_avail == 0) {
512  res = do_read_file(input, bytes_needed, buf->bufsz);
513  } else {
514  /* the length header field was split */
515  res = do_read_file(input, bytes_needed, bytes_needed);
516  reset_buf = true;
517  }
518  }
519  bytes_avail = _nmsg_buf_avail(buf);
520  assert(bytes_avail >= lenhdrsz);
521 
522  /* load message size */
523  if (version == 1U) {
524  load_net16(buf->pos, msgsize);
525  buf->pos += 2;
526  } else if (version == 2U) {
527  load_net32(buf->pos, msgsize);
528  buf->pos += 4;
529  }
530 
531  res = nmsg_res_success;
532 
533 read_header_out:
534  if (reset_buf == true)
535  _nmsg_buf_reset(buf);
536 
537  return (res);
538 }
539 
540 static nmsg_res
541 do_read_file(nmsg_input_t input, ssize_t bytes_needed, ssize_t bytes_max) {
542  ssize_t bytes_read;
543  struct nmsg_buf *buf = input->stream->buf;
544 
545  /* sanity check */
546  assert(bytes_needed <= bytes_max);
547 
548  /* check that we have enough buffer space */
549  assert((buf->end + bytes_max) <= (buf->data + NMSG_RBUFSZ));
550 
551  while (bytes_needed > 0) {
552  bytes_read = read(buf->fd, buf->end, bytes_max);
553  if (bytes_read < 0)
554  return (nmsg_res_failure);
555  if (bytes_read == 0)
556  return (nmsg_res_eof);
557  buf->end += bytes_read;
558  bytes_needed -= bytes_read;
559  bytes_max -= bytes_read;
560  }
561  nmsg_timespec_get(&input->stream->now);
562  return (nmsg_res_success);
563 }
564 
565 static nmsg_res
566 do_read_sock(nmsg_input_t input, ssize_t bytes_max) {
567  int ret;
568  ssize_t bytes_read;
569  struct nmsg_buf *buf = input->stream->buf;
570  socklen_t addr_len = sizeof(struct sockaddr_storage);
571 
572  /* check that we have enough buffer space */
573  assert((buf->end + bytes_max) <= (buf->data + NMSG_RBUFSZ));
574 
575  if (input->stream->blocking_io == true) {
576  /* poll */
577  ret = poll(&input->stream->pfd, 1, NMSG_RBUF_TIMEOUT);
578  if (ret == 0 || (ret == -1 && errno == EINTR))
579  return (nmsg_res_again);
580  else if (ret == -1)
581  return (nmsg_res_read_failure);
582  }
583 
584  /* read */
585  bytes_read = recvfrom(buf->fd, buf->pos, bytes_max, 0,
586  (struct sockaddr *) &input->stream->addr_ss, &addr_len);
587  nmsg_timespec_get(&input->stream->now);
588 
589  if (bytes_read < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
590  return (nmsg_res_again);
591  if (bytes_read < 0)
592  return (nmsg_res_read_failure);
593  if (bytes_read == 0)
594  return (nmsg_res_eof);
595  buf->end = buf->pos + bytes_read;
596 
597  return (nmsg_res_success);
598 }
void nmsg_zbuf_destroy(nmsg_zbuf_t *zb)
Destroy all resources associated with an nmsg_zbuf_t object.
Definition: zbuf.c:84
void nmsg_timespec_get(struct timespec *ts)
Get the current time.
Definition: timespec.c:24
nmsg_res
nmsg result code
Definition: res.h:25
success
Definition: res.h:26
void(* nmsg_cb_message)(nmsg_message_t msg, void *user)
Callback function for processing nmsg messages.
Definition: nmsg.h:69
#define NMSG_RBUF_TIMEOUT
Number of milliseconds to wait for data on an nmsg socket before returning nmsg_res_again.
Definition: constants.h:92
nmsg header magic incorrect
Definition: res.h:30
out of memory
Definition: res.h:29
caller should try again
Definition: res.h:35
#define NMSG_MAGIC
Four-octet magic sequence seen at the beginning of a serialized NMSG.
Definition: constants.h:27
read failure
Definition: res.h:38
#define NMSG_RBUFSZ
Number of octets than an nmsg rbuf must hold.
Definition: constants.h:86
#define NMSG_HDRLSZ_V2
Number of octets in an NMSG header (magic + version + length).
Definition: constants.h:42
nmsg_zbuf_t nmsg_zbuf_inflate_init(void)
Initialize an nmsg_zbuf_t object for inflation.
Definition: zbuf.c:59
#define NMSG_LENHDRSZ_V2
Number of octets in the NMSG v2 header length field.
Definition: constants.h:52
#define NMSG_FLAG_ZLIB
NMSG container is zlib compressed.
Definition: constants.h:109
#define NMSG_HDRSZ
Number of octets in an NMSG header (magic + version).
Definition: constants.h:37
#define NMSG_LENHDRSZ_V1
Number of octets in the NMSG v1 header length field.
Definition: constants.h:47
generic failure
Definition: res.h:27
unable to parse input
Definition: res.h:36
nmsg header version incorrect
Definition: res.h:31
#define NMSG_FLAG_FRAGMENT
NMSG container is fragmented.
Definition: constants.h:114
end of file
Definition: res.h:28
nmsg_res nmsg_zbuf_inflate(nmsg_zbuf_t zb, size_t z_len, u_char *z_buf, size_t *u_len, u_char **u_buf)
Inflate a buffer.
Definition: zbuf.c:120