/* nmsg 0.9.0 -- private.h */
/*
 * Copyright (c) 2008-2014 by Farsight Security, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17 #ifndef NMSG_PRIVATE_H
18 #define NMSG_PRIVATE_H
19 
20 #include "nmsg_port_net.h"
21 
22 #ifdef HAVE_ENDIAN_H
23 # include <endian.h>
24 #else
25 # ifdef HAVE_SYS_ENDIAN_H
26 # include <sys/endian.h>
27 # endif
28 #endif
29 
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <sys/time.h>
33 #include <sys/stat.h>
34 #include <assert.h>
35 #include <ctype.h>
36 #include <errno.h>
37 #include <fcntl.h>
38 #include <inttypes.h>
39 #include <limits.h>
40 #include <pthread.h>
41 #include <poll.h>
42 #include <signal.h>
43 #include <stdarg.h>
44 #include <stdbool.h>
45 #include <stddef.h>
46 #include <stdio.h>
47 #include <stdint.h>
48 #include <stdlib.h>
49 #include <string.h>
50 #include <strings.h>
51 #include <time.h>
52 #include <unistd.h>
53 
54 #include <zlib.h>
55 
56 #include <protobuf-c/protobuf-c.h>
57 
58 #ifdef HAVE_LIBXS
59 # include <xs/xs.h>
60 #endif /* HAVE_LIBXS */
61 
62 #include "nmsg.h"
63 #include "nmsg.pb-c.h"
64 
65 #include "msgmod_plugin.h"
66 #include "ipreasm.h"
67 
68 #include "libmy/crc32c.h"
69 #include "libmy/list.h"
70 #include "libmy/tree.h"
71 #include "libmy/ubuf.h"
72 
/* Macros. */

/*
 * STR(x) turns its argument into a string literal exactly as written.
 * XSTR(x) macro-expands its argument first and stringifies the result.
 */
#define STR(x) #x
#define XSTR(x) STR(x)

/*
 * GC intervals for sequence sources and for fragments.
 * NOTE(review): the unit is presumably seconds -- confirm against the
 * code that uses these constants.
 */
#define NMSG_SEQSRC_GC_INTERVAL 120
#define NMSG_FRAG_GC_INTERVAL 30

/*
 * String literal "nmsg_msg" immediately followed by the stringified
 * expansion of NMSG_MSGMOD_VERSION (defined outside this header).
 */
#define NMSG_MSG_MODULE_PREFIX "nmsg_msg" XSTR(NMSG_MSGMOD_VERSION)

/* Number of nanoseconds in one second. */
#define NMSG_NSEC_PER_SEC 1000000000

/*
 * Write a printf-style message to stderr when the global debug level
 * (_nmsg_global_debug) is greater than or equal to `level`.
 *
 * The format arguments are only evaluated when the message is emitted.
 * `##__VA_ARGS__` (GNU extension) allows the variadic part to be empty.
 */
