17 #ifndef NMSG_PRIVATE_H
18 #define NMSG_PRIVATE_H
20 #include "nmsg_port_net.h"
25 # ifdef HAVE_SYS_ENDIAN_H
26 # include <sys/endian.h>
30 #include <sys/types.h>
31 #include <sys/socket.h>
56 #include <protobuf-c/protobuf-c.h>
63 #include "nmsg.pb-c.h"
65 #include "msgmod_plugin.h"
68 #include "libmy/crc32c.h"
69 #include "libmy/list.h"
70 #include "libmy/tree.h"
71 #include "libmy/ubuf.h"
/*
 * XSTR macro-expands its argument and then passes the result to STR.
 * STR is not defined in the visible part of this header;
 * NOTE(review): presumably it stringifies with '#' -- confirm.
 */
76 #define XSTR(x) STR(x)
/*
 * Garbage-collection intervals for sequence sources and fragments.
 * NOTE(review): the unit is not stated here (looks like seconds) -- confirm
 * at the use sites.
 */
78 #define NMSG_SEQSRC_GC_INTERVAL 120
79 #define NMSG_FRAG_GC_INTERVAL 30
/* The literal "nmsg_msg" followed by the XSTR expansion of NMSG_MSGMOD_VERSION. */
80 #define NMSG_MSG_MODULE_PREFIX "nmsg_msg" XSTR(NMSG_MSGMOD_VERSION)
/* 10^9: nanoseconds in one second. */
81 #define NMSG_NSEC_PER_SEC 1000000000
/*
 * Debug print helpers. Both write a printf-style message to stderr when a
 * verbosity value is at least `level`.
 *
 *   _nmsg_dprintf(level, format, ...)       compares against the global
 *                                           _nmsg_global_debug.
 *   _nmsg_dprintfv(var, level, format, ...) compares against the
 *                                           caller-supplied `var`.
 *
 * The bodies are wrapped in do { ... } while (0) so that each invocation is
 * exactly one statement: it can be followed by ';' and used as the body of
 * an un-braced if/else without capturing the caller's `else`. The final line
 * of each macro deliberately has no trailing backslash, so the definition
 * ends there and does not swallow the following line.
 *
 * `##__VA_ARGS__` is the GNU extension that drops the preceding comma when
 * no variadic arguments are given.
 */
