nmsg  0.9.0
input.c
1 /*
2  * Copyright (c) 2008-2013 by Farsight Security, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /* Import. */
18 
19 #include "private.h"
20 
21 /* Forward. */
22 
23 static nmsg_input_t input_open_stream(nmsg_stream_type, int);
24 static nmsg_input_t input_open_stream_base(nmsg_stream_type);
25 static nmsg_res input_flush(nmsg_input_t input);
26 static void input_close_stream(nmsg_input_t input);
27 
28 /* Export. */
29 
30 nmsg_input_t
32  return (input_open_stream(nmsg_stream_type_file, fd));
33 }
34 
35 nmsg_input_t
37  return (input_open_stream(nmsg_stream_type_sock, fd));
38 }
39 
40 #ifdef HAVE_LIBXS
41 nmsg_input_t
42 nmsg_input_open_xs(void *s) {
43  struct nmsg_input *input;
44 
45  input = input_open_stream_base(nmsg_stream_type_xs);
46  if (input == NULL)
47  return (input);
48 
49  input->stream->xs = s;
50 
51  return (input);
52 }
53 #else /* HAVE_LIBXS */
54 nmsg_input_t
55 nmsg_input_open_xs(void *s __attribute__((unused))) {
56  return (NULL);
57 }
58 #endif /* HAVE_LIBXS */
59 
60 nmsg_input_t
62  struct nmsg_input *input;
63 
64  input = calloc(1, sizeof(*input));
65  if (input == NULL)
66  return (NULL);
67  input->type = nmsg_input_type_callback;
68  input->read_fp = _input_nmsg_read_callback;
69  input->read_loop_fp = NULL;
70  input->callback = calloc(1, sizeof(*(input->callback)));
71  if (input->callback == NULL) {
72  free(input);
73  return (NULL);
74  }
75  input->callback->cb = cb;
76  input->callback->user = user;
77 
78  return (input);
79 }
80 
81 nmsg_input_t
83  struct nmsg_input *input;
84 
85  input = input_open_stream_base(nmsg_stream_type_null);
86  if (input == NULL)
87  return (NULL);
88  input->read_fp = _input_nmsg_read_null;
89  input->read_loop_fp = _input_nmsg_loop_null;
90 
91  return (input);
92 }
93 
94 nmsg_input_t
95 nmsg_input_open_pres(int fd, nmsg_msgmod_t msgmod) {
96  nmsg_res res;
97  struct nmsg_input *input;
98 
99  input = calloc(1, sizeof(*input));
100  if (input == NULL)
101  return (NULL);
102  input->type = nmsg_input_type_pres;
103  input->read_fp = _input_pres_read;
104 
105  input->pres = calloc(1, sizeof(*(input->pres)));
106  if (input->pres == NULL) {
107  free(input);
108  return (NULL);
109  }
110 
111  input->pres->fp = fdopen(fd, "r");
112  if (input->pres->fp == NULL) {
113  free(input->pres);
114  free(input);
115  return (NULL);
116  }
117 
118  input->msgmod = msgmod;
119  res = nmsg_msgmod_init(input->msgmod, &input->clos);
120  if (res != nmsg_res_success) {
121  fclose(input->pres->fp);
122  free(input->pres);
123  free(input);
124  return (NULL);
125  }
126 
127  return (input);
128 }
129 
130 nmsg_input_t
131 nmsg_input_open_pcap(nmsg_pcap_t pcap, nmsg_msgmod_t msgmod) {
132  nmsg_res res;
133  struct nmsg_input *input;
134 
135  input = calloc(1, sizeof(*input));
136  if (input == NULL)
137  return (NULL);
138  input->type = nmsg_input_type_pcap;
139  input->pcap = pcap;
140 
141  if (msgmod->plugin->pkt_to_payload != NULL) {
142  input->read_fp = _input_pcap_read_raw;
143  nmsg_pcap_input_set_raw(pcap, true);
144  } else if (msgmod->plugin->ipdg_to_payload != NULL) {
145  input->read_fp = _input_pcap_read;
146  } else {
147  free(input);
148  return (NULL);
149  }
150 
151  input->msgmod = msgmod;
152  res = nmsg_msgmod_init(input->msgmod, &input->clos);
153  if (res != nmsg_res_success) {
154  free(input);
155  return (NULL);
156  }
157  if (msgmod->plugin->pcap_init != NULL) {
158  void *clos = input->clos;
159  if (msgmod->plugin->type == nmsg_msgmod_type_transparent)
160  clos = ((struct nmsg_msgmod_clos *) clos)->mod_clos;
161  res = msgmod->plugin->pcap_init(clos, input->pcap);
162  if (res != nmsg_res_success) {
163  free(input);
164  return (NULL);
165  }
166  }
167 
168  return (input);
169 }
170 
171 nmsg_res
172 nmsg_input_close(nmsg_input_t *input) {
173  switch ((*input)->type) {
175  _nmsg_brate_destroy(&((*input)->stream->brate));
176 #ifdef HAVE_LIBXS
177  if ((*input)->stream->type == nmsg_stream_type_xs)
178  xs_close((*input)->stream->xs);
179 #else /* HAVE_LIBXS */
180  assert((*input)->stream->type != nmsg_stream_type_xs);
181 #endif /* HAVE_LIBXS */
182  input_close_stream(*input);
183  break;
185  nmsg_pcap_input_close(&(*input)->pcap);
186  break;
188  fclose((*input)->pres->fp);
189  free((*input)->pres);
190  break;
191  case nmsg_input_type_callback:
192  free((*input)->callback);
193  break;
194  }
195 
196  if ((*input)->msgmod != NULL)
197  nmsg_msgmod_fini((*input)->msgmod, &(*input)->clos);
198 
199  free(*input);
200  *input = NULL;
201 
202  return (nmsg_res_success);
203 }
204 
205 void
206 nmsg_input_breakloop(nmsg_input_t input) {
207  input->stop = true;
208 }
209 
210 nmsg_res
211 nmsg_input_read(nmsg_input_t input, nmsg_message_t *msg) {
212  return (input->read_fp(input, msg));
213 }
214 
215 nmsg_res
216 nmsg_input_loop(nmsg_input_t input, int cnt, nmsg_cb_message cb, void *user) {
217  int n_payloads = 0;
218  nmsg_message_t msg;
219  nmsg_res res;
220 
221  if (input->read_loop_fp != NULL)
222  return (input->read_loop_fp(input, cnt, cb, user));
223 
224  for (;;) {
225  res = input->read_fp(input, &msg);
226  if (res == nmsg_res_again)
227  continue;
228  if (res != nmsg_res_success)
229  return (res);
230 
231  if (cnt >= 0 && n_payloads == cnt)
232  break;
233  if (input->stop)
234  break;
235  n_payloads += 1;
236  cb(msg, user);
237  }
238 
239  return (nmsg_res_success);
240 }
241 
242 void
243 nmsg_input_set_filter_msgtype(nmsg_input_t input,
244  unsigned vid, unsigned msgtype)
245 {
246  if (vid == 0 && msgtype == 0)
247  input->do_filter = false;
248  else
249  input->do_filter = true;
250 
251  input->filter_vid = vid;
252  input->filter_msgtype = msgtype;
253 }
254 
255 nmsg_res
257  const char *vname, const char *mname)
258 {
259  unsigned vid, msgtype;
260 
261  if (vname == NULL || mname == NULL)
262  return (nmsg_res_failure);
263 
264  vid = nmsg_msgmod_vname_to_vid(vname);
265  if (vid == 0)
266  return (nmsg_res_failure);
267  msgtype = nmsg_msgmod_mname_to_msgtype(vid, mname);
268  if (msgtype == 0)
269  return (nmsg_res_failure);
270 
271  nmsg_input_set_filter_msgtype(input, vid, msgtype);
272 
273  return (nmsg_res_success);
274 }
275 
276 void
277 nmsg_input_set_filter_source(nmsg_input_t input, unsigned source) {
278  if (input->type == nmsg_input_type_stream)
279  input->stream->source = source;
280 }
281 
282 void
283 nmsg_input_set_filter_operator(nmsg_input_t input, unsigned operator) {
284  if (input->type == nmsg_input_type_stream)
285  input->stream->operator = operator;
286 }
287 
288 void
289 nmsg_input_set_filter_group(nmsg_input_t input, unsigned group) {
290  if (input->type == nmsg_input_type_stream)
291  input->stream->group = group;
292 }
293 
294 nmsg_res
295 nmsg_input_set_blocking_io(nmsg_input_t input, bool flag) {
296  int val;
297 
298  if (input->type != nmsg_input_type_stream)
299  return (nmsg_res_failure);
300 
301  if ((val = fcntl(input->stream->buf->fd, F_GETFL, 0)) < 0)
302  return (nmsg_res_failure);
303 
304  if (flag == true)
305  val &= ~O_NONBLOCK;
306  else
307  val |= O_NONBLOCK;
308 
309  if (fcntl(input->stream->buf->fd, F_SETFL, val) < 0)
310  return (nmsg_res_failure);
311 
312  if (flag == true)
313  input->stream->blocking_io = true;
314  else
315  input->stream->blocking_io = false;
316 
317  return (nmsg_res_success);
318 }
319 
320 nmsg_res
321 nmsg_input_set_byte_rate(nmsg_input_t input, size_t target_byte_rate) {
322  if (input->type != nmsg_input_type_stream)
323  return (nmsg_res_failure);
324  if (input->stream->brate != NULL)
325  _nmsg_brate_destroy(&input->stream->brate);
326  if (target_byte_rate > 0) {
327  input->stream->brate = _nmsg_brate_init(target_byte_rate);
328  if (input->stream->brate == NULL)
329  return (nmsg_res_failure);
330  }
331  return (nmsg_res_success);
332 }
333 
334 nmsg_res
335 nmsg_input_set_verify_seqsrc(nmsg_input_t input, bool verify) {
336  if (input->type != nmsg_input_type_stream)
337  return (nmsg_res_failure);
338  input->stream->verify_seqsrc = verify;
339  return (nmsg_res_success);
340 }
341 
342 nmsg_res
343 nmsg_input_get_count_container_received(nmsg_input_t input, uint64_t *count) {
344  if (input->type == nmsg_input_type_stream) {
345  *count = input->stream->count_recv;
346  return (nmsg_res_success);
347  }
348  return (nmsg_res_failure);
349 }
350 
351 nmsg_res
352 nmsg_input_get_count_container_dropped(nmsg_input_t input, uint64_t *count) {
353  if (input->type == nmsg_input_type_stream &&
354  input->stream->verify_seqsrc)
355  {
356  *count = input->stream->count_drop;
357  return (nmsg_res_success);
358  }
359  return (nmsg_res_failure);
360 }
361 
362 /* Private functions. */
363 
364 static nmsg_input_t
365 input_open_stream(nmsg_stream_type type, int fd) {
366  struct nmsg_input *input;
367 
368  input = input_open_stream_base(type);
369  if (input == NULL)
370  return (input);
371 
372  /* nmsg_buf */
373  input->stream->buf = _nmsg_buf_new(NMSG_RBUFSZ);
374  if (input->stream->buf == NULL) {
375  free(input->stream);
376  free(input);
377  return (NULL);
378  }
379  _nmsg_buf_reset(input->stream->buf);
380  input->stream->buf->fd = fd;
381  input->stream->buf->bufsz = NMSG_RBUFSZ / 2;
382 
383  /* struct pollfd */
384  input->stream->pfd.fd = fd;
385  input->stream->pfd.events = POLLIN;
386 
387  return (input);
388 }
389 
390 static nmsg_input_t
391 input_open_stream_base(nmsg_stream_type type) {
392  struct nmsg_input *input;
393 
394  /* nmsg_input */
395  input = calloc(1, sizeof(*input));
396  if (input == NULL)
397  return (NULL);
398  input->type = nmsg_input_type_stream;
399  input->read_fp = _input_nmsg_read;
400  input->read_loop_fp = _input_nmsg_loop;
401 
402  /* nmsg_stream_input */
403  input->stream = calloc(1, sizeof(*(input->stream)));
404  if (input->stream == NULL) {
405  free(input);
406  return (NULL);
407  }
408  input->stream->blocking_io = true;
409  input->stream->verify_seqsrc = true;
410  input->stream->type = type;
411  if (type == nmsg_stream_type_file) {
412  input->stream->stream_read_fp = _input_nmsg_read_container_file;
413  } else if (type == nmsg_stream_type_sock) {
414  input->stream->stream_read_fp = _input_nmsg_read_container_sock;
415  } else if (type == nmsg_stream_type_xs) {
416 #ifdef HAVE_LIBXS
417  input->stream->stream_read_fp = _input_nmsg_read_container_xs;
418 #else /* HAVE_LIBXS */
419  assert(type != nmsg_stream_type_xs);
420 #endif /* HAVE_LIBXS */
421  }
422 
423  /* nmsg_zbuf */
424  input->stream->zb = nmsg_zbuf_inflate_init();
425  if (input->stream->zb == NULL) {
426  _nmsg_buf_destroy(&input->stream->buf);
427  free(input->stream);
428  free(input);
429  return (NULL);
430  }
431 
432  /* red-black tree */
433  RB_INIT(&input->stream->nft.head);
434 
435  /* nmsg seqsrc */
436  ISC_LIST_INIT(input->stream->seqsrcs);
437 
438  return (input);
439 }
440 
441 static void
442 input_close_stream(nmsg_input_t input) {
443  _input_seqsrc_destroy(input);
444 
445  if (input->stream->nmsg != NULL)
446  input_flush(input);
447 
448  nmsg_zbuf_destroy(&input->stream->zb);
449  _input_frag_destroy(input->stream);
450  _nmsg_buf_destroy(&input->stream->buf);
451  free(input->stream);
452 }
453 
454 static nmsg_res
455 input_flush(nmsg_input_t input) {
456  if (input->type == nmsg_input_type_stream) {
457  Nmsg__Nmsg *nmsg;
458  unsigned i;
459 
460  nmsg = input->stream->nmsg;
461  assert(nmsg != NULL);
462 
463  for (i = 0; i < nmsg->n_payloads; i++)
464  if (nmsg->payloads[i] != NULL)
465  _nmsg_payload_free(&nmsg->payloads[i]);
466  nmsg->n_payloads = 0;
467  nmsg__nmsg__free_unpacked(nmsg, NULL);
468  }
469 
470  return (nmsg_res_success);
471 }
void nmsg_zbuf_destroy(nmsg_zbuf_t *zb)
Destroy all resources associated with an nmsg_zbuf_t object.
Definition: zbuf.c:84
presentation form
Definition: input.h:59
void nmsg_input_set_filter_operator(nmsg_input_t input, unsigned operator_)
Set an operator filter for input NMSG payloads.
Definition: input.c:283
nmsg_res
nmsg result code
Definition: res.h:25
success
Definition: res.h:26
nmsg_res nmsg_msgmod_init(nmsg_msgmod_t mod, void **clos)
Initialize a message module.
nmsg_res nmsg_input_set_verify_seqsrc(nmsg_input_t input, bool verify)
Enable or disable seqsrc verification on an NMSG stream nmsg_input_t object.
Definition: input.c:335
unsigned nmsg_msgmod_vname_to_vid(const char *vname)
Convert a human-readable vendor name to its numeric ID.
void(* nmsg_cb_message)(nmsg_message_t msg, void *user)
Callback function for processing nmsg messages.
Definition: nmsg.h:69
caller should try again
Definition: res.h:35
nmsg_res nmsg_input_set_byte_rate(nmsg_input_t input, size_t rate)
Set the target ingress byte rate for a stream input.
Definition: input.c:321
nmsg_res nmsg_input_set_filter_msgtype_byname(nmsg_input_t input, const char *vname, const char *mname)
Filter an nmsg_input_t for a given vendor ID / message type.
Definition: input.c:256
#define NMSG_RBUFSZ
Number of octets that an nmsg rbuf must hold.
Definition: constants.h:86
nmsg_res nmsg_input_read(nmsg_input_t input, nmsg_message_t *msg)
Read one NMSG message from an input stream.
Definition: input.c:211
nmsg_res nmsg_input_set_blocking_io(nmsg_input_t input, bool flag)
Configure non-blocking I/O for a stream input.
Definition: input.c:295
nmsg_zbuf_t nmsg_zbuf_inflate_init(void)
Initialize an nmsg_zbuf_t object for inflation.
Definition: zbuf.c:59
void nmsg_pcap_input_set_raw(nmsg_pcap_t pcap, bool raw)
Set raw mode.
Definition: pcap_input.c:127
nmsg_input_t nmsg_input_open_null(void)
Initialize a new "null source" NMSG stream input.
Definition: input.c:82
void nmsg_input_set_filter_group(nmsg_input_t input, unsigned group)
Set a group filter for input NMSG payloads.
Definition: input.c:289
pcap packets from file or interface
Definition: input.h:58
nmsg_input_t nmsg_input_open_pcap(nmsg_pcap_t pcap, nmsg_msgmod_t msgmod)
Initialize a new NMSG pcap input from a pcap descriptor.
Definition: input.c:131
nmsg_res nmsg_input_loop(nmsg_input_t input, int count, nmsg_cb_message cb, void *user)
Loop over an input stream and call a user-provided function for each payload.
Definition: input.c:216
nmsg_res nmsg_input_get_count_container_received(nmsg_input_t input, uint64_t *count)
For UDP datagram socket nmsg_input_t objects, retrieve the total number of NMSG containers that have ...
Definition: input.c:343
nmsg_input_t nmsg_input_open_xs(void *s)
Initialize a new NMSG stream input from an XS socket source.
nmsg_input_t nmsg_input_open_callback(nmsg_cb_message_read cb, void *user)
Initialize a new nmsg input closure.
Definition: input.c:61
nmsg_res nmsg_input_get_count_container_dropped(nmsg_input_t input, uint64_t *count)
For UDP datagram socket nmsg_input_t objects, retrieve the total number of NMSG containers that have been ...
Definition: input.c:352
nmsg_res nmsg_msgmod_fini(nmsg_msgmod_t mod, void **clos)
Finalize a message module.
generic failure
Definition: res.h:27
void nmsg_input_set_filter_msgtype(nmsg_input_t input, unsigned vid, unsigned msgtype)
Filter an nmsg_input_t for a given vendor ID / message type.
Definition: input.c:243
nmsg_input_t nmsg_input_open_sock(int fd)
Initialize a new NMSG stream input from a datagram socket source.
Definition: input.c:36
nmsg_input_t nmsg_input_open_pres(int fd, nmsg_msgmod_t msgmod)
Initialize a new NMSG presentation form input from a file descriptor.
Definition: input.c:95
nmsg_res nmsg_input_close(nmsg_input_t *input)
Close an nmsg_input_t object and release all associated resources.
Definition: input.c:172
void nmsg_input_breakloop(nmsg_input_t input)
Break out of an nmsg_input_loop() early.
Definition: input.c:206
void nmsg_input_set_filter_source(nmsg_input_t input, unsigned source)
Set a source filter for input NMSG payloads.
Definition: input.c:277
unsigned nmsg_msgmod_mname_to_msgtype(unsigned vid, const char *mname)
Convert the human-readable name of a message type to a message type ID.
nmsg_res(* nmsg_cb_message_read)(nmsg_message_t *msg, void *user)
Callback function for generating nmsg messages.
Definition: nmsg.h:85
nmsg_input_t nmsg_input_open_file(int fd)
Initialize a new NMSG stream input from a byte-stream file source.
Definition: input.c:31
nmsg_res nmsg_pcap_input_close(nmsg_pcap_t *pcap)
Close an nmsg_pcap_t object and release all associated resources.
Definition: pcap_input.c:53
NMSG payloads from file or socket.
Definition: input.h:57