#define _nmsg_dprintf(level, format, ...) \
do { \
    if (_nmsg_global_debug >= (level)) { \
        fprintf(stderr, format, ##__VA_ARGS__); \
    } \
} while (0)

/*
 * Same as _nmsg_dprintf, but the debug level is taken from the caller
 * supplied expression `var` instead of the global debug level: the message
 * is written to stderr when `var` is greater than or equal to `level`.
 */
#define _nmsg_dprintfv(var, level, format, ...) \
do { \
    if ((var) >= (level)) { \
        fprintf(stderr, format, ##__VA_ARGS__); \
    } \
} while (0)

/* Enums. */

/*
 * Module type tag stored in struct nmsg_dlmod (member `type`).
 * Message modules are the only kind defined here.
 */
typedef enum {
    nmsg_modtype_msgmod
} nmsg_modtype;

/*
 * Kind of stream behind a struct nmsg_stream_input or
 * struct nmsg_stream_output (member `type`).
 */
typedef enum {
    nmsg_stream_type_file,
    nmsg_stream_type_sock,
    nmsg_stream_type_xs,
    nmsg_stream_type_null,
} nmsg_stream_type;

/* Forward. */

/*
 * Incomplete struct declarations so that the function types, data types and
 * prototypes below can refer to these structures by pointer.
 */
struct nmsg_brate;
struct nmsg_buf;
struct nmsg_container;
struct nmsg_dlmod;
struct nmsg_frag;
struct nmsg_frag_key;
struct nmsg_frag_tree;
struct nmsg_input;
struct nmsg_output;
struct nmsg_msgmod;
struct nmsg_msgmod_field;
struct nmsg_msgmod_clos;
struct nmsg_pcap;
struct nmsg_pres;
struct nmsg_stream_input;
struct nmsg_stream_output;
struct nmsg_seqsrc;
struct nmsg_seqsrc_key;

/* Globals. */

/*
 * Library-wide state, declared here and defined elsewhere.
 * _nmsg_global_debug is the level tested by the _nmsg_dprintf macro.
 */
extern bool _nmsg_global_autoclose;
extern int _nmsg_global_debug;
extern struct nmsg_msgmodset *_nmsg_global_msgmodset;

135 /* Function types. */
136 
137 typedef nmsg_res (*nmsg_input_read_fp)(struct nmsg_input *, nmsg_message_t *);
138 typedef nmsg_res (*nmsg_input_read_loop_fp)(struct nmsg_input *, int,
139  nmsg_cb_message, void *);
140 typedef nmsg_res (*nmsg_input_stream_read_fp)(struct nmsg_input *, Nmsg__Nmsg **);
141 typedef nmsg_res (*nmsg_output_write_fp)(struct nmsg_output *, nmsg_message_t);
142 typedef nmsg_res (*nmsg_output_flush_fp)(struct nmsg_output *);
143 
/* Data types. */

/* nmsg_seqsrc */

/*
 * Key embedded in struct nmsg_seqsrc (member `key`).
 * NOTE(review): `af` presumably selects which member of the anonymous
 * address union is meaningful -- confirm in input_seqsrc.c.
 */
struct nmsg_seqsrc_key {
    uint64_t sequence_id;
    sa_family_t af;
    uint16_t port;
    union {
        uint8_t ip4[4];     /* 4-byte address */
        uint8_t ip6[16];    /* 16-byte address */
    };
};

157 struct nmsg_seqsrc {
158  ISC_LINK(struct nmsg_seqsrc) link;
159  struct nmsg_seqsrc_key key;
160  uint32_t sequence;
161  uint64_t sequence_id;
162  uint64_t count;
163  uint64_t count_dropped;
164  time_t last;
165  bool init;
166  char addr_str[INET6_ADDRSTRLEN];
167 };
168 
/* nmsg_frag: used by nmsg_stream_input */

/* Key embedded in struct nmsg_frag (member `key`). */
struct nmsg_frag_key {
    uint32_t id;
    uint32_t crc;
    struct sockaddr_storage addr_ss;
};

176 struct nmsg_frag {
177  RB_ENTRY(nmsg_frag) link;
178  struct nmsg_frag_key key;
179  unsigned last;
180  unsigned rem;
181  struct timespec ts;
182  ProtobufCBinaryData *frags;
183 };
184 
185 /* nmsg_frag_tree: used by nmsg_stream_input */
187  RB_HEAD(frag_ent, nmsg_frag) head;
188 };
189 
190 /* nmsg_buf: used by nmsg_stream_input, nmsg_stream_output */
191 struct nmsg_buf {
192  int fd;
193  size_t bufsz;
194  u_char *data; /* allocated data starts here */
195  u_char *pos; /* position of next buffer read */
196  u_char *end; /* one byte beyond valid data */
197 };
198 
199 /* nmsg_pcap: used by nmsg_input */
200 struct nmsg_pcap {
201  int datalink;
202  pcap_t *handle;
203  struct _nmsg_ipreasm *reasm;
204  u_char *new_pkt;
205 
206  pcap_t *user;
207  char *userbpft;
208  struct bpf_program userbpf;
209 
210  nmsg_pcap_type type;
211  bool raw;
212 };
213 
/* nmsg_pres: used by nmsg_input and nmsg_output */
struct nmsg_pres {
    pthread_mutex_t lock;
    FILE *fp;
    bool flush;
    char *endline;
};

222 /* nmsg_stream_input: used by nmsg_input */
224  nmsg_stream_type type;
225  struct nmsg_buf *buf;
226 #ifdef HAVE_LIBXS
227  void *xs;
228 #endif /* HAVE_LIBXS */
229  Nmsg__Nmsg *nmsg;
230  unsigned np_index;
231  size_t nc_size;
232  struct nmsg_frag_tree nft;
233  struct pollfd pfd;
234  struct timespec now;
235  struct timespec lastgc;
236  unsigned nfrags;
237  unsigned flags;
238  nmsg_zbuf_t zb;
239  u_char *zb_tmp;
240  unsigned source;
241  unsigned operator;
242  unsigned group;
243  bool blocking_io;
244  bool verify_seqsrc;
245  struct nmsg_brate *brate;
246  ISC_LIST(struct nmsg_seqsrc) seqsrcs;
247  struct sockaddr_storage addr_ss;
248  uint64_t count_recv;
249  uint64_t count_drop;
250 
251  nmsg_input_stream_read_fp stream_read_fp;
252 };
253 
254 /* nmsg_stream_output: used by nmsg_output */
256  pthread_mutex_t lock;
257  nmsg_stream_type type;
258  int fd;
259 #ifdef HAVE_LIBXS
260  void *xs;
261 #endif /* HAVE_LIBXS */
262  nmsg_container_t c;
263  size_t bufsz;
264  nmsg_random_t random;
265  nmsg_rate_t rate;
266  bool buffered;
267  unsigned source;
268  unsigned operator;
269  unsigned group;
270  bool do_zlib;
271  bool do_sequence;
272  uint32_t sequence;
273  uint64_t sequence_id;
274 };
275 
276 /* nmsg_callback_output: used by nmsg_output */
278  nmsg_cb_message cb;
279  void *user;
280 };
281 
282 /* nmsg_callback_input: used by nmsg_input */
285  void *user;
286 };
287 
288 /* nmsg_input */
289 struct nmsg_input {
290  nmsg_input_type type;
291  nmsg_msgmod_t msgmod;
292  void *clos;
293  union {
294  struct nmsg_stream_input *stream;
295  struct nmsg_pcap *pcap;
296  struct nmsg_pres *pres;
297  struct nmsg_callback_input *callback;
298  };
299  nmsg_input_read_fp read_fp;
300  nmsg_input_read_loop_fp read_loop_fp;
301 
302  bool do_filter;
303  unsigned filter_vid;
304  unsigned filter_msgtype;
305  volatile bool stop;
306 };
307 
308 /* nmsg_output */
309 struct nmsg_output {
310  nmsg_output_type type;
311  union {
312  struct nmsg_stream_output *stream;
313  struct nmsg_pres *pres;
314  struct nmsg_callback_output *callback;
315  };
316  nmsg_output_write_fp write_fp;
317  nmsg_output_flush_fp flush_fp;
318 
319  bool do_filter;
320  unsigned filter_vid;
321  unsigned filter_msgtype;
322  volatile bool stop;
323 };
324 
325 /* nmsg_message */
326 struct nmsg_message {
327  nmsg_msgmod_t mod;
328  ProtobufCMessage *message;
329  Nmsg__NmsgPayload *np;
330  void *msg_clos;
331  size_t n_allocs;
332  void **allocs;
333  bool updated;
334 };
335 
363 /* dlmod / msgmod / msgmodset */
364 
365 struct nmsg_dlmod {
366  ISC_LINK(struct nmsg_dlmod) link;
367  nmsg_modtype type;
368  char *path;
369  void *handle;
370 };
371 
/* Value of the `mode` member of struct nmsg_msgmod_clos. */
typedef enum nmsg_msgmod_clos_mode {
    nmsg_msgmod_clos_m_keyval,
    nmsg_msgmod_clos_m_multiline
} nmsg_msgmod_clos_mode;

