23 static nmsg_input_t input_open_stream(nmsg_stream_type,
int);
24 static nmsg_input_t input_open_stream_base(nmsg_stream_type);
25 static nmsg_res input_flush(nmsg_input_t input);
26 static void input_close_stream(nmsg_input_t input);
32 return (input_open_stream(nmsg_stream_type_file, fd));
37 return (input_open_stream(nmsg_stream_type_sock, fd));
45 input = input_open_stream_base(nmsg_stream_type_xs);
49 input->stream->xs = s;
64 input = calloc(1,
sizeof(*input));
67 input->type = nmsg_input_type_callback;
68 input->read_fp = _input_nmsg_read_callback;
69 input->read_loop_fp = NULL;
70 input->callback = calloc(1,
sizeof(*(input->callback)));
71 if (input->callback == NULL) {
75 input->callback->cb = cb;
76 input->callback->user = user;
85 input = input_open_stream_base(nmsg_stream_type_null);
88 input->read_fp = _input_nmsg_read_null;
89 input->read_loop_fp = _input_nmsg_loop_null;
99 input = calloc(1,
sizeof(*input));
103 input->read_fp = _input_pres_read;
105 input->pres = calloc(1,
sizeof(*(input->pres)));
106 if (input->pres == NULL) {
111 input->pres->fp = fdopen(fd,
"r");
112 if (input->pres->fp == NULL) {
118 input->msgmod = msgmod;
121 fclose(input->pres->fp);
135 input = calloc(1,
sizeof(*input));
141 if (msgmod->plugin->pkt_to_payload != NULL) {
142 input->read_fp = _input_pcap_read_raw;
144 }
else if (msgmod->plugin->ipdg_to_payload != NULL) {
145 input->read_fp = _input_pcap_read;
151 input->msgmod = msgmod;
157 if (msgmod->plugin->pcap_init != NULL) {
158 void *clos = input->clos;
159 if (msgmod->plugin->type == nmsg_msgmod_type_transparent)
161 res = msgmod->plugin->pcap_init(clos, input->pcap);
173 switch ((*input)->type) {
175 _nmsg_brate_destroy(&((*input)->stream->brate));
177 if ((*input)->stream->type == nmsg_stream_type_xs)
178 xs_close((*input)->stream->xs);
180 assert((*input)->stream->type != nmsg_stream_type_xs);
182 input_close_stream(*input);
188 fclose((*input)->pres->fp);
189 free((*input)->pres);
191 case nmsg_input_type_callback:
192 free((*input)->callback);
196 if ((*input)->msgmod != NULL)
212 return (input->read_fp(input, msg));
221 if (input->read_loop_fp != NULL)
222 return (input->read_loop_fp(input, cnt, cb, user));
225 res = input->read_fp(input, &msg);
231 if (cnt >= 0 && n_payloads == cnt)
244 unsigned vid,
unsigned msgtype)
246 if (vid == 0 && msgtype == 0)
247 input->do_filter =
false;
249 input->do_filter =
true;
251 input->filter_vid = vid;
252 input->filter_msgtype = msgtype;
257 const char *vname,
const char *mname)
259 unsigned vid, msgtype;
261 if (vname == NULL || mname == NULL)
279 input->stream->source = source;
285 input->stream->operator =
operator;
291 input->stream->group = group;
301 if ((val = fcntl(input->stream->buf->fd, F_GETFL, 0)) < 0)
309 if (fcntl(input->stream->buf->fd, F_SETFL, val) < 0)
313 input->stream->blocking_io =
true;
315 input->stream->blocking_io =
false;
324 if (input->stream->brate != NULL)
325 _nmsg_brate_destroy(&input->stream->brate);
326 if (target_byte_rate > 0) {
327 input->stream->brate = _nmsg_brate_init(target_byte_rate);
328 if (input->stream->brate == NULL)
338 input->stream->verify_seqsrc = verify;
345 *count = input->stream->count_recv;
354 input->stream->verify_seqsrc)
356 *count = input->stream->count_drop;
365 input_open_stream(nmsg_stream_type type,
int fd) {
368 input = input_open_stream_base(type);
374 if (input->stream->buf == NULL) {
379 _nmsg_buf_reset(input->stream->buf);
380 input->stream->buf->fd = fd;
384 input->stream->pfd.fd = fd;
385 input->stream->pfd.events = POLLIN;
391 input_open_stream_base(nmsg_stream_type type) {
395 input = calloc(1,
sizeof(*input));
399 input->read_fp = _input_nmsg_read;
400 input->read_loop_fp = _input_nmsg_loop;
403 input->stream = calloc(1,
sizeof(*(input->stream)));
404 if (input->stream == NULL) {
408 input->stream->blocking_io =
true;
409 input->stream->verify_seqsrc =
true;
410 input->stream->type = type;
411 if (type == nmsg_stream_type_file) {
412 input->stream->stream_read_fp = _input_nmsg_read_container_file;
413 }
else if (type == nmsg_stream_type_sock) {
414 input->stream->stream_read_fp = _input_nmsg_read_container_sock;
415 }
else if (type == nmsg_stream_type_xs) {
417 input->stream->stream_read_fp = _input_nmsg_read_container_xs;
419 assert(type != nmsg_stream_type_xs);
425 if (input->stream->zb == NULL) {
426 _nmsg_buf_destroy(&input->stream->buf);
433 RB_INIT(&input->stream->nft.head);
436 ISC_LIST_INIT(input->stream->seqsrcs);
442 input_close_stream(nmsg_input_t input) {
443 _input_seqsrc_destroy(input);
445 if (input->stream->nmsg != NULL)
449 _input_frag_destroy(input->stream);
450 _nmsg_buf_destroy(&input->stream->buf);
455 input_flush(nmsg_input_t input) {
460 nmsg = input->stream->nmsg;
461 assert(nmsg != NULL);
463 for (i = 0; i < nmsg->n_payloads; i++)
464 if (nmsg->payloads[i] != NULL)
465 _nmsg_payload_free(&nmsg->payloads[i]);
466 nmsg->n_payloads = 0;
467 nmsg__nmsg__free_unpacked(nmsg, NULL);
void nmsg_zbuf_destroy(nmsg_zbuf_t *zb)
Destroy all resources associated with an nmsg_zbuf_t object.
nmsg_res nmsg_msgmod_init(nmsg_msgmod_t mod, void **clos)
Initialize a message module.
unsigned nmsg_msgmod_vname_to_vid(const char *vname)
Convert a human-readable vendor name to its numeric ID.
void(* nmsg_cb_message)(nmsg_message_t msg, void *user)
Callback function for processing nmsg messages.
#define NMSG_RBUFSZ
Number of octets that an nmsg rbuf must hold.
nmsg_zbuf_t nmsg_zbuf_inflate_init(void)
Initialize an nmsg_zbuf_t object for inflation.
nmsg_res nmsg_msgmod_fini(nmsg_msgmod_t mod, void **clos)
Finalize a message module.
unsigned nmsg_msgmod_mname_to_msgtype(unsigned vid, const char *mname)
Convert the human-readable name of a message type to a message type ID.
nmsg_res(* nmsg_cb_message_read)(nmsg_message_t *msg, void *user)
Callback function for generating nmsg messages.