25 struct nmsg_io_output;
28 struct nmsg_io_input {
29 ISC_LINK(
struct nmsg_io_input) link;
33 uint64_t count_nmsg_payload_in;
36 struct nmsg_io_output {
37 ISC_LINK(
struct nmsg_io_output) link;
42 uint64_t count_nmsg_payload_out;
46 ISC_LIST(
struct nmsg_io_input) io_inputs;
47 ISC_LIST(struct nmsg_io_output) io_outputs;
48 ISC_LIST(struct nmsg_io_thr) threads;
53 uint64_t count_nmsg_payload_out;
54 unsigned count, interval;
55 volatile
bool stop, stopped;
65 ISC_LINK(
struct nmsg_io_thr) link;
71 struct nmsg_io_input *io_input;
77 init_timespec_intervals(nmsg_io_t);
80 check_close_event(struct nmsg_io_thr *, struct nmsg_io_output *);
86 io_write(struct nmsg_io_thr *, struct nmsg_io_output *, nmsg_message_t);
89 io_write_mirrored(struct nmsg_io_thr *, nmsg_message_t);
97 io = calloc(1,
sizeof(*io));
101 pthread_mutex_init(&io->lock, NULL);
102 ISC_LIST_INIT(io->threads);
109 struct nmsg_io_output *io_output;
117 for (io_output = ISC_LIST_HEAD(io->io_outputs);
119 io_output = ISC_LIST_NEXT(io_output, link))
121 if (io_output->output != NULL)
122 _output_stop(io_output->output);
129 struct nmsg_io_input *io_input;
130 struct nmsg_io_thr *iothr, *iothr_next;
135 if (io->interval > 0)
136 init_timespec_intervals(io);
140 for (io_input = ISC_LIST_HEAD(io->io_inputs);
142 io_input = ISC_LIST_NEXT(io_input, link))
144 iothr = calloc(1,
sizeof(*iothr));
145 assert(iothr != NULL);
147 iothr->io_input = io_input;
148 iothr->threadno = threadno;
149 ISC_LINK_INIT(iothr, link);
150 ISC_LIST_APPEND(io->threads, iothr, link);
151 assert(pthread_create(&iothr->thr, NULL, io_thr_input, iothr)
157 iothr = ISC_LIST_HEAD(io->threads);
158 while (iothr != NULL) {
159 iothr_next = ISC_LIST_NEXT(iothr, link);
160 assert(pthread_join(iothr->thr, NULL) == 0);
165 _nmsg_dprintfv(io->debug, 2,
"nmsg_io: iothr=%p %s\n",
180 struct nmsg_io_input *io_input, *io_input_next;
181 struct nmsg_io_output *io_output, *io_output_next;
184 io_input = ISC_LIST_HEAD((*io)->io_inputs);
185 while (io_input != NULL) {
186 io_input_next = ISC_LIST_NEXT(io_input, link);
187 if (io_input->input != NULL && (*io)->close_fp != NULL) {
192 ce.
input = &io_input->input;
195 ce.
user = io_input->user;
197 (*io)->close_fp(&ce);
199 if (io_input->input != NULL) {
203 io_input = io_input_next;
207 io_output = ISC_LIST_HEAD((*io)->io_outputs);
208 while (io_output != NULL) {
209 io_output_next = ISC_LIST_NEXT(io_output, link);
210 if (io_output->output != NULL && (*io)->close_fp != NULL) {
215 ce.
output = &io_output->output;
218 ce.
user = io_output->user;
220 (*io)->close_fp(&ce);
222 if (io_output->output != NULL) {
226 io_output = io_output_next;
230 if ((*io)->debug >= 2 && (*io)->count_nmsg_payload_out > 0)
231 _nmsg_dprintfv((*io)->debug, 2,
"nmsg_io: io=%p"
232 " count_nmsg_payload_out=%" PRIu64
"\n",
234 (*io)->count_nmsg_payload_out);
241 struct nmsg_io_input *io_input;
244 io_input = calloc(1,
sizeof(*io_input));
245 if (io_input == NULL)
249 io_input->input = input;
250 io_input->user = user;
251 pthread_mutex_init(&io_input->lock, NULL);
254 pthread_mutex_lock(&io->lock);
255 ISC_LIST_APPEND(io->io_inputs, io_input, link);
256 pthread_mutex_unlock(&io->lock);
266 struct nmsg_io_output *io_output;
269 io_output = calloc(1,
sizeof(*io_output));
270 if (io_output == NULL)
274 io_output->output = output;
275 io_output->user = user;
276 pthread_mutex_init(&io_output->lock, NULL);
279 pthread_mutex_lock(&io->lock);
280 ISC_LIST_APPEND(io->io_outputs, io_output, link);
281 pthread_mutex_unlock(&io->lock);
290 _nmsg_io_add_input_socket(nmsg_io_t io,
int af,
char *addr,
unsigned port,
void *user) {
294 struct sockaddr_in sai;
295 struct sockaddr_in6 sai6;
307 fd = socket(af, SOCK_DGRAM, 0);
309 _nmsg_dprintfv(io->debug, 2,
"nmsg_io: socket() failed: %s\n", strerror(errno));
313 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on,
sizeof(on)) < 0) {
314 _nmsg_dprintfv(io->debug, 2,
"nmsg_io: setsockopt(SO_REUSEADDR) failed: %s\n",
320 # ifdef SO_RCVBUFFORCE
321 if (geteuid() == 0) {
322 int rcvbuf = 16777216;
323 if (setsockopt(fd, SOL_SOCKET, SO_RCVBUFFORCE, &rcvbuf,
sizeof(rcvbuf)) < 0) {
324 _nmsg_dprintfv(io->debug, 2,
325 "nmsg_io: setsockopt(SO_RCVBUFFORCE) failed: %s\n",
332 if (bind(fd, sa, salen) < 0) {
333 _nmsg_dprintfv(io->debug, 2,
334 "nmsg_io: bind() failed: %s\n", strerror(errno));
340 _nmsg_dprintfv(io->debug, 2,
"nmsg_io: nmsg_input_open_sock() failed\n");
354 if (num_aliases <= 0) {
355 _nmsg_dprintfv(io->debug, 2,
"nmsg_io: channel alias lookup '%s' failed\n", chan);
359 for (
int i = 0; i < num_aliases; i++) {
369 for (
unsigned port = port_start; port <= port_end; port++) {
370 res = _nmsg_io_add_input_socket(io, af, addr, port, user);
387 _nmsg_io_add_input_xs(nmsg_io_t io,
void *xs_ctx,
const char *str_socket,
void *user) {
392 _nmsg_dprintfv(io->debug, 2,
"nmsg_io: nmsg_input_open_sock() failed\n");
407 if (num_aliases <= 0) {
408 _nmsg_dprintfv(io->debug, 2,
409 "nmsg_io: XS channel alias lookup '%s' failed\n", chan);
413 for (
int i = 0; i < num_aliases; i++) {
414 res = _nmsg_io_add_input_xs(io, xs_ctx, alias[i], user);
427 void *xs_ctx __attribute__((unused)),
428 const char *chan __attribute__((unused)),
429 void *user __attribute__((unused)))
431 _nmsg_dprintfv(io->debug, 2,
"nmsg_io: %s: compiled without libxs support\n", __func__);
448 for (
unsigned port = port_start; port <= port_end; port++) {
449 res = _nmsg_io_add_input_socket(io, af, addr, port, user);
465 fd = open(fname, O_RDONLY);
467 _nmsg_dprintfv(io->debug, 2,
"nmsg_io: open() failed: %s\n", strerror(errno));
474 _nmsg_dprintfv(io->debug, 2,
"nmsg_io: nmsg_input_open_file() failed\n");
483 return (io->n_inputs);
488 return (io->n_outputs);
493 io->close_fp = close_fp;
498 io->atstart_fp = user_fp;
499 io->atstart_user = user;
504 io->atexit_fp = user_fp;
505 io->atexit_user = user;
520 io->interval = interval;
525 switch (output_mode) {
528 io->output_mode = output_mode;
535 init_timespec_intervals(nmsg_io_t io) {
536 struct nmsg_io_output *io_output;
541 now.tv_sec = now.tv_sec - (now.tv_sec % io->interval);
543 for (io_output = ISC_LIST_HEAD(io->io_outputs);
545 io_output = ISC_LIST_NEXT(io_output, link))
547 io_output->last = now;
552 io_write(
struct nmsg_io_thr *iothr,
struct nmsg_io_output *io_output,
555 nmsg_io_t io = iothr->io;
558 if (io->close_fp == NULL) {
560 if (io_output->output->type != nmsg_output_type_callback)
563 pthread_mutex_lock(&io_output->lock);
564 if (io_output->output == NULL) {
565 pthread_mutex_unlock(&io_output->lock);
569 if (io_output->output->type != nmsg_output_type_callback) {
570 pthread_mutex_unlock(&io_output->lock);
573 pthread_mutex_unlock(&io_output->lock);
580 io_output->count_nmsg_payload_out += 1;
582 pthread_mutex_lock(&io->lock);
583 io->count_nmsg_payload_out += 1;
584 pthread_mutex_unlock(&io->lock);
590 check_close_event(
struct nmsg_io_thr *iothr,
struct nmsg_io_output *io_output) {
592 nmsg_io_t io = iothr->
io;
595 if (io->close_fp != NULL)
596 pthread_mutex_lock(&io_output->lock);
598 if (io_output->output == NULL) {
605 io_output->count_nmsg_payload_out > 0 &&
606 io_output->count_nmsg_payload_out % io->count == 0)
608 if (io->close_fp != NULL) {
611 ce.user = io_output->user;
612 ce.output = &io_output->output;
616 ce.output_type = io_output->output->type;
619 if (io_output->output == NULL) {
632 if (io->interval > 0 &&
633 iothr->now.tv_sec - io_output->last.tv_sec >= (time_t) io->interval)
635 if (io->close_fp != NULL) {
637 struct timespec now = iothr->now;
639 now.tv_sec = now.tv_sec - (now.tv_sec % io->interval);
640 io_output->last = now;
643 ce.user = io_output->user;
644 ce.output = &io_output->output;
648 ce.output_type = io_output->output->type;
651 if (io_output->output == NULL) {
664 if (io->close_fp != NULL)
665 pthread_mutex_unlock(&io_output->lock);
670 io_write_mirrored(
struct nmsg_io_thr *iothr, nmsg_message_t msg) {
671 nmsg_message_t msgdup;
673 struct nmsg_io_output *io_output;
676 for (io_output = ISC_LIST_HEAD(iothr->io->io_outputs);
678 io_output = ISC_LIST_NEXT(io_output, link))
680 msgdup = _nmsg_message_dup(msg);
682 res = io_write(iothr, io_output, msgdup);
692 io_thr_input(
void *user) {
696 struct nmsg_io_input *io_input;
697 struct nmsg_io_output *io_output;
698 struct nmsg_io_thr *iothr;
701 iothr = (
struct nmsg_io_thr *) user;
703 io_input = iothr->io_input;
704 io_output = ISC_LIST_HEAD(io->io_outputs);
706 _nmsg_dprintfv(io->debug, 4,
"nmsg_io: started input thread @ %p\n", iothr);
709 if (io_output == NULL) {
710 _nmsg_dprintfv(io->debug, 1,
"nmsg_io: no outputs\n");
716 if (io->atstart_fp != NULL)
717 io->atstart_fp(iothr->threadno, io->atstart_user);
724 if (io->stop ==
true) {
730 res = check_close_event(iothr, io_output);
731 if (io->stop ==
true)
742 io_input->count_nmsg_payload_in += 1;
745 res = io_write(iothr, io_output, msg);
747 res = io_write_mirrored(iothr, msg);
754 res = check_close_event(iothr, io_output);
755 if (io->stop ==
true)
758 io_output = ISC_LIST_NEXT(io_output, link);
759 if (io_output == NULL)
760 io_output = ISC_LIST_HEAD(io->io_outputs);
762 if (io->stop ==
true)
767 if (io->atexit_fp != NULL)
768 io->atexit_fp(iothr->threadno, io->atexit_user);
770 _nmsg_dprintfv(io->debug, 2,
771 "nmsg_io: iothr=%p count_nmsg_payload_in=%" PRIu64
"\n",
772 iothr, io_input->count_nmsg_payload_in);
void nmsg_io_breakloop(nmsg_io_t io)
Force a currently executing nmsg_io_loop() to stop looping and return.
void nmsg_timespec_get(struct timespec *ts)
Get the current time.
nmsg_res nmsg_sock_parse(int af, const char *addr, unsigned port, struct sockaddr_in *sai, struct sockaddr_in6 *sai6, struct sockaddr **sa, socklen_t *salen)
Parse an IP address and port number into a sockaddr.
nmsg_io_t io
this nmsg_io loop
unsigned nmsg_io_get_num_outputs(nmsg_io_t io)
Get the number of outputs bound to the nmsg_io_t object.
void(* nmsg_io_close_fp)(struct nmsg_io_close_event *ce)
Function for handling close event notifications.
void nmsg_io_set_atexit_fp(nmsg_io_t io, nmsg_io_user_fp user_fp, void *user)
Set a user-specified function to be called in each thread before the thread exits.
nmsg_res nmsg_io_add_input(nmsg_io_t io, nmsg_input_t input, void *user)
Add an nmsg input to an nmsg_io_t object.
void nmsg_io_set_atstart_fp(nmsg_io_t io, nmsg_io_user_fp user_fp, void *user)
Set a user-specified function to be called in each thread after the thread starts.
nmsg_input_type input_type
type of 'input' field
void nmsg_chalias_free(char ***alias)
Free the memory allocated by nmsg_chalias_lookup().
nmsg_input_t * input
pointer to input stream
nmsg_res nmsg_output_write(nmsg_output_t output, nmsg_message_t msg)
Write an nmsg message to an nmsg_output_t object.
nmsg_io_output_mode
Output behavior when multiple outputs are present.
mirror payloads across output
nmsg_res nmsg_io_add_input_fname(nmsg_io_t io, const char *fname, void *user)
Add an NMSG file to an nmsg_io_t object.
nmsg_res nmsg_io_add_input_xs_channel(nmsg_io_t io, void *xs_ctx, const char *chan, void *user)
Add an nmsg XS input channel to an nmsg_io_t object.
void nmsg_io_set_interval(nmsg_io_t io, unsigned interval)
Configure the nmsg_io_t object to close inputs after processing for a set amount of time.
void nmsg_io_set_close_fp(nmsg_io_t io, nmsg_io_close_fp close_fp)
Set the close event notification function associated with an nmsg_io_t object.
nmsg_res nmsg_sock_parse_sockspec(const char *sockspec, int *af, char **addr, unsigned *port_start, unsigned *port_end)
Parse a "socket spec" string.
void nmsg_message_destroy(nmsg_message_t *msg)
Destroy a message object and deallocate any resources associated with it.
void nmsg_io_set_debug(nmsg_io_t io, int debug)
Set the debug level for an nmsg_io_t object.
nmsg_io_io_type io_type
whether 'input' or 'output'
Structure for passing information about a close event between the nmsg_io processing loop and the original caller.
int nmsg_chalias_lookup(const char *ch, char ***alias)
Lookup a channel alias.
nmsg_res nmsg_io_add_input_channel(nmsg_io_t io, const char *chan, void *user)
Add an nmsg input channel to an nmsg_io_t object.
const char * nmsg_res_lookup(enum nmsg_res val)
Look up a result code by value.
void nmsg_io_set_count(nmsg_io_t io, unsigned count)
Configure the nmsg_io_t object to close inputs after processing a certain non-zero number of payloads.
void * user
caller-provided user pointer
nmsg_res nmsg_io_add_input_sockspec(nmsg_io_t io, const char *sockspec, void *user)
Add an nmsg input sockspec to an nmsg_io_t object.
nmsg_res nmsg_io_add_output(nmsg_io_t io, nmsg_output_t output, void *user)
Add an nmsg output to an nmsg_io_t object.
nmsg_io_t nmsg_io_init(void)
Initialize a new nmsg_io_t object.
nmsg_io_close_type close_type
why the stream was closed
void(* nmsg_io_user_fp)(unsigned threadno, void *user)
Optional user-specified function to be run at thread start or thread stop.
stripe payloads across output
void nmsg_io_set_output_mode(nmsg_io_t io, nmsg_io_output_mode output_mode)
Set the output mode behavior for an nmsg_io_t object.
nmsg_output_t * output
pointer to output stream
nmsg_output_type output_type
type of 'output' field
void nmsg_io_destroy(nmsg_io_t *io)
Deallocate the resources associated with an nmsg_io_t object.
nmsg_res nmsg_io_loop(nmsg_io_t io)
Begin processing the data specified by the configured inputs and outputs.
nmsg_res nmsg_output_close(nmsg_output_t *output)
Close an nmsg_output_t object.
unsigned nmsg_io_get_num_inputs(nmsg_io_t io)
Get the number of inputs bound to the nmsg_io_t object.