nmsg  0.9.0
io.h File Reference

Multi-threaded nmsg I/O processing.

Data Structures

struct  nmsg_io_close_event
 Structure for passing information about a close event between the nmsg_io processing loop and the original caller.
 

Typedefs

typedef void(* nmsg_io_close_fp )(struct nmsg_io_close_event *ce)
 Function for handling close event notifications.
 
typedef void(* nmsg_io_user_fp )(unsigned threadno, void *user)
 Optional user-specified function to be run at thread start or thread stop.
 

Enumerations

enum  nmsg_io_close_type {
  nmsg_io_close_type_eof,
  nmsg_io_close_type_count,
  nmsg_io_close_type_interval
}
 Type of a close event notification.
 
enum  nmsg_io_io_type {
  nmsg_io_io_type_input,
  nmsg_io_io_type_output
}
 Type of the stream associated with a close event.
 
enum  nmsg_io_output_mode {
  nmsg_io_output_mode_stripe,
  nmsg_io_output_mode_mirror
}
 Output behavior when multiple outputs are present.
 

Functions

nmsg_io_t nmsg_io_init (void)
 Initialize a new nmsg_io_t object.

nmsg_res nmsg_io_add_input (nmsg_io_t io, nmsg_input_t input, void *user)
 Add an nmsg input to an nmsg_io_t object.

nmsg_res nmsg_io_add_input_channel (nmsg_io_t io, const char *chan, void *user)
 Add an nmsg input channel to an nmsg_io_t object.

nmsg_res nmsg_io_add_input_xs_channel (nmsg_io_t io, void *xs_ctx, const char *chan, void *user)
 Add an nmsg XS input channel to an nmsg_io_t object.

nmsg_res nmsg_io_add_input_sockspec (nmsg_io_t io, const char *sockspec, void *user)
 Add an nmsg input sockspec to an nmsg_io_t object.

nmsg_res nmsg_io_add_input_fname (nmsg_io_t io, const char *fname, void *user)
 Add an NMSG file to an nmsg_io_t object.

nmsg_res nmsg_io_add_output (nmsg_io_t io, nmsg_output_t output, void *user)
 Add an nmsg output to an nmsg_io_t object.

nmsg_res nmsg_io_loop (nmsg_io_t io)
 Begin processing the data specified by the configured inputs and outputs.

void nmsg_io_breakloop (nmsg_io_t io)
 Force a currently executing nmsg_io_loop() to stop looping and return.

void nmsg_io_destroy (nmsg_io_t *io)
 Deallocate the resources associated with an nmsg_io_t object.

unsigned nmsg_io_get_num_inputs (nmsg_io_t io)
 Get the number of inputs bound to the nmsg_io_t object.

unsigned nmsg_io_get_num_outputs (nmsg_io_t io)
 Get the number of outputs bound to the nmsg_io_t object.

void nmsg_io_set_close_fp (nmsg_io_t io, nmsg_io_close_fp close_fp)
 Set the close event notification function associated with an nmsg_io_t object.

void nmsg_io_set_atstart_fp (nmsg_io_t io, nmsg_io_user_fp user_fp, void *user)
 Set a user-specified function to be called in each thread after the thread starts.

void nmsg_io_set_atexit_fp (nmsg_io_t io, nmsg_io_user_fp user_fp, void *user)
 Set a user-specified function to be called in each thread before the thread exits.

void nmsg_io_set_count (nmsg_io_t io, unsigned count)
 Configure the nmsg_io_t object to close inputs after processing a certain non-zero number of payloads.

void nmsg_io_set_debug (nmsg_io_t io, int debug)
 Set the debug level for an nmsg_io_t object.

void nmsg_io_set_interval (nmsg_io_t io, unsigned interval)
 Configure the nmsg_io_t object to close inputs after processing for a set amount of time.

void nmsg_io_set_output_mode (nmsg_io_t io, nmsg_io_output_mode output_mode)
 Set the output mode behavior for an nmsg_io_t object.
 

Detailed Description

Multi-threaded nmsg I/O processing.

nmsg_io_t objects handle the multiplexing of NMSG data between nmsg_input_t and nmsg_output_t objects. Callers should initialize at least one input and at least one output and add them to an nmsg_io_t object before calling nmsg_io_loop().

Striping and mirroring of input payloads to individual outputs are supported. Striping is the default mode. Mirroring imposes the overhead of a per-output copy for each input payload.
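
To make the difference between the two output modes concrete, the following is a minimal, self-contained sketch that counts how many writes each output would receive. It does not use the nmsg library: `sketch_mode` and `sketch_dispatch()` are hypothetical names invented for this illustration, and the round-robin cursor is an assumption about how striping could cycle over outputs, not a description of the library's internals.

```c
#include <stddef.h>

/* Hypothetical stand-in for nmsg_io_output_mode; not the library's enum. */
enum sketch_mode { SKETCH_STRIPE, SKETCH_MIRROR };

/*
 * Count how many payloads each of n_outputs outputs would receive when
 * n_payloads payloads are dispatched in the given mode.
 * written[] must have room for n_outputs elements.
 * Returns the total number of writes performed.
 */
size_t
sketch_dispatch(enum sketch_mode mode, size_t n_payloads,
                size_t n_outputs, size_t *written)
{
    size_t total = 0;
    size_t next = 0; /* round-robin cursor, used only in stripe mode */

    if (n_outputs == 0)
        return 0;
    for (size_t i = 0; i < n_outputs; i++)
        written[i] = 0;

    for (size_t p = 0; p < n_payloads; p++) {
        if (mode == SKETCH_STRIPE) {
            /* Stripe: each payload goes to exactly one output. */
            written[next]++;
            next = (next + 1) % n_outputs;
            total++;
        } else {
            /* Mirror: each payload is copied to every output. */
            for (size_t i = 0; i < n_outputs; i++) {
                written[i]++;
                total++;
            }
        }
    }
    return total;
}
```

With 7 payloads and 3 outputs, this sketch performs 7 writes in stripe mode and 21 in mirror mode, which is the per-output copy overhead mentioned above.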

MP:

Definition in file io.h.

Typedef Documentation

typedef void(* nmsg_io_close_fp)(struct nmsg_io_close_event *ce)

Function for handling close event notifications.

Parameters
[in,out]  ce  Close event

Definition at line 125 of file io.h.

Enumeration Type Documentation

enum nmsg_io_close_type

Type of a close event notification.

Enumerator
nmsg_io_close_type_eof 

end of file

nmsg_io_close_type_count 

payload count reached

nmsg_io_close_type_interval 

interval elapsed

Definition at line 48 of file io.h.

enum nmsg_io_io_type

Type of the stream associated with a close event.

Enumerator
nmsg_io_io_type_input 

close event input

nmsg_io_io_type_output 

close event output

Definition at line 57 of file io.h.

enum nmsg_io_output_mode

Output behavior when multiple outputs are present.

Enumerator
nmsg_io_output_mode_stripe 

stripe payloads across outputs

nmsg_io_output_mode_mirror 

mirror payloads across outputs

Definition at line 65 of file io.h.

Function Documentation

nmsg_io_t nmsg_io_init ( void  )

Initialize a new nmsg_io_t object.

Returns
Opaque pointer that is NULL on failure or non-NULL on success.

Definition at line 94 of file io.c.

nmsg_res nmsg_io_add_input ( nmsg_io_t  io,
nmsg_input_t  input,
void *  user 
)

Add an nmsg input to an nmsg_io_t object.

When nmsg_io_loop() is called, one thread will be created for each input to process input payloads.

Parameters
[in]  io     Valid nmsg_io_t object.
[in]  input  Valid nmsg_input_t object.
[in]  user   NULL or an input-specific user pointer.
Returns
nmsg_res_success
nmsg_res_memfail

Definition at line 240 of file io.c.

nmsg_res nmsg_io_add_input_channel ( nmsg_io_t  io,
const char *  chan,
void *  user 
)

Add an nmsg input channel to an nmsg_io_t object.

When nmsg_io_loop() is called, one thread will be created for each input socket constituting the channel to process input payloads.

"Channels" are specified in the channel alias file, which is usually a file named "nmsg.chalias" in the sysconfdir.

Parameters
[in]  io    Valid nmsg_io_t object.
[in]  chan  Input channel name.
[in]  user  NULL or an input-specific user pointer.
Returns
nmsg_res_success
nmsg_res_parse_error
nmsg_res_memfail

Definition at line 348 of file io.c.

nmsg_res nmsg_io_add_input_xs_channel ( nmsg_io_t  io,
void *  xs_ctx,
const char *  chan,
void *  user 
)

Add an nmsg XS input channel to an nmsg_io_t object.

Parameters
[in]  io      Valid nmsg_io_t object.
[in]  xs_ctx  XS context object.
[in]  chan    Input channel name.
[in]  user    NULL or an input-specific user pointer.
Returns
nmsg_res_success
nmsg_res_parse_error
nmsg_res_memfail

nmsg_res nmsg_io_add_input_sockspec ( nmsg_io_t  io,
const char *  sockspec,
void *  user 
)

Add an nmsg input sockspec to an nmsg_io_t object.

When nmsg_io_loop() is called, one thread will be created for each input socket constituting the sockspec to process input payloads.

Sockspecs are strings in the form "<ADDRESS>/<PORTRANGE>" where <ADDRESS> is an IPv4 or IPv6 address, and <PORTRANGE> is either a single port or a contiguous, inclusive range of ports of the form "<PORT_START>..<PORT_END>".

Parameters
[in]  io        Valid nmsg_io_t object.
[in]  sockspec  Input sockspec.
[in]  user      NULL or an input-specific user pointer.
Returns
nmsg_res_success
nmsg_res_parse_error
nmsg_res_memfail

Definition at line 437 of file io.c.
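
The sockspec format described above can be illustrated with a small standalone parser. This is a sketch under stated assumptions, not the parser used by nmsg_io_add_input_sockspec(): `struct sketch_sockspec`, `sketch_parse_sockspec()` and `sketch_sockspec_nsockets()` are hypothetical names, the address is copied without being validated, and restricting ports to 1..65535 is a choice made for this example.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical container for a parsed sockspec; not an nmsg type. */
struct sketch_sockspec {
    char addr[64];       /* address text, copied verbatim and not validated */
    unsigned port_start; /* first port of the range */
    unsigned port_end;   /* last port of the range, inclusive */
};

/*
 * Split "<ADDRESS>/<PORTRANGE>" into its parts.
 * Returns 0 on success and -1 if the string is malformed.
 */
int
sketch_parse_sockspec(const char *spec, struct sketch_sockspec *out)
{
    /* An IPv6 address contains ':' but never '/', so split on the last '/'. */
    const char *slash = strrchr(spec, '/');
    if (slash == NULL || slash == spec)
        return -1;

    size_t alen = (size_t)(slash - spec);
    if (alen >= sizeof(out->addr))
        return -1;
    memcpy(out->addr, spec, alen);
    out->addr[alen] = '\0';

    const char *ports = slash + 1;
    char *end;
    if (ports[0] < '0' || ports[0] > '9')
        return -1;
    unsigned long start = strtoul(ports, &end, 10);
    unsigned long stop = start; /* single port unless ".." follows */

    /* Optional "..<PORT_END>" suffix. */
    if (end[0] == '.' && end[1] == '.' && end[2] >= '0' && end[2] <= '9')
        stop = strtoul(end + 2, &end, 10);
    if (end[0] != '\0')
        return -1;

    /* Assumption of this sketch: ports are limited to 1..65535. */
    if (start == 0 || stop > 65535 || start > stop)
        return -1;

    out->port_start = (unsigned)start;
    out->port_end = (unsigned)stop;
    return 0;
}

/* Number of sockets (and therefore input threads) the range describes. */
unsigned
sketch_sockspec_nsockets(const struct sketch_sockspec *s)
{
    return s->port_end - s->port_start + 1;
}
```

For the string "::1/9000..9003" the sketch reports four sockets, which by the description above would correspond to four input threads once nmsg_io_loop() is called.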

nmsg_res nmsg_io_add_input_fname ( nmsg_io_t  io,
const char *  fname,
void *  user 
)

Add an NMSG file to an nmsg_io_t object.

Parameters
[in]  io     Valid nmsg_io_t object.
[in]  fname  Name of NMSG file.
[in]  user   NULL or an input-specific user pointer.
Returns
nmsg_res_success
nmsg_res_failure
nmsg_res_memfail

Definition at line 461 of file io.c.

nmsg_res nmsg_io_add_output ( nmsg_io_t  io,
nmsg_output_t  output,
void *  user 
)

Add an nmsg output to an nmsg_io_t object.

When nmsg_io_loop() is called, the input threads will cycle over and write payloads to the available outputs.

Parameters
[in]  io      Valid nmsg_io_t object.
[in]  output  Valid nmsg_output_t object.
[in]  user    NULL or an output-specific user pointer.
Returns
nmsg_res_success
nmsg_res_memfail

Definition at line 265 of file io.c.

nmsg_res nmsg_io_loop ( nmsg_io_t  io)

Begin processing the data specified by the configured inputs and outputs.

One processing thread is created for each input. nmsg_io_loop() does not return until these threads finish and are destroyed.

Only nmsg_io_breakloop() may be called asynchronously while nmsg_io_loop() is executing.

nmsg_io_loop() invalidates an nmsg_io_t object. nmsg_io_destroy() should then be called.

Parameters
[in]  io  Valid nmsg_io_t object.
Returns
nmsg_res_success
nmsg_res_failure

Definition at line 127 of file io.c.

void nmsg_io_breakloop ( nmsg_io_t  io)

Force a currently executing nmsg_io_loop() to stop looping and return.

Since nmsg_io_loop() is a blocking call, nmsg_io_breakloop() must be called asynchronously.

This function is safe to call inside a signal handler.

Parameters
[in]  io  Valid and currently processing nmsg_io_t object.

Definition at line 108 of file io.c.
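
Because nmsg_io_breakloop() is documented as safe to call from a signal handler, a common way to use it is to stop processing on SIGINT. The sketch below shows only the shape of that pattern and does not link against nmsg: `sketch_loop()` stands in for the blocking nmsg_io_loop(), and the handler sets a flag at the point where a real program would call nmsg_io_breakloop() on an nmsg_io_t kept in a file-scope variable. All `sketch_` names are hypothetical.

```c
#include <signal.h>

/*
 * Flag written by the handler. volatile sig_atomic_t is the type the
 * C standard designates for objects written from a signal handler.
 */
static volatile sig_atomic_t sketch_stop = 0;

/*
 * Signal handler. In a real program this is where
 * nmsg_io_breakloop(io) would be called.
 */
static void
sketch_on_signal(int signo)
{
    (void)signo;
    sketch_stop = 1;
}

/* Install the handler for SIGINT. Returns 0 on success, -1 on failure. */
int
sketch_install_handler(void)
{
    return (signal(SIGINT, sketch_on_signal) == SIG_ERR) ? -1 : 0;
}

/*
 * Stand-in for the blocking processing loop: runs until the stop flag
 * is set or max_iterations is reached, and returns the number of
 * iterations completed.
 */
unsigned
sketch_loop(unsigned max_iterations)
{
    unsigned done = 0;

    while (!sketch_stop && done < max_iterations)
        done++;
    return done;
}
```

The handler does nothing except request the stop; any cleanup, such as calling nmsg_io_destroy(), belongs in the main flow after the loop has returned.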

void nmsg_io_destroy ( nmsg_io_t *  io)

Deallocate the resources associated with an nmsg_io_t object.

Parameters
[in]  io  Pointer to an nmsg_io_t object.

Definition at line 179 of file io.c.

unsigned nmsg_io_get_num_inputs ( nmsg_io_t  io)

Get the number of inputs bound to the nmsg_io_t object.

Returns
Number of inputs.

Definition at line 482 of file io.c.

unsigned nmsg_io_get_num_outputs ( nmsg_io_t  io)

Get the number of outputs bound to the nmsg_io_t object.

Returns
Number of outputs.

Definition at line 487 of file io.c.

void nmsg_io_set_close_fp ( nmsg_io_t  io,
nmsg_io_close_fp  close_fp 
)

Set the close event notification function associated with an nmsg_io_t object.

The provided function will only be called at EOF on an input or output stream, unless nmsg_io_set_count() or nmsg_io_set_interval() is used to specify the conditions under which an input stream should be closed.

Parameters
[in]  io        Valid nmsg_io_t object.
[in]  close_fp  Close event notification function. It must be reentrant.

Definition at line 492 of file io.c.

void nmsg_io_set_atstart_fp ( nmsg_io_t  io,
nmsg_io_user_fp  user_fp,
void *  user 
)

Set a user-specified function to be called in each thread after the thread starts.

Parameters
[in]  io       Valid nmsg_io_t object.
[in]  user_fp  User-specified function.
[in]  user     User pointer to be passed to user function.

Definition at line 497 of file io.c.

void nmsg_io_set_atexit_fp ( nmsg_io_t  io,
nmsg_io_user_fp  user_fp,
void *  user 
)

Set a user-specified function to be called in each thread before the thread exits.

Parameters
[in]  io       Valid nmsg_io_t object.
[in]  user_fp  User-specified function.
[in]  user     User pointer to be passed to user function.

Definition at line 503 of file io.c.

void nmsg_io_set_count ( nmsg_io_t  io,
unsigned  count 
)

Configure the nmsg_io_t object to close inputs after processing a certain non-zero number of payloads.

If the 'user' pointer associated with an output stream is non-NULL, the close event notification function must be set, and that notification function must reopen the stream. If the 'user' pointer is NULL, nmsg_io processing will be shut down.

Parameters
[in]  io     Valid nmsg_io_t object.
[in]  count  Integer > 0.

Definition at line 509 of file io.c.

void nmsg_io_set_debug ( nmsg_io_t  io,
int  debug 
)

Set the debug level for an nmsg_io_t object.

Debug levels >= 0 will cause debugging information to be logged to stderr.

Parameters
[in]  io     Valid nmsg_io_t object.
[in]  debug  Debug level.

Definition at line 514 of file io.c.

void nmsg_io_set_interval ( nmsg_io_t  io,
unsigned  interval 
)

Configure the nmsg_io_t object to close inputs after processing for a set amount of time.

If the 'user' pointer associated with an output stream is non-NULL, the close event notification function must be set, and that notification function must reopen the stream. If the 'user' pointer is NULL, nmsg_io processing will be shut down.

Parameters
[in]  io        Valid nmsg_io_t object.
[in]  interval  Positive number of seconds.

Definition at line 519 of file io.c.

void nmsg_io_set_output_mode ( nmsg_io_t  io,
nmsg_io_output_mode  output_mode 
)

Set the output mode behavior for an nmsg_io_t object.

Nmsg payloads received from inputs may be striped across available outputs (the default), or mirrored across all available outputs.

Since nmsg_io must synchronize access to individual outputs, the mirrored output mode will limit the amount of parallelism that can be achieved.

Parameters
[in]  io           Valid nmsg_io_t object.
[in]  output_mode  nmsg_io_output_mode_stripe or nmsg_io_output_mode_mirror.

Definition at line 524 of file io.c.