nmsg  0.9.0
io.c
1 /*
2  * Copyright (c) 2008-2013 by Farsight Security, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /* Import. */
18 
19 #include "private.h"
20 
21 /* Private declarations. */
22 
23 struct nmsg_io;
24 struct nmsg_io_input;
25 struct nmsg_io_output;
26 struct nmsg_io_thr;
27 
28 struct nmsg_io_input {
29  ISC_LINK(struct nmsg_io_input) link;
30  nmsg_input_t input;
31  pthread_mutex_t lock;
32  void *user;
33  uint64_t count_nmsg_payload_in;
34 };
35 
36 struct nmsg_io_output {
37  ISC_LINK(struct nmsg_io_output) link;
38  nmsg_output_t output;
39  pthread_mutex_t lock;
40  struct timespec last;
41  void *user;
42  uint64_t count_nmsg_payload_out;
43 };
44 
45 struct nmsg_io {
46  ISC_LIST(struct nmsg_io_input) io_inputs;
47  ISC_LIST(struct nmsg_io_output) io_outputs;
48  ISC_LIST(struct nmsg_io_thr) threads;
49  int debug;
50  nmsg_io_close_fp close_fp;
51  nmsg_io_output_mode output_mode;
52  pthread_mutex_t lock;
53  uint64_t count_nmsg_payload_out;
54  unsigned count, interval;
55  volatile bool stop, stopped;
56  nmsg_io_user_fp atstart_fp;
57  nmsg_io_user_fp atexit_fp;
58  void *atstart_user;
59  void *atexit_user;
60  unsigned n_inputs;
61  unsigned n_outputs;
62 };
63 
64 struct nmsg_io_thr {
65  ISC_LINK(struct nmsg_io_thr) link;
66  pthread_t thr;
67  int threadno;
68  nmsg_io_t io;
69  nmsg_res res;
70  struct timespec now;
71  struct nmsg_io_input *io_input;
72 };
73 
74 /* Forward. */
75 
76 static void
77 init_timespec_intervals(nmsg_io_t);
78 
79 static nmsg_res
80 check_close_event(struct nmsg_io_thr *, struct nmsg_io_output *);
81 
82 static void *
83 io_thr_input(void *);
84 
85 static nmsg_res
86 io_write(struct nmsg_io_thr *, struct nmsg_io_output *, nmsg_message_t);
87 
88 static nmsg_res
89 io_write_mirrored(struct nmsg_io_thr *, nmsg_message_t);
90 
91 /* Export. */
92 
93 nmsg_io_t
94 nmsg_io_init(void) {
95  struct nmsg_io *io;
96 
97  io = calloc(1, sizeof(*io));
98  if (io == NULL)
99  return (NULL);
100  io->output_mode = nmsg_io_output_mode_stripe;
101  pthread_mutex_init(&io->lock, NULL);
102  ISC_LIST_INIT(io->threads);
103 
104  return (io);
105 }
106 
107 void
108 nmsg_io_breakloop(nmsg_io_t io) {
109  struct nmsg_io_output *io_output;
110 
111  if (io == NULL)
112  return;
113  if (io->stop)
114  return;
115 
116  io->stop = true;
117  for (io_output = ISC_LIST_HEAD(io->io_outputs);
118  io_output != NULL;
119  io_output = ISC_LIST_NEXT(io_output, link))
120  {
121  if (io_output->output != NULL)
122  _output_stop(io_output->output);
123  }
124 }
125 
126 nmsg_res
127 nmsg_io_loop(nmsg_io_t io) {
128  nmsg_res res;
129  struct nmsg_io_input *io_input;
130  struct nmsg_io_thr *iothr, *iothr_next;
131  int threadno;
132 
133  res = nmsg_res_success;
134 
135  if (io->interval > 0)
136  init_timespec_intervals(io);
137 
138  threadno = 0;
139  /* create io_input threads */
140  for (io_input = ISC_LIST_HEAD(io->io_inputs);
141  io_input != NULL;
142  io_input = ISC_LIST_NEXT(io_input, link))
143  {
144  iothr = calloc(1, sizeof(*iothr));
145  assert(iothr != NULL);
146  iothr->io = io;
147  iothr->io_input = io_input;
148  iothr->threadno = threadno;
149  ISC_LINK_INIT(iothr, link);
150  ISC_LIST_APPEND(io->threads, iothr, link);
151  assert(pthread_create(&iothr->thr, NULL, io_thr_input, iothr)
152  == 0);
153  threadno += 1;
154  }
155 
156  /* wait for io_input threads */
157  iothr = ISC_LIST_HEAD(io->threads);
158  while (iothr != NULL) {
159  iothr_next = ISC_LIST_NEXT(iothr, link);
160  assert(pthread_join(iothr->thr, NULL) == 0);
161  if (iothr->res != nmsg_res_success &&
162  iothr->res != nmsg_res_eof &&
163  iothr->res != nmsg_res_stop)
164  {
165  _nmsg_dprintfv(io->debug, 2, "nmsg_io: iothr=%p %s\n",
166  iothr, nmsg_res_lookup(iothr->res));
167  res = nmsg_res_failure;
168  }
169  free(iothr);
170  iothr = iothr_next;
171  }
172 
173  io->stopped = true;
174 
175  return (res);
176 }
177 
178 void
179 nmsg_io_destroy(nmsg_io_t *io) {
180  struct nmsg_io_input *io_input, *io_input_next;
181  struct nmsg_io_output *io_output, *io_output_next;
182 
183  /* close io_inputs */
184  io_input = ISC_LIST_HEAD((*io)->io_inputs);
185  while (io_input != NULL) {
186  io_input_next = ISC_LIST_NEXT(io_input, link);
187  if (io_input->input != NULL && (*io)->close_fp != NULL) {
188  struct nmsg_io_close_event ce;
189 
190  ce.io = *io;
192  ce.input = &io_input->input;
193  ce.input_type = io_input->input->type;
195  ce.user = io_input->user;
196 
197  (*io)->close_fp(&ce);
198  }
199  if (io_input->input != NULL) {
200  nmsg_input_close(&io_input->input);
201  }
202  free(io_input);
203  io_input = io_input_next;
204  }
205 
206  /* close io_outputs */
207  io_output = ISC_LIST_HEAD((*io)->io_outputs);
208  while (io_output != NULL) {
209  io_output_next = ISC_LIST_NEXT(io_output, link);
210  if (io_output->output != NULL && (*io)->close_fp != NULL) {
211  struct nmsg_io_close_event ce;
212 
213  ce.io = *io;
215  ce.output = &io_output->output;
216  ce.output_type = io_output->output->type;
218  ce.user = io_output->user;
219 
220  (*io)->close_fp(&ce);
221  }
222  if (io_output->output != NULL) {
223  nmsg_output_close(&io_output->output);
224  }
225  free(io_output);
226  io_output = io_output_next;
227  }
228 
229  /* print statistics */
230  if ((*io)->debug >= 2 && (*io)->count_nmsg_payload_out > 0)
231  _nmsg_dprintfv((*io)->debug, 2, "nmsg_io: io=%p"
232  " count_nmsg_payload_out=%" PRIu64 "\n",
233  (*io),
234  (*io)->count_nmsg_payload_out);
235  free(*io);
236  *io = NULL;
237 }
238 
239 nmsg_res
240 nmsg_io_add_input(nmsg_io_t io, nmsg_input_t input, void *user) {
241  struct nmsg_io_input *io_input;
242 
243  /* allocate */
244  io_input = calloc(1, sizeof(*io_input));
245  if (io_input == NULL)
246  return (nmsg_res_memfail);
247 
248  /* initialize */
249  io_input->input = input;
250  io_input->user = user;
251  pthread_mutex_init(&io_input->lock, NULL);
252 
253  /* add to nmsg_io input list */
254  pthread_mutex_lock(&io->lock);
255  ISC_LIST_APPEND(io->io_inputs, io_input, link);
256  pthread_mutex_unlock(&io->lock);
257 
258  /* increment input counter */
259  io->n_inputs += 1;
260 
261  return (nmsg_res_success);
262 }
263 
264 nmsg_res
265 nmsg_io_add_output(nmsg_io_t io, nmsg_output_t output, void *user) {
266  struct nmsg_io_output *io_output;
267 
268  /* allocate */
269  io_output = calloc(1, sizeof(*io_output));
270  if (io_output == NULL)
271  return (nmsg_res_memfail);
272 
273  /* initialize */
274  io_output->output = output;
275  io_output->user = user;
276  pthread_mutex_init(&io_output->lock, NULL);
277 
278  /* add to nmsg_io output list */
279  pthread_mutex_lock(&io->lock);
280  ISC_LIST_APPEND(io->io_outputs, io_output, link);
281  pthread_mutex_unlock(&io->lock);
282 
283  /* increment output counter */
284  io->n_outputs += 1;
285 
286  return (nmsg_res_success);
287 }
288 
289 static nmsg_res
290 _nmsg_io_add_input_socket(nmsg_io_t io, int af, char *addr, unsigned port, void *user) {
291  nmsg_input_t input;
292  struct sockaddr *sa;
293  socklen_t salen;
294  struct sockaddr_in sai;
295  struct sockaddr_in6 sai6;
296  int fd;
297  int on = 1;
298  nmsg_res res;
299 
300  if (port > 65535)
301  return (nmsg_res_failure);
302 
303  res = nmsg_sock_parse(af, addr, port, &sai, &sai6, &sa, &salen);
304  if (res != nmsg_res_success)
305  return (res);
306 
307  fd = socket(af, SOCK_DGRAM, 0);
308  if (fd < 0) {
309  _nmsg_dprintfv(io->debug, 2, "nmsg_io: socket() failed: %s\n", strerror(errno));
310  return (nmsg_res_failure);
311  }
312 
313  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
314  _nmsg_dprintfv(io->debug, 2, "nmsg_io: setsockopt(SO_REUSEADDR) failed: %s\n",
315  strerror(errno));
316  return (nmsg_res_failure);
317  }
318 
319 #ifdef __linux__
320 # ifdef SO_RCVBUFFORCE
321  if (geteuid() == 0) {
322  int rcvbuf = 16777216;
323  if (setsockopt(fd, SOL_SOCKET, SO_RCVBUFFORCE, &rcvbuf, sizeof(rcvbuf)) < 0) {
324  _nmsg_dprintfv(io->debug, 2,
325  "nmsg_io: setsockopt(SO_RCVBUFFORCE) failed: %s\n",
326  strerror(errno));
327  }
328  }
329 # endif
330 #endif
331 
332  if (bind(fd, sa, salen) < 0) {
333  _nmsg_dprintfv(io->debug, 2,
334  "nmsg_io: bind() failed: %s\n", strerror(errno));
335  return (nmsg_res_failure);
336  }
337 
338  input = nmsg_input_open_sock(fd);
339  if (input == NULL) {
340  _nmsg_dprintfv(io->debug, 2, "nmsg_io: nmsg_input_open_sock() failed\n");
341  return (nmsg_res_failure);
342  }
343 
344  return (nmsg_io_add_input(io, input, user));
345 }
346 
347 nmsg_res
348 nmsg_io_add_input_channel(nmsg_io_t io, const char *chan, void *user) {
349  char **alias = NULL;
350  int num_aliases;
351  nmsg_res res;
352 
353  num_aliases = nmsg_chalias_lookup(chan, &alias);
354  if (num_aliases <= 0) {
355  _nmsg_dprintfv(io->debug, 2, "nmsg_io: channel alias lookup '%s' failed\n", chan);
356  res = nmsg_res_failure;
357  goto out;
358  }
359  for (int i = 0; i < num_aliases; i++) {
360  int af;
361  char *addr;
362  unsigned port_start;
363  unsigned port_end;
364 
365  res = nmsg_sock_parse_sockspec(alias[i], &af, &addr, &port_start, &port_end);
366  if (res != nmsg_res_success)
367  goto out;
368 
369  for (unsigned port = port_start; port <= port_end; port++) {
370  res = _nmsg_io_add_input_socket(io, af, addr, port, user);
371  if (res != nmsg_res_success) {
372  free(addr);
373  goto out;
374  }
375  }
376  free(addr);
377  }
378 
379  res = nmsg_res_success;
380 out:
381  nmsg_chalias_free(&alias);
382  return (res);
383 }
384 
385 #ifdef HAVE_LIBXS
386 static nmsg_res
387 _nmsg_io_add_input_xs(nmsg_io_t io, void *xs_ctx, const char *str_socket, void *user) {
388  nmsg_input_t input;
389 
390  input = nmsg_input_open_xs_endpoint(xs_ctx, str_socket);
391  if (input == NULL) {
392  _nmsg_dprintfv(io->debug, 2, "nmsg_io: nmsg_input_open_sock() failed\n");
393  return (nmsg_res_failure);
394  }
395  return (nmsg_io_add_input(io, input, user));
396 }
397 #endif /* HAVE_LIBXS */
398 
399 #ifdef HAVE_LIBXS
400 nmsg_res
401 nmsg_io_add_input_xs_channel(nmsg_io_t io, void *xs_ctx, const char *chan, void *user) {
402  char **alias = NULL;
403  int num_aliases;
404  nmsg_res res;
405 
406  num_aliases = nmsg_chalias_lookup(chan, &alias);
407  if (num_aliases <= 0) {
408  _nmsg_dprintfv(io->debug, 2,
409  "nmsg_io: XS channel alias lookup '%s' failed\n", chan);
410  res = nmsg_res_failure;
411  goto out;
412  }
413  for (int i = 0; i < num_aliases; i++) {
414  res = _nmsg_io_add_input_xs(io, xs_ctx, alias[i], user);
415  if (res != nmsg_res_success)
416  goto out;
417  }
418 
419  res = nmsg_res_success;
420 out:
421  nmsg_chalias_free(&alias);
422  return (res);
423 }
424 #else /* HAVE_LIBXS */
425 nmsg_res
426 nmsg_io_add_input_xs_channel(nmsg_io_t io,
427  void *xs_ctx __attribute__((unused)),
428  const char *chan __attribute__((unused)),
429  void *user __attribute__((unused)))
430 {
431  _nmsg_dprintfv(io->debug, 2, "nmsg_io: %s: compiled without libxs support\n", __func__);
432  return (nmsg_res_failure);
433 }
434 #endif /* HAVE_LIBXS */
435 
436 nmsg_res
437 nmsg_io_add_input_sockspec(nmsg_io_t io, const char *sockspec, void *user) {
438  int af;
439  char *addr;
440  unsigned port_start;
441  unsigned port_end;
442  nmsg_res res;
443 
444  res = nmsg_sock_parse_sockspec(sockspec, &af, &addr, &port_start, &port_end);
445  if (res != nmsg_res_success)
446  return (res);
447 
448  for (unsigned port = port_start; port <= port_end; port++) {
449  res = _nmsg_io_add_input_socket(io, af, addr, port, user);
450  if (res != nmsg_res_success) {
451  free(addr);
452  return (res);
453  }
454  }
455  free(addr);
456 
457  return (nmsg_res_success);
458 }
459 
460 nmsg_res
461 nmsg_io_add_input_fname(nmsg_io_t io, const char *fname, void *user) {
462  int fd;
463  nmsg_input_t input;
464 
465  fd = open(fname, O_RDONLY);
466  if (fd == -1) {
467  _nmsg_dprintfv(io->debug, 2, "nmsg_io: open() failed: %s\n", strerror(errno));
468  return (nmsg_res_failure);
469  }
470 
471  input = nmsg_input_open_file(fd);
472  if (input == NULL) {
473  close(fd);
474  _nmsg_dprintfv(io->debug, 2, "nmsg_io: nmsg_input_open_file() failed\n");
475  return (nmsg_res_failure);
476  }
477 
478  return (nmsg_io_add_input(io, input, user));
479 }
480 
481 unsigned
482 nmsg_io_get_num_inputs(nmsg_io_t io) {
483  return (io->n_inputs);
484 }
485 
486 unsigned
487 nmsg_io_get_num_outputs(nmsg_io_t io) {
488  return (io->n_outputs);
489 }
490 
491 void
492 nmsg_io_set_close_fp(nmsg_io_t io, nmsg_io_close_fp close_fp) {
493  io->close_fp = close_fp;
494 }
495 
496 void
497 nmsg_io_set_atstart_fp(nmsg_io_t io, nmsg_io_user_fp user_fp, void *user) {
498  io->atstart_fp = user_fp;
499  io->atstart_user = user;
500 }
501 
502 void
503 nmsg_io_set_atexit_fp(nmsg_io_t io, nmsg_io_user_fp user_fp, void *user) {
504  io->atexit_fp = user_fp;
505  io->atexit_user = user;
506 }
507 
508 void
509 nmsg_io_set_count(nmsg_io_t io, unsigned count) {
510  io->count = count;
511 }
512 
513 void
514 nmsg_io_set_debug(nmsg_io_t io, int debug) {
515  io->debug = debug;
516 }
517 
518 void
519 nmsg_io_set_interval(nmsg_io_t io, unsigned interval) {
520  io->interval = interval;
521 }
522 
523 void
524 nmsg_io_set_output_mode(nmsg_io_t io, nmsg_io_output_mode output_mode) {
525  switch (output_mode) {
528  io->output_mode = output_mode;
529  }
530 }
531 
532 /* Private functions. */
533 
534 static void
535 init_timespec_intervals(nmsg_io_t io) {
536  struct nmsg_io_output *io_output;
537  struct timespec now;
538 
539  nmsg_timespec_get(&now);
540  now.tv_nsec = 0;
541  now.tv_sec = now.tv_sec - (now.tv_sec % io->interval);
542 
543  for (io_output = ISC_LIST_HEAD(io->io_outputs);
544  io_output != NULL;
545  io_output = ISC_LIST_NEXT(io_output, link))
546  {
547  io_output->last = now;
548  }
549 }
550 
551 static nmsg_res
552 io_write(struct nmsg_io_thr *iothr, struct nmsg_io_output *io_output,
553  nmsg_message_t msg)
554 {
555  nmsg_io_t io = iothr->io;
556  nmsg_res res;
557 
558  if (io->close_fp == NULL) {
559  res = nmsg_output_write(io_output->output, msg);
560  if (io_output->output->type != nmsg_output_type_callback)
561  nmsg_message_destroy(&msg);
562  } else {
563  pthread_mutex_lock(&io_output->lock);
564  if (io_output->output == NULL) {
565  pthread_mutex_unlock(&io_output->lock);
566  return (nmsg_res_stop);
567  }
568  res = nmsg_output_write(io_output->output, msg);
569  if (io_output->output->type != nmsg_output_type_callback) {
570  pthread_mutex_unlock(&io_output->lock);
571  nmsg_message_destroy(&msg);
572  } else {
573  pthread_mutex_unlock(&io_output->lock);
574  }
575  }
576 
577  if (res != nmsg_res_success)
578  return (res);
579 
580  io_output->count_nmsg_payload_out += 1;
581 
582  pthread_mutex_lock(&io->lock);
583  io->count_nmsg_payload_out += 1;
584  pthread_mutex_unlock(&io->lock);
585 
586  return (res);
587 }
588 
589 static nmsg_res
590 check_close_event(struct nmsg_io_thr *iothr, struct nmsg_io_output *io_output) {
591  struct nmsg_io_close_event ce;
592  nmsg_io_t io = iothr->io;
594 
595  if (io->close_fp != NULL)
596  pthread_mutex_lock(&io_output->lock);
597 
598  if (io_output->output == NULL) {
599  res = nmsg_res_stop;
600  goto out;
601  }
602 
603  /* count check */
604  if (io->count > 0 &&
605  io_output->count_nmsg_payload_out > 0 &&
606  io_output->count_nmsg_payload_out % io->count == 0)
607  {
608  if (io->close_fp != NULL) {
609  /* close notification is enabled */
610  ce.io = io;
611  ce.user = io_output->user;
612  ce.output = &io_output->output;
613 
614  ce.io_type = nmsg_io_io_type_output;
615  ce.close_type = nmsg_io_close_type_count;
616  ce.output_type = io_output->output->type;
617 
618  io->close_fp(&ce);
619  if (io_output->output == NULL) {
620  io->stop = true;
621  res = nmsg_res_stop;
622  goto out;
623  }
624  } else {
625  io->stop = true;
626  res = nmsg_res_stop;
627  goto out;
628  }
629  }
630 
631  /* interval check */
632  if (io->interval > 0 &&
633  iothr->now.tv_sec - io_output->last.tv_sec >= (time_t) io->interval)
634  {
635  if (io->close_fp != NULL) {
636  /* close notification is enabled */
637  struct timespec now = iothr->now;
638  now.tv_nsec = 0;
639  now.tv_sec = now.tv_sec - (now.tv_sec % io->interval);
640  io_output->last = now;
641 
642  ce.io = io;
643  ce.user = io_output->user;
644  ce.output = &io_output->output;
645 
646  ce.io_type = nmsg_io_io_type_output;
647  ce.close_type = nmsg_io_close_type_interval;
648  ce.output_type = io_output->output->type;
649 
650  io->close_fp(&ce);
651  if (io_output->output == NULL) {
652  io->stop = true;
653  res = nmsg_res_stop;
654  goto out;
655  }
656  } else {
657  io->stop = true;
658  res = nmsg_res_stop;
659  goto out;
660  }
661  }
662 
663 out:
664  if (io->close_fp != NULL)
665  pthread_mutex_unlock(&io_output->lock);
666  return (res);
667 }
668 
669 static nmsg_res
670 io_write_mirrored(struct nmsg_io_thr *iothr, nmsg_message_t msg) {
671  nmsg_message_t msgdup;
672  nmsg_res res;
673  struct nmsg_io_output *io_output;
674 
675  res = nmsg_res_success;
676  for (io_output = ISC_LIST_HEAD(iothr->io->io_outputs);
677  io_output != NULL;
678  io_output = ISC_LIST_NEXT(io_output, link))
679  {
680  msgdup = _nmsg_message_dup(msg);
681 
682  res = io_write(iothr, io_output, msgdup);
683  if (res != nmsg_res_success)
684  break;
685  }
686  nmsg_message_destroy(&msg);
687 
688  return (res);
689 }
690 
691 static void *
692 io_thr_input(void *user) {
693  nmsg_message_t msg;
694  nmsg_res res;
695  struct nmsg_io *io;
696  struct nmsg_io_input *io_input;
697  struct nmsg_io_output *io_output;
698  struct nmsg_io_thr *iothr;
699 
700  msg = NULL;
701  iothr = (struct nmsg_io_thr *) user;
702  io = iothr->io;
703  io_input = iothr->io_input;
704  io_output = ISC_LIST_HEAD(io->io_outputs);
705 
706  _nmsg_dprintfv(io->debug, 4, "nmsg_io: started input thread @ %p\n", iothr);
707 
708  /* sanity checks */
709  if (io_output == NULL) {
710  _nmsg_dprintfv(io->debug, 1, "nmsg_io: no outputs\n");
711  iothr->res = nmsg_res_failure;
712  return (NULL);
713  }
714 
715  /* call user function */
716  if (io->atstart_fp != NULL)
717  io->atstart_fp(iothr->threadno, io->atstart_user);
718 
719  /* loop over input */
720  for (;;) {
721  nmsg_timespec_get(&iothr->now);
722  res = nmsg_input_read(io_input->input, &msg);
723 
724  if (io->stop == true) {
725  if (res == nmsg_res_success && msg != NULL)
726  nmsg_message_destroy(&msg);
727  break;
728  }
729  if (res == nmsg_res_again) {
730  res = check_close_event(iothr, io_output);
731  if (io->stop == true)
732  break;
733  continue;
734  }
735  if (res != nmsg_res_success) {
736  iothr->res = res;
737  break;
738  }
739 
740  assert(msg != NULL);
741 
742  io_input->count_nmsg_payload_in += 1;
743 
744  if (io->output_mode == nmsg_io_output_mode_stripe)
745  res = io_write(iothr, io_output, msg);
746  else if (io->output_mode == nmsg_io_output_mode_mirror)
747  res = io_write_mirrored(iothr, msg);
748 
749  if (res != nmsg_res_success) {
750  iothr->res = res;
751  break;
752  }
753 
754  res = check_close_event(iothr, io_output);
755  if (io->stop == true)
756  break;
757 
758  io_output = ISC_LIST_NEXT(io_output, link);
759  if (io_output == NULL)
760  io_output = ISC_LIST_HEAD(io->io_outputs);
761 
762  if (io->stop == true)
763  break;
764  }
765 
766  /* call user function */
767  if (io->atexit_fp != NULL)
768  io->atexit_fp(iothr->threadno, io->atexit_user);
769 
770  _nmsg_dprintfv(io->debug, 2,
771  "nmsg_io: iothr=%p count_nmsg_payload_in=%" PRIu64 "\n",
772  iothr, io_input->count_nmsg_payload_in);
773  return (NULL);
774 }
void nmsg_io_breakloop(nmsg_io_t io)
Force a currently executing nmsg_io_loop() to stop looping and return.
Definition: io.c:108
nmsg_input_t nmsg_input_open_xs_endpoint(void *xs_ctx, const char *ep)
Create an XS socket and initialize a new NMSG stream input from it.
void nmsg_timespec_get(struct timespec *ts)
Get the current time.
Definition: timespec.c:24
nmsg_res nmsg_sock_parse(int af, const char *addr, unsigned port, struct sockaddr_in *sai, struct sockaddr_in6 *sai6, struct sockaddr **sa, socklen_t *salen)
Parse an IP address and port number into a sockaddr.
Definition: sock.c:26
nmsg_io_t io
this nmsg_io loop
Definition: io.h:114
nmsg_res
nmsg result code
Definition: res.h:25
unsigned nmsg_io_get_num_outputs(nmsg_io_t io)
Get the number of outputs bound to the nmsg_io_t object.
Definition: io.c:487
void(* nmsg_io_close_fp)(struct nmsg_io_close_event *ce)
Function for handling close event notifications.
Definition: io.h:125
void nmsg_io_set_atexit_fp(nmsg_io_t io, nmsg_io_user_fp user_fp, void *user)
Set a user-specified function to be called in each thread before the thread exits.
Definition: io.c:503
success
Definition: res.h:26
nmsg_res nmsg_io_add_input(nmsg_io_t io, nmsg_input_t input, void *user)
Add an nmsg input to an nmsg_io_t object.
Definition: io.c:240
payload count reached
Definition: io.h:50
close event output
Definition: io.h:59
void nmsg_io_set_atstart_fp(nmsg_io_t io, nmsg_io_user_fp user_fp, void *user)
Set a user-specified function to be called in each thread after the thread starts.
Definition: io.c:497
out of memory
Definition: res.h:29
nmsg_input_type input_type
type of 'input' field
Definition: io.h:111
caller should try again
Definition: res.h:35
void nmsg_chalias_free(char ***alias)
Free the memory allocated by nmsg_chalias_lookup().
Definition: chalias.c:71
nmsg_input_t * input
pointer to input stream
Definition: io.h:107
nmsg_res nmsg_input_read(nmsg_input_t input, nmsg_message_t *msg)
Read one NMSG message from an input stream.
Definition: input.c:211
nmsg_res nmsg_output_write(nmsg_output_t output, nmsg_message_t msg)
Write an nmsg message to an nmsg_output_t object.
Definition: output.c:115
nmsg_io_output_mode
Output behavior when multiple outputs are present.
Definition: io.h:65
mirror payloads across output
Definition: io.h:67
nmsg_res nmsg_io_add_input_fname(nmsg_io_t io, const char *fname, void *user)
Add an NMSG file to an nmsg_io_t object.
Definition: io.c:461
close event input
Definition: io.h:58
nmsg_res nmsg_io_add_input_xs_channel(nmsg_io_t io, void *xs_ctx, const char *chan, void *user)
Add an nmsg XS input channel to an nmsg_io_t object.
void nmsg_io_set_interval(nmsg_io_t io, unsigned interval)
Configure the nmsg_io_t object to close inputs after processing for a set amount of time...
Definition: io.c:519
void nmsg_io_set_close_fp(nmsg_io_t io, nmsg_io_close_fp close_fp)
Set the close event notification function associated with an nmsg_io_t object.
Definition: io.c:492
nmsg_res nmsg_sock_parse_sockspec(const char *sockspec, int *af, char **addr, unsigned *port_start, unsigned *port_end)
Parse a "socket spec" string.
Definition: sock.c:68
void nmsg_message_destroy(nmsg_message_t *msg)
Destroy a message object and deallocate any resources associated with it.
void nmsg_io_set_debug(nmsg_io_t io, int debug)
Set the debug level for an nmsg_io_t object.
Definition: io.c:514
nmsg_io_io_type io_type
whether 'input' or 'output'
Definition: io.h:115
processing should stop
Definition: res.h:34
Structure for passing information about a close event between the nmsg_io processing loop and the original caller.
Definition: io.h:105
int nmsg_chalias_lookup(const char *ch, char ***alias)
Lookup a channel alias.
Definition: chalias.c:31
nmsg_res nmsg_io_add_input_channel(nmsg_io_t io, const char *chan, void *user)
Add an nmsg input channel to an nmsg_io_t object.
Definition: io.c:348
const char * nmsg_res_lookup(enum nmsg_res val)
Look up a result code by value.
Definition: res.c:41
void nmsg_io_set_count(nmsg_io_t io, unsigned count)
Configure the nmsg_io_t object to close inputs after processing a certain non-zero number of payloads...
Definition: io.c:509
void * user
caller-provided user pointer
Definition: io.h:117
nmsg_res nmsg_io_add_input_sockspec(nmsg_io_t io, const char *sockspec, void *user)
Add an nmsg input sockspec to an nmsg_io_t object.
Definition: io.c:437
nmsg_res nmsg_io_add_output(nmsg_io_t io, nmsg_output_t output, void *user)
Add an nmsg output to an nmsg_io_t object.
Definition: io.c:265
nmsg_io_t nmsg_io_init(void)
Initialize a new nmsg_io_t object.
Definition: io.c:94
generic failure
Definition: res.h:27
nmsg_input_t nmsg_input_open_sock(int fd)
Initialize a new NMSG stream input from a datagram socket source.
Definition: input.c:36
end of file
Definition: io.h:49
nmsg_io_close_type close_type
why the stream was closed
Definition: io.h:116
void(* nmsg_io_user_fp)(unsigned threadno, void *user)
Optional user-specified function to be run at thread start or thread stop.
Definition: io.h:130
stripe payloads across output
Definition: io.h:66
void nmsg_io_set_output_mode(nmsg_io_t io, nmsg_io_output_mode output_mode)
Set the output mode behavior for an nmsg_io_t object.
Definition: io.c:524
nmsg_output_t * output
pointer to output stream
Definition: io.h:108
nmsg_output_type output_type
type of 'output' field
Definition: io.h:112
void nmsg_io_destroy(nmsg_io_t *io)
Deallocate the resources associated with an nmsg_io_t object.
Definition: io.c:179
nmsg_res nmsg_io_loop(nmsg_io_t io)
Begin processing the data specified by the configured inputs and outputs.
Definition: io.c:127
nmsg_res nmsg_input_close(nmsg_input_t *input)
Close an nmsg_input_t object and release all associated resources.
Definition: input.c:172
nmsg_input_t nmsg_input_open_file(int fd)
Initialize a new NMSG stream input from a byte-stream file source.
Definition: input.c:31
nmsg_res nmsg_output_close(nmsg_output_t *output)
Close an nmsg_output_t object.
Definition: output.c:134
unsigned nmsg_io_get_num_inputs(nmsg_io_t io)
Get the number of inputs bound to the nmsg_io_t object.
Definition: io.c:482
interval elapsed
Definition: io.h:51
end of file
Definition: res.h:28