nmsg
0.9.0
|
Multi-threaded nmsg I/O processing. More...
Go to the source code of this file.
Data Structures | |
struct | nmsg_io_close_event |
Structure for passing information about a close event between the nmsg_io processing loop and the original caller. More... | |
Typedefs | |
typedef void(* | nmsg_io_close_fp )(struct nmsg_io_close_event *ce) |
Function for handling close event notifications. More... | |
typedef void(* | nmsg_io_user_fp )(unsigned threadno, void *user) |
Optional user-specified function to be run at thread start or thread stop. | |
Enumerations | |
enum | nmsg_io_close_type { nmsg_io_close_type_eof, nmsg_io_close_type_count, nmsg_io_close_type_interval } |
Type of a close event notification. More... | |
enum | nmsg_io_io_type { nmsg_io_io_type_input, nmsg_io_io_type_output } |
Type of the stream associated with a close event. More... | |
enum | nmsg_io_output_mode { nmsg_io_output_mode_stripe, nmsg_io_output_mode_mirror } |
Output behavior when multiple outputs are present. More... | |
Functions | |
nmsg_io_t | nmsg_io_init (void) |
Initialize a new nmsg_io_t object. More... | |
nmsg_res | nmsg_io_add_input (nmsg_io_t io, nmsg_input_t input, void *user) |
Add an nmsg input to an nmsg_io_t object. More... | |
nmsg_res | nmsg_io_add_input_channel (nmsg_io_t io, const char *chan, void *user) |
Add an nmsg input channel to an nmsg_io_t object. More... | |
nmsg_res | nmsg_io_add_input_xs_channel (nmsg_io_t io, void *xs_ctx, const char *chan, void *user) |
Add an nmsg XS input channel to an nmsg_io_t object. More... | |
nmsg_res | nmsg_io_add_input_sockspec (nmsg_io_t io, const char *sockspec, void *user) |
Add an nmsg input sockspec to an nmsg_io_t object. More... | |
nmsg_res | nmsg_io_add_input_fname (nmsg_io_t io, const char *fname, void *user) |
Add an NMSG file to an nmsg_io_t object. More... | |
nmsg_res | nmsg_io_add_output (nmsg_io_t io, nmsg_output_t output, void *user) |
Add an nmsg output to an nmsg_io_t object. More... | |
nmsg_res | nmsg_io_loop (nmsg_io_t io) |
Begin processing the data specified by the configured inputs and outputs. More... | |
void | nmsg_io_breakloop (nmsg_io_t io) |
Force a currently executing nmsg_io_loop() to stop looping and return. More... | |
void | nmsg_io_destroy (nmsg_io_t *io) |
Deallocate the resources associated with an nmsg_io_t object. More... | |
unsigned | nmsg_io_get_num_inputs (nmsg_io_t io) |
Get the number of inputs bound to the nmsg_io_t object. More... | |
unsigned | nmsg_io_get_num_outputs (nmsg_io_t io) |
Get the number of outputs bound to the nmsg_io_t object. More... | |
void | nmsg_io_set_close_fp (nmsg_io_t io, nmsg_io_close_fp close_fp) |
Set the close event notification function associated with an nmsg_io_t object. More... | |
void | nmsg_io_set_atstart_fp (nmsg_io_t io, nmsg_io_user_fp user_fp, void *user) |
Set a user-specified function to be called in each thread after the thread starts. More... | |
void | nmsg_io_set_atexit_fp (nmsg_io_t io, nmsg_io_user_fp user_fp, void *user) |
Set a user-specified function to be called in each thread before the thread exits. More... | |
void | nmsg_io_set_count (nmsg_io_t io, unsigned count) |
Configure the nmsg_io_t object to close inputs after processing a certain non-zero number of payloads. More... | |
void | nmsg_io_set_debug (nmsg_io_t io, int debug) |
Set the debug level for an nmsg_io_t object. More... | |
void | nmsg_io_set_interval (nmsg_io_t io, unsigned interval) |
Configure the nmsg_io_t object to close inputs after processing for a set amount of time. More... | |
void | nmsg_io_set_output_mode (nmsg_io_t io, nmsg_io_output_mode output_mode) |
Set the output mode behavior for an nmsg_io_t object. More... | |
Multi-threaded nmsg I/O processing.
nmsg_io_t objects handle the multiplexing of NMSG data between nmsg_input_t and nmsg_output_t objects. Callers should initialize at least one input and at least one output and add them to an nmsg_io_t object before calling nmsg_io_loop().
Striping and mirroring of input payloads to individual outputs is supported. Striping is the default mode. Mirroring imposes the overhead of a per-output copy for each input payload.
MP:
Definition in file io.h.
typedef void(* nmsg_io_close_fp)(struct nmsg_io_close_event *ce) |
enum nmsg_io_close_type |
enum nmsg_io_io_type |
enum nmsg_io_output_mode |
nmsg_io_t nmsg_io_init | ( | void | ) |
nmsg_res nmsg_io_add_input | ( | nmsg_io_t | io, |
nmsg_input_t | input, | ||
void * | user | ||
) |
Add an nmsg input to an nmsg_io_t object.
When nmsg_io_loop() is called, one thread will be created for each input to process input payloads.
[in] | io | Valid nmsg_io_t object. |
[in] | input | Valid nmsg_input_t object. |
[in] | user | NULL or an input-specific user pointer. |
nmsg_res nmsg_io_add_input_channel | ( | nmsg_io_t | io, |
const char * | chan, | ||
void * | user | ||
) |
Add an nmsg input channel to an nmsg_io_t object.
When nmsg_io_loop() is called, one thread will be created for each input socket constituting the channel to process input payloads.
"Channels" are specified in the channel alias file, which is usually a file named "nmsg.chalias" in the sysconfdir.
[in] | io | Valid nmsg_io_t object. |
[in] | chan | Input channel name. |
[in] | user | NULL or an input-specific user pointer. |
nmsg_res nmsg_io_add_input_xs_channel | ( | nmsg_io_t | io, |
void * | xs_ctx, | ||
const char * | chan, | ||
void * | user | ||
) |
Add an nmsg XS input channel to an nmsg_io_t object.
[in] | io | Valid nmsg_io_t object. |
[in] | xs_ctx | XS context object. |
[in] | chan | Input channel name. |
[in] | user | NULL or an input-specific user pointer. |
nmsg_res nmsg_io_add_input_sockspec | ( | nmsg_io_t | io, |
const char * | sockspec, | ||
void * | user | ||
) |
Add an nmsg input sockspec to an nmsg_io_t object.
When nmsg_io_loop() is called, one thread will be created for each input socket constituting the sockspec to process input payloads.
Sockspecs are strings in the form "<ADDRESS>/<PORTRANGE>" where <ADDRESS> is an IPv4 or IPv6 address, and <PORTRANGE> is either a single port or a contiguous, inclusive range of ports of the form "<PORT_START>..<PORT_END>".
[in] | io | Valid nmsg_io_t object. |
[in] | sock | Input channel. |
[in] | user | NULL or an input-specific user pointer. |
nmsg_res nmsg_io_add_input_fname | ( | nmsg_io_t | io, |
const char * | fname, | ||
void * | user | ||
) |
Add an NMSG file to an nmsg_io_t object.
[in] | io | Valid nmsg_io_t object. |
[in] | fname | Name of NMSG file. |
[in] | user | NULL or an input-specific user pointer. |
nmsg_res nmsg_io_add_output | ( | nmsg_io_t | io, |
nmsg_output_t | output, | ||
void * | user | ||
) |
Add an nmsg output to an nmsg_io_t object.
When nmsg_io_loop() is called, the input threads will cycle over and write payloads to the available outputs.
[in] | io | Valid nmsg_io_t object. |
[in] | output | Valid nmsg_output_t object. |
[in] | user | NULL or an output-specific user pointer. |
nmsg_res nmsg_io_loop | ( | nmsg_io_t | io | ) |
Begin processing the data specified by the configured inputs and outputs.
One processing thread is created for each input. nmsg_io_loop() does not return until these threads finish and are destroyed.
Only nmsg_io_breakloop() may be called asynchronously while nmsg_io_loop() is executing.
nmsg_io_loop() invalidates an nmsg_io_t object. nmsg_io_destroy() should then be called.
[in] | io | valid nmsg_io_t object. |
void nmsg_io_breakloop | ( | nmsg_io_t | io | ) |
Force a currently executing nmsg_io_loop() to stop looping and return.
Since nmsg_io_loop() is a blocking call, nmsg_io_breakloop() must be called asynchronously.
This function is safe to call inside a signal handler.
[in] | io | Valid and currently processing nmsg_io_t object. |
void nmsg_io_destroy | ( | nmsg_io_t * | io | ) |
unsigned nmsg_io_get_num_inputs | ( | nmsg_io_t | io | ) |
unsigned nmsg_io_get_num_outputs | ( | nmsg_io_t | io | ) |
void nmsg_io_set_close_fp | ( | nmsg_io_t | io, |
nmsg_io_close_fp | close_fp | ||
) |
Set the close event notification function associated with an nmsg_io_t object.
The provided function will only be called at EOF on an input or output stream, unless nmsg_io_set_count() or nmsg_io_set_interval() are used to specify conditions when an input stream should be closed.
[in] | io | Valid nmsg_io_t object. |
[in] | close_fp | Close event notification function. It must be reentrant. |
void nmsg_io_set_atstart_fp | ( | nmsg_io_t | io, |
nmsg_io_user_fp | user_fp, | ||
void * | user | ||
) |
void nmsg_io_set_atexit_fp | ( | nmsg_io_t | io, |
nmsg_io_user_fp | user_fp, | ||
void * | user | ||
) |
void nmsg_io_set_count | ( | nmsg_io_t | io, |
unsigned | count | ||
) |
Configure the nmsg_io_t object to close inputs after processing a certain non-zero number of payloads.
If the 'user' pointer associated with an output stream is non-NULL the close event notification function must be set, and this function must reopen the stream. If the 'user' pointer is NULL, nmsg_io processing will be shut down.
[in] | io | Valid nmsg_io_t object. |
[in] | count | Integer > 0. |
void nmsg_io_set_debug | ( | nmsg_io_t | io, |
int | debug | ||
) |
void nmsg_io_set_interval | ( | nmsg_io_t | io, |
unsigned | interval | ||
) |
Configure the nmsg_io_t object to close inputs after processing for a set amount of time.
If the 'user' pointer associated with an output stream is non-NULL the close event notification function must be set, and this function must reopen the stream. If the 'user' pointer is NULL, nmsg_io processing will be shut down.
[in] | io | Valid nmsg_io_t object. |
[in] | interval | Positive number of seconds. |
void nmsg_io_set_output_mode | ( | nmsg_io_t | io, |
nmsg_io_output_mode | output_mode | ||
) |
Set the output mode behavior for an nmsg_io_t object.
Nmsg payloads received from inputs may be striped across available outputs (the default), or mirrored across all available outputs.
Since nmsg_io must synchronize access to individual outputs, the mirrored output mode will limit the amount of parallelism that can be achieved.
[in] | io | Valid nmsg_io_t object. |
[in] | output_mode | nmsg_io_output_mode_stripe or nmsg_io_output_mode_mirror. |