378  char *nmsg_pbuf;
379  size_t estsz;
380  nmsg_msgmod_clos_mode mode;
381  struct nmsg_msgmod_field *field;
382  struct nmsg_strbuf *strbufs;
383  void *mod_clos;
384 };
385 
/*
 * One vendor entry; struct nmsg_msgmodset holds an array of pointers to
 * these in its `vendors` member.
 */
struct nmsg_msgvendor {
    struct nmsg_msgmod **msgtypes;
    char *vname;
    size_t nm;      /* NOTE(review): presumably the element count of `msgtypes` -- confirm */
};

/* A message module: the plugin structure it wraps, plus its field table. */
struct nmsg_msgmod {
    struct nmsg_msgmod_plugin *plugin;
    struct nmsg_msgmod_field *fields;
    size_t n_fields;    /* NOTE(review): presumably the element count of `fields` -- confirm */
};

399  ISC_LIST(struct nmsg_dlmod) dlmods;
400  struct nmsg_msgvendor **vendors;
401  size_t nv;
402 };
403 
404 /* Prototypes. */
405 
406 /* from alias.c */
407 
408 nmsg_res _nmsg_alias_init(void);
409 void _nmsg_alias_fini(void);
410 
/* from buf.c */

/*
 * Helpers operating on struct nmsg_buf.
 * NOTE(review): _nmsg_buf_destroy takes a pointer to the caller's pointer,
 * presumably so it can be cleared after freeing -- confirm in buf.c.
 */