#define _nmsg_dprintf(level, format, ...) \
do { \
	if (_nmsg_global_debug >= (level)) \
		fprintf(stderr, format, ##__VA_ARGS__); \
} while (0)

#define _nmsg_dprintfv(var, level, format, ...) \
do { \
	if ((var) >= (level)) \
		fprintf(stderr, format, ##__VA_ARGS__); \
} while (0)
102 nmsg_stream_type_file,
103 nmsg_stream_type_sock,
105 nmsg_stream_type_null,
112 struct nmsg_container;
131 extern bool _nmsg_global_autoclose;
132 extern int _nmsg_global_debug;
148 uint64_t sequence_id;
161 uint64_t sequence_id;
163 uint64_t count_dropped;
166 char addr_str[INET6_ADDRSTRLEN];
173 struct sockaddr_storage addr_ss;
182 ProtobufCBinaryData *frags;
203 struct _nmsg_ipreasm *reasm;
208 struct bpf_program userbpf;
216 pthread_mutex_t lock;
224 nmsg_stream_type type;
235 struct timespec lastgc;
245 struct nmsg_brate *brate;
247 struct sockaddr_storage addr_ss;
251 nmsg_input_stream_read_fp stream_read_fp;
256 pthread_mutex_t lock;
257 nmsg_stream_type type;
264 nmsg_random_t random;
273 uint64_t sequence_id;
291 nmsg_msgmod_t msgmod;
299 nmsg_input_read_fp read_fp;
300 nmsg_input_read_loop_fp read_loop_fp;
304 unsigned filter_msgtype;
316 nmsg_output_write_fp write_fp;
317 nmsg_output_flush_fp flush_fp;
321 unsigned filter_msgtype;
328 ProtobufCMessage *message;
329 Nmsg__NmsgPayload *np;
/*
 * Two-valued mode selector. With no explicit initialisers the enumerators
 * take the values 0 (keyval) and 1 (multiline).
 * NOTE(review): the names suggest a key/value state versus a multi-line
 * value state -- confirm against the code that sets the mode.
 */
372 typedef enum nmsg_msgmod_clos_mode {
373 nmsg_msgmod_clos_m_keyval,
374 nmsg_msgmod_clos_m_multiline
375 } nmsg_msgmod_clos_mode;
380 nmsg_msgmod_clos_mode mode;
/*
 * Takes no arguments and returns nothing.
 * NOTE(review): presumably tears down the alias subsystem's global state --
 * confirm in the definition, which is not in this header.
 */
409 void _nmsg_alias_fini(
void);
/*
 * Helpers for struct nmsg_buf (declarations only).
 *
 *   _nmsg_buf_avail, _nmsg_buf_used  take a buffer and return a signed size.
 *                                    NOTE(review): presumably remaining and
 *                                    consumed byte counts -- confirm.
 *   _nmsg_buf_new                    takes a size_t `sz` and returns a buffer
 *                                    pointer.
 *   _nmsg_buf_destroy                takes the address of the caller's buffer
 *                                    pointer. NOTE(review): presumably so the
 *                                    pointer can be cleared -- confirm.
 *   _nmsg_buf_reset                  takes a buffer and returns nothing.
 */
413 ssize_t _nmsg_buf_avail(
struct nmsg_buf *buf);
414 ssize_t _nmsg_buf_used(
struct nmsg_buf *buf);
415 struct nmsg_buf * _nmsg_buf_new(
size_t sz);
416 void _nmsg_buf_destroy(
struct nmsg_buf **buf);
417 void _nmsg_buf_reset(
struct nmsg_buf *buf);
/*
 * struct nmsg_dlmod lifecycle (declarations only).
 *
 *   _nmsg_dlmod_init     takes a path string and returns a struct nmsg_dlmod
 *                        pointer. NOTE(review): presumably NULL on failure --
 *                        confirm.
 *   _nmsg_dlmod_destroy  takes the address of the caller's pointer.
 */
421 struct nmsg_dlmod * _nmsg_dlmod_init(
const char *path);
422 void _nmsg_dlmod_destroy(
struct nmsg_dlmod **dlmod);
/*
 * Message constructors (declarations only). Both return an nmsg_message_t.
 *
 *   _nmsg_message_from_payload  builds from an Nmsg__NmsgPayload pointer.
 *                               NOTE(review): ownership of `np` after the
 *                               call is not visible here -- confirm.
 *   _nmsg_message_dup           builds from an existing struct nmsg_message.
 */
435 nmsg_message_t _nmsg_message_from_payload(Nmsg__NmsgPayload *np);
436 nmsg_message_t _nmsg_message_dup(
struct nmsg_message *msg);
/*
 * Payload helpers (declarations only).
 *
 *   _nmsg_payload_free_all, _nmsg_payload_calc_crcs
 *                        operate on a whole Nmsg__Nmsg container.
 *   _nmsg_payload_free   takes the address of a single payload pointer.
 *   _nmsg_payload_size   takes a const payload and returns a size_t.
 *                        NOTE(review): presumably the serialized size --
 *                        confirm.
 */
445 void _nmsg_payload_free_all(Nmsg__Nmsg *nc);
446 void _nmsg_payload_calc_crcs(Nmsg__Nmsg *nc);
447 void _nmsg_payload_free(Nmsg__NmsgPayload **np);
448 size_t _nmsg_payload_size(
const Nmsg__NmsgPayload *np);
/*
 * Takes an input, an out-pointer for an Nmsg__Nmsg container, and a byte
 * buffer `buf` of `buf_len` bytes; returns an nmsg_res status.
 * NOTE(review): presumably handles one fragment of a fragmented container --
 * confirm in the definition.
 */
451 nmsg_res _input_frag_read(nmsg_input_t, Nmsg__Nmsg **, uint8_t *buf,
size_t buf_len);
/*
 * NMSG-format input functions (declarations only; parameters are unnamed).
 *
 *   _input_nmsg_filter              returns bool for an (input, unsigned,
 *                                   payload) triple. NOTE(review): which
 *                                   value means "drop" is not visible --
 *                                   confirm.
 *   _input_nmsg_read                produces one message via the out-pointer.
 *   _input_nmsg_loop                takes an int, a message callback and an
 *                                   opaque user pointer.
 *   _input_nmsg_unpack_container    takes an input plus a mutable byte
 *                                   buffer and length.
 *   _input_nmsg_unpack_container2   takes no input object: a const byte
 *                                   buffer, a length and an unsigned.
 *   _input_nmsg_read_container_*    one per transport (file, sock, xs); each
 *                                   yields a container via the out-pointer.
 *   _input_nmsg_deserialize_header  takes a const byte buffer and length and
 *                                   writes to an ssize_t and an unsigned
 *                                   out-parameter. NOTE(review): presumably
 *                                   message length and flags -- confirm.
 */
456 bool _input_nmsg_filter(nmsg_input_t,
unsigned, Nmsg__NmsgPayload *);
457 nmsg_res _input_nmsg_read(nmsg_input_t, nmsg_message_t *);
458 nmsg_res _input_nmsg_loop(nmsg_input_t,
int, nmsg_cb_message,
void *);
459 nmsg_res _input_nmsg_unpack_container(nmsg_input_t, Nmsg__Nmsg **, uint8_t *,
size_t);
460 nmsg_res _input_nmsg_unpack_container2(
const uint8_t *,
size_t,
unsigned, Nmsg__Nmsg **);
461 nmsg_res _input_nmsg_read_container_file(nmsg_input_t, Nmsg__Nmsg **);
462 nmsg_res _input_nmsg_read_container_sock(nmsg_input_t, Nmsg__Nmsg **);
464 nmsg_res _input_nmsg_read_container_xs(nmsg_input_t, Nmsg__Nmsg **);
466 nmsg_res _input_nmsg_deserialize_header(
const uint8_t *,
size_t, ssize_t *,
unsigned *);
/*
 * Readers for the remaining input kinds (declarations only). Every *_read*
 * function here has the same shape: (nmsg_input_t, nmsg_message_t *),
 * returning nmsg_res. _input_nmsg_loop_null takes an int, a message callback
 * and an opaque user pointer instead.
 * NOTE(review): these look like the targets assigned to an input's read and
 * read-loop function pointers -- confirm against the function-pointer
 * typedefs.
 */
469 nmsg_res _input_nmsg_read_callback(nmsg_input_t, nmsg_message_t *);
472 nmsg_res _input_nmsg_read_null(nmsg_input_t, nmsg_message_t *);
473 nmsg_res _input_nmsg_loop_null(nmsg_input_t,
int, nmsg_cb_message,
void *);
476 nmsg_res _input_pcap_read(nmsg_input_t, nmsg_message_t *);
477 nmsg_res _input_pcap_read_raw(nmsg_input_t, nmsg_message_t *);
480 nmsg_res _input_pres_read(nmsg_input_t, nmsg_message_t *);
/*
 * Sequence-source tracking for an input (declarations only).
 *
 *   _input_seqsrc_get      returns a struct nmsg_seqsrc pointer for an
 *                          (input, container) pair. NOTE(review): whether it
 *                          creates a missing entry is not visible -- confirm.
 *   _input_seqsrc_destroy  takes only the input.
 *   _input_seqsrc_update   takes the input, a sequence source and a
 *                          container, and returns a size_t. NOTE(review):
 *                          presumably a dropped-container count -- confirm.
 */
483 struct nmsg_seqsrc * _input_seqsrc_get(nmsg_input_t, Nmsg__Nmsg *);
484 void _input_seqsrc_destroy(nmsg_input_t);
485 size_t _input_seqsrc_update(nmsg_input_t,
struct nmsg_seqsrc *, Nmsg__Nmsg *);
/*
 * Output-side functions (declarations only). All take an nmsg_output_t
 * first. _output_stop returns nothing; the rest return an nmsg_res status.
 *
 *   _output_nmsg_write, _output_pres_write  take the message to emit.
 *   _output_nmsg_flush, _output_nmsg_write_container, _output_frag_write
 *                                           take only the output.
 *   _output_nmsg_write_{sock,file,xs}       take a byte buffer `buf` and its
 *                                           length `len`, one function per
 *                                           transport.
 * NOTE(review): ownership of `buf` after the transport writers return is not
 * visible here -- confirm in the definitions.
 */
488 void _output_stop(nmsg_output_t);
491 nmsg_res _output_frag_write(nmsg_output_t);
494 nmsg_res _output_nmsg_flush(nmsg_output_t);
495 nmsg_res _output_nmsg_write(nmsg_output_t, nmsg_message_t);
496 nmsg_res _output_nmsg_write_container(nmsg_output_t);
497 nmsg_res _output_nmsg_write_sock(nmsg_output_t, uint8_t *buf,
size_t len);
498 nmsg_res _output_nmsg_write_file(nmsg_output_t, uint8_t *buf,
size_t len);
500 nmsg_res _output_nmsg_write_xs(nmsg_output_t, uint8_t *buf,
size_t len);
504 nmsg_res _output_pres_write(nmsg_output_t, nmsg_message_t);
/*
 * struct nmsg_brate helpers (declarations only).
 *
 *   _nmsg_brate_init     takes a target byte rate and returns a struct
 *                        nmsg_brate pointer.
 *   _nmsg_brate_destroy  takes the address of the caller's pointer.
 *   _nmsg_brate_sleep    takes the rate object, a container size, a payload
 *                        count and a further count `n`. NOTE(review): the
 *                        meaning of `n` and the unit of the rate are not
 *                        stated here -- confirm in the definition.
 */
507 struct nmsg_brate * _nmsg_brate_init(
size_t target_byte_rate);
508 void _nmsg_brate_destroy(
struct nmsg_brate **);
509 void _nmsg_brate_sleep(
struct nmsg_brate *,
size_t container_sz,
size_t n_payloads,
size_t n);
558 _nmsg_ipdg_parse_reasm(
struct nmsg_ipdg *dg,
unsigned etype,
size_t len,
559 const u_char *pkt,
struct _nmsg_ipreasm *reasm,
560 unsigned *new_len, u_char *new_pkt,
int *defrag,
Structure exported by message modules to implement a new message type.
An nmsg_message MUST always have a non-NULL ->np member.
void(* nmsg_cb_message)(nmsg_message_t msg, void *user)
Callback function for processing nmsg messages.
Base nmsg support header.
Structure mapping protocol buffer schema fields to nmsg_msgmod_field_type values for "transparent" modules.
nmsg_output_type
An enum identifying the underlying implementation of an nmsg_output_t object.
nmsg_res(* nmsg_cb_message_read)(nmsg_message_t *msg, void *user)
Callback function for generating nmsg messages.