23 static nmsg_res read_file(nmsg_input_t, ssize_t *);
24 static nmsg_res do_read_file(nmsg_input_t, ssize_t, ssize_t);
25 static nmsg_res do_read_sock(nmsg_input_t, ssize_t);
30 _input_nmsg_read(nmsg_input_t input, nmsg_message_t *msg) {
31 Nmsg__NmsgPayload *np;
34 if (input->stream->nmsg != NULL &&
35 input->stream->np_index >= input->stream->nmsg->n_payloads - 1)
37 input->stream->nmsg->n_payloads = 0;
38 nmsg__nmsg__free_unpacked(input->stream->nmsg, NULL);
39 input->stream->nmsg = NULL;
41 input->stream->np_index += 1;
44 if (input->stream->nmsg == NULL) {
45 res = input->stream->stream_read_fp(input, &input->stream->nmsg);
48 input->stream->np_index = 0;
52 np = input->stream->nmsg->payloads[input->stream->np_index];
53 input->stream->nmsg->payloads[input->stream->np_index] = NULL;
56 if (_input_nmsg_filter(input, input->stream->np_index, np) ==
false) {
57 _nmsg_payload_free(&np);
62 *msg = _nmsg_message_from_payload(np);
67 if (input->stream->brate != NULL)
68 _nmsg_brate_sleep(input->stream->brate,
69 input->stream->nc_size,
70 input->stream->nmsg->n_payloads,
71 input->stream->np_index);
77 _input_nmsg_loop(nmsg_input_t input,
int cnt,
nmsg_cb_message cb,
void *user) {
81 Nmsg__NmsgPayload *np;
89 res = input->stream->stream_read_fp(input, &input->stream->nmsg);
95 nmsg = input->stream->nmsg;
96 for (n = 0; n < nmsg->n_payloads; n++) {
97 np = nmsg->payloads[n];
98 if (_input_nmsg_filter(input, n, np)) {
99 msg = _nmsg_message_from_payload(np);
103 nmsg->n_payloads = 0;
104 free(nmsg->payloads);
105 nmsg->payloads = NULL;
106 nmsg__nmsg__free_unpacked(nmsg, NULL);
107 input->stream->nmsg = NULL;
116 res = input->stream->stream_read_fp(input, &input->stream->nmsg);
122 nmsg = input->stream->nmsg;
123 for (n = 0; n < nmsg->n_payloads; n++) {
124 np = nmsg->payloads[n];
125 if (_input_nmsg_filter(input, n, np)) {
126 if (n_payloads == cnt)
129 msg = _nmsg_message_from_payload(np);
133 nmsg->n_payloads = 0;
134 free(nmsg->payloads);
135 nmsg->payloads = NULL;
136 nmsg__nmsg__free_unpacked(nmsg, NULL);
137 input->stream->nmsg = NULL;
138 if (n_payloads == cnt)
147 _input_nmsg_filter(nmsg_input_t input,
unsigned idx, Nmsg__NmsgPayload *np) {
148 assert(input->stream->nmsg != NULL);
151 if (input->stream->nmsg->n_payload_crcs >= (idx + 1)) {
152 uint32_t wire_crc = input->stream->nmsg->payload_crcs[idx];
153 uint32_t calc_crc = my_crc32c(np->payload.data, np->payload.len);
154 if (ntohl(wire_crc) != calc_crc) {
155 _nmsg_dprintf(1,
"libnmsg: WARNING: crc mismatch (%x != %x) [%s]\n",
156 calc_crc, wire_crc, __func__);
162 if (input->do_filter ==
true &&
163 (input->filter_vid != np->vid ||
164 input->filter_msgtype != np->msgtype))
170 if (input->stream->source > 0 &&
171 input->stream->source != np->source)
177 if (input->stream->operator > 0 &&
178 input->stream->operator != np->operator_)
184 if (input->stream->group > 0 &&
185 input->stream->group != np->group)
195 _input_nmsg_unpack_container(nmsg_input_t input, Nmsg__Nmsg **nmsg,
196 uint8_t *buf,
size_t buf_len)
201 _nmsg_dprintf(6,
"%s: unpacking container len= %zd\n", __func__, buf_len);
204 res = _input_frag_read(input, nmsg, buf, buf_len);
212 *nmsg = nmsg__nmsg__unpack(NULL, u_len, u_buf);
217 *nmsg = nmsg__nmsg__unpack(NULL, buf_len, buf);
226 _input_nmsg_unpack_container2(
const uint8_t *buf,
size_t buf_len,
227 unsigned flags, Nmsg__Nmsg **nmsg)
247 *nmsg = nmsg__nmsg__unpack(NULL, u_len, u_buf);
252 *nmsg = nmsg__nmsg__unpack(NULL, buf_len, buf);
261 _input_nmsg_read_container_file(nmsg_input_t input, Nmsg__Nmsg **nmsg) {
263 ssize_t bytes_avail, msgsize = 0;
265 assert(input->stream->type == nmsg_stream_type_file);
268 res = read_file(input, &msgsize);
273 bytes_avail = _nmsg_buf_avail(input->stream->buf);
274 if (bytes_avail < msgsize) {
275 ssize_t bytes_to_read = msgsize - bytes_avail;
277 res = do_read_file(input, bytes_to_read, bytes_to_read);
283 res = _input_nmsg_unpack_container(input, nmsg, input->stream->buf->pos, msgsize);
284 input->stream->buf->pos += msgsize;
290 _input_nmsg_read_container_sock(nmsg_input_t input, Nmsg__Nmsg **nmsg) {
293 struct nmsg_buf *buf = input->stream->buf;
295 assert(input->stream->type == nmsg_stream_type_sock);
298 _nmsg_buf_reset(buf);
299 res = do_read_sock(input, buf->bufsz);
307 if (_nmsg_buf_avail(buf) < NMSG_HDRLSZ_V2)
311 res = _input_nmsg_deserialize_header(buf->pos,
312 _nmsg_buf_avail(buf),
314 &input->stream->flags);
321 if (_nmsg_buf_avail(buf) != msgsize)
325 res = _input_nmsg_unpack_container(input, nmsg, buf->pos, msgsize);
326 input->stream->buf->pos += msgsize;
330 input->stream->count_recv += 1;
332 if (input->stream->verify_seqsrc) {
335 seqsrc = _input_seqsrc_get(input, *nmsg);
336 if (seqsrc != NULL) {
338 drop = _input_seqsrc_update(input, seqsrc, *nmsg);
339 input->stream->count_drop += drop;
345 _input_frag_gc(input->stream);
352 _input_nmsg_read_container_xs(nmsg_input_t input, Nmsg__Nmsg **nmsg) {
359 xs_pollitem_t xitems[1];
362 xitems[0].socket = input->stream->xs;
363 xitems[0].events = XS_POLLIN;
365 if (ret == 0 || (ret == -1 && errno == EINTR))
371 if (xs_msg_init(&xmsg))
375 if (xs_recvmsg(input->stream->xs, &xmsg, 0) == -1) {
382 buf = xs_msg_data(&xmsg);
383 buf_len = xs_msg_size(&xmsg);
384 if (buf_len < NMSG_HDRLSZ_V2) {
390 res = _input_nmsg_deserialize_header(buf, buf_len, &msgsize, &input->stream->flags);
396 assert((
size_t) msgsize == buf_len - NMSG_HDRLSZ_V2);
399 res = _input_nmsg_unpack_container(input, nmsg, buf, msgsize);
402 if (input->stream->verify_seqsrc && *nmsg != NULL) {
403 struct nmsg_seqsrc *seqsrc = _input_seqsrc_get(input, *nmsg);
405 _input_seqsrc_update(input, seqsrc, *nmsg);
409 _input_frag_gc(input->stream);
418 _input_nmsg_deserialize_header(
const uint8_t *buf,
size_t buf_len,
419 ssize_t *msgsize,
unsigned *flags)
428 if (memcmp(buf, magic,
sizeof(magic)) != 0)
430 buf +=
sizeof(magic);
433 load_net16(buf, &version);
434 if ((version & 0xFF) != 2U)
436 *flags = version >> 8;
437 buf +=
sizeof(version);
440 load_net32(buf, msgsize);
449 read_file(nmsg_input_t input, ssize_t *msgsize) {
452 bool reset_buf =
false;
453 ssize_t bytes_avail, bytes_needed, lenhdrsz;
456 struct nmsg_buf *buf = input->stream->buf;
459 bytes_avail = _nmsg_buf_avail(buf);
461 assert(bytes_avail >= 0);
463 if (bytes_avail == 0) {
464 _nmsg_buf_reset(buf);
465 res = do_read_file(input, bytes_needed, buf->bufsz);
468 res = do_read_file(input, bytes_needed, bytes_needed);
474 bytes_avail = _nmsg_buf_avail(buf);
478 if (memcmp(buf->pos, magic,
sizeof(magic)) != 0)
480 buf->pos +=
sizeof(magic);
483 load_net16(buf->pos, &version);
487 }
else if ((version & 0xFF) == 2U) {
488 input->stream->flags = version >> 8;
493 goto read_header_out;
500 if (reset_buf ==
true) {
501 _nmsg_buf_reset(buf);
506 bytes_avail = _nmsg_buf_avail(buf);
507 if (bytes_avail < lenhdrsz) {
508 if (bytes_avail == 0)
509 _nmsg_buf_reset(buf);
510 bytes_needed = lenhdrsz - bytes_avail;
511 if (bytes_avail == 0) {
512 res = do_read_file(input, bytes_needed, buf->bufsz);
515 res = do_read_file(input, bytes_needed, bytes_needed);
519 bytes_avail = _nmsg_buf_avail(buf);
520 assert(bytes_avail >= lenhdrsz);
524 load_net16(buf->pos, msgsize);
526 }
else if (version == 2U) {
527 load_net32(buf->pos, msgsize);
534 if (reset_buf ==
true)
535 _nmsg_buf_reset(buf);
541 do_read_file(nmsg_input_t input, ssize_t bytes_needed, ssize_t bytes_max) {
543 struct nmsg_buf *buf = input->stream->buf;
546 assert(bytes_needed <= bytes_max);
549 assert((buf->end + bytes_max) <= (buf->data +
NMSG_RBUFSZ));
551 while (bytes_needed > 0) {
552 bytes_read = read(buf->fd, buf->end, bytes_max);
557 buf->end += bytes_read;
558 bytes_needed -= bytes_read;
559 bytes_max -= bytes_read;
566 do_read_sock(nmsg_input_t input, ssize_t bytes_max) {
569 struct nmsg_buf *buf = input->stream->buf;
570 socklen_t addr_len =
sizeof(
struct sockaddr_storage);
573 assert((buf->end + bytes_max) <= (buf->data +
NMSG_RBUFSZ));
575 if (input->stream->blocking_io ==
true) {
578 if (ret == 0 || (ret == -1 && errno == EINTR))
585 bytes_read = recvfrom(buf->fd, buf->pos, bytes_max, 0,
586 (
struct sockaddr *) &input->stream->addr_ss, &addr_len);
589 if (bytes_read < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
595 buf->end = buf->pos + bytes_read;
void nmsg_zbuf_destroy(nmsg_zbuf_t *zb)
Destroy all resources associated with an nmsg_zbuf_t object.
void nmsg_timespec_get(struct timespec *ts)
Get the current time.
void(* nmsg_cb_message)(nmsg_message_t msg, void *user)
Callback function for processing nmsg messages.
#define NMSG_RBUF_TIMEOUT
Number of milliseconds to wait for data on an nmsg socket before returning nmsg_res_again.
nmsg header magic incorrect
#define NMSG_MAGIC
Four-octet magic sequence seen at the beginning of a serialized NMSG.
#define NMSG_RBUFSZ
Number of octets than an nmsg rbuf must hold.
#define NMSG_HDRLSZ_V2
Number of octets in an NMSG header (magic + version + length).
nmsg_zbuf_t nmsg_zbuf_inflate_init(void)
Initialize an nmsg_zbuf_t object for inflation.
#define NMSG_LENHDRSZ_V2
Number of octets in the NMSG v2 header length field.
#define NMSG_FLAG_ZLIB
NMSG container is zlib compressed.
#define NMSG_HDRSZ
Number of octets in an NMSG header (magic + version).
#define NMSG_LENHDRSZ_V1
Number of octets in the NMSG v1 header length field.
nmsg header version incorrect
#define NMSG_FLAG_FRAGMENT
NMSG container is fragmented.
nmsg_res nmsg_zbuf_inflate(nmsg_zbuf_t zb, size_t z_len, u_char *z_buf, size_t *u_len, u_char **u_buf)
Inflate a buffer.