ssize_t _nmsg_buf_avail(struct nmsg_buf *buf);
ssize_t _nmsg_buf_used(struct nmsg_buf *buf);
struct nmsg_buf *_nmsg_buf_new(size_t sz);
void _nmsg_buf_destroy(struct nmsg_buf **buf);
void _nmsg_buf_reset(struct nmsg_buf *buf);

/* from dlmod.c */

/* Create a struct nmsg_dlmod for the module at `path`, and destroy one. */
struct nmsg_dlmod *_nmsg_dlmod_init(const char *path);
void _nmsg_dlmod_destroy(struct nmsg_dlmod **dlmod);

/* from msgmod.c */

/* Start a struct nmsg_msgmod for a plugin structure, and stop one. */
struct nmsg_msgmod *_nmsg_msgmod_start(struct nmsg_msgmod_plugin *plugin);
void _nmsg_msgmod_stop(struct nmsg_msgmod **mod);

429 /* from message.c */
430 
431 nmsg_res _nmsg_message_init_message(struct nmsg_message *msg);
432 nmsg_res _nmsg_message_init_payload(struct nmsg_message *msg);
433 nmsg_res _nmsg_message_deserialize(struct nmsg_message *msg);
434 nmsg_res _nmsg_message_serialize(struct nmsg_message *msg);
435 nmsg_message_t _nmsg_message_from_payload(Nmsg__NmsgPayload *np);
436 nmsg_message_t _nmsg_message_dup(struct nmsg_message *msg);
437 nmsg_res _nmsg_message_dup_protobuf(const struct nmsg_message *msg, ProtobufCMessage **dst);
438 
/* from msgmodset.c */

/* Create a struct nmsg_msgmodset from `path`, and destroy one. */
struct nmsg_msgmodset *_nmsg_msgmodset_init(const char *path);
void _nmsg_msgmodset_destroy(struct nmsg_msgmodset **);

444 /* from payload.c */
445 void _nmsg_payload_free_all(Nmsg__Nmsg *nc);
446 void _nmsg_payload_calc_crcs(Nmsg__Nmsg *nc);
447 void _nmsg_payload_free(Nmsg__NmsgPayload **np);
448 size_t _nmsg_payload_size(const Nmsg__NmsgPayload *np);
449 
450 /* from input_frag.c */
451 nmsg_res _input_frag_read(nmsg_input_t, Nmsg__Nmsg **, uint8_t *buf, size_t buf_len);
452 void _input_frag_destroy(struct nmsg_stream_input *);
453 void _input_frag_gc(struct nmsg_stream_input *);
454 
455 /* from input_nmsg.c */
456 bool _input_nmsg_filter(nmsg_input_t, unsigned, Nmsg__NmsgPayload *);
457 nmsg_res _input_nmsg_read(nmsg_input_t, nmsg_message_t *);
458 nmsg_res _input_nmsg_loop(nmsg_input_t, int, nmsg_cb_message, void *);
459 nmsg_res _input_nmsg_unpack_container(nmsg_input_t, Nmsg__Nmsg **, uint8_t *, size_t);
460 nmsg_res _input_nmsg_unpack_container2(const uint8_t *, size_t, unsigned, Nmsg__Nmsg **);
461 nmsg_res _input_nmsg_read_container_file(nmsg_input_t, Nmsg__Nmsg **);
462 nmsg_res _input_nmsg_read_container_sock(nmsg_input_t, Nmsg__Nmsg **);
463 #ifdef HAVE_LIBXS
464 nmsg_res _input_nmsg_read_container_xs(nmsg_input_t, Nmsg__Nmsg **);
465 #endif /* HAVE_LIBXS */
466 nmsg_res _input_nmsg_deserialize_header(const uint8_t *, size_t, ssize_t *, unsigned *);
467 
468 /* from input_callback.c */
469 nmsg_res _input_nmsg_read_callback(nmsg_input_t, nmsg_message_t *);
470 
471 /* from input_nullnmsg.c */
472 nmsg_res _input_nmsg_read_null(nmsg_input_t, nmsg_message_t *);
473 nmsg_res _input_nmsg_loop_null(nmsg_input_t, int, nmsg_cb_message, void *);
474 
475 /* from input_pcap.c */
476 nmsg_res _input_pcap_read(nmsg_input_t, nmsg_message_t *);
477 nmsg_res _input_pcap_read_raw(nmsg_input_t, nmsg_message_t *);
478 
479 /* from input_pres.c */
480 nmsg_res _input_pres_read(nmsg_input_t, nmsg_message_t *);
481 
482 /* from input_seqsrc.c */
483 struct nmsg_seqsrc * _input_seqsrc_get(nmsg_input_t, Nmsg__Nmsg *);
484 void _input_seqsrc_destroy(nmsg_input_t);
485 size_t _input_seqsrc_update(nmsg_input_t, struct nmsg_seqsrc *, Nmsg__Nmsg *);
486 
/* from output.c */

/* NOTE(review): presumably sets the output's `stop` flag -- confirm in output.c. */
void _output_stop(nmsg_output_t);

490 /* from output_frag.c */
491 nmsg_res _output_frag_write(nmsg_output_t);
492 
493 /* from output_nmsg.c */
494 nmsg_res _output_nmsg_flush(nmsg_output_t);
495 nmsg_res _output_nmsg_write(nmsg_output_t, nmsg_message_t);
496 nmsg_res _output_nmsg_write_container(nmsg_output_t);
497 nmsg_res _output_nmsg_write_sock(nmsg_output_t, uint8_t *buf, size_t len);
498 nmsg_res _output_nmsg_write_file(nmsg_output_t, uint8_t *buf, size_t len);
499 #ifdef HAVE_LIBXS
500 nmsg_res _output_nmsg_write_xs(nmsg_output_t, uint8_t *buf, size_t len);
501 #endif /* HAVE_LIBXS */
502 
503 /* from output_pres.c */
504 nmsg_res _output_pres_write(nmsg_output_t, nmsg_message_t);
505 
/* from brate.c */

/*
 * struct nmsg_brate helpers: create one for a target byte rate, destroy it,
 * and the sleep entry point.
 * NOTE(review): _nmsg_brate_sleep presumably delays the caller to hold the
 * target rate -- confirm in brate.c.
 */
struct nmsg_brate *_nmsg_brate_init(size_t target_byte_rate);
void _nmsg_brate_destroy(struct nmsg_brate **);
void _nmsg_brate_sleep(struct nmsg_brate *, size_t container_sz, size_t n_payloads, size_t n);

511 /* from ipdg.c */
512 
557 nmsg_res
558 _nmsg_ipdg_parse_reasm(struct nmsg_ipdg *dg, unsigned etype, size_t len,
559  const u_char *pkt, struct _nmsg_ipreasm *reasm,
560  unsigned *new_len, u_char *new_pkt, int *defrag,
561  uint64_t timestamp);
562 
563 #endif /* NMSG_PRIVATE_H */
/*
 * Cross-reference notes carried over from the generated documentation:
 *
 * nmsg_input_type
 * An enum identifying the underlying implementation of an nmsg_input_t object.
 * Definition: input.h:56
 *
 * Structure exported by message modules to implement a new message type.
 *
 * nmsg_res
 * nmsg result code
 * Definition: res.h:25
 *
 * An nmsg_message MUST always have a non-NULL ->np member.
 * Definition: private.h:365
 *
 * void(* nmsg_cb_message)(nmsg_message_t msg, void *user)
 * Callback function for processing nmsg messages.
 * Definition: nmsg.h:69
 *
 * Base nmsg support header.
 *
 * Parsed IP datagram.
 * Definition: ipdg.h:34
 *
 * Structure mapping protocol buffer schema fields to nmsg_msgmod_field_type values for "transparent" mo...
 *
 * nmsg_output_type
 * An enum identifying the underlying implementation of an nmsg_output_t object.
 * Definition: output.h:40
 *
 * String buffer.
 * Definition: strbuf.h:32
 *
 * nmsg_res(* nmsg_cb_message_read)(nmsg_message_t *msg, void *user)
 * Callback function for generating nmsg messages.
 * Definition: nmsg.h:85
 */