23 static nmsg_output_t output_open_stream(nmsg_stream_type,
int,
size_t);
24 static nmsg_output_t output_open_stream_base(nmsg_stream_type,
size_t);
25 static nmsg_res output_write_callback(nmsg_output_t, nmsg_message_t);
31 return (output_open_stream(nmsg_stream_type_file, fd, bufsz));
36 return (output_open_stream(nmsg_stream_type_sock, fd, bufsz));
44 output = output_open_stream_base(nmsg_stream_type_xs, bufsz);
48 output->stream->xs = s;
55 size_t bufsz __attribute__((unused)))
65 output = calloc(1,
sizeof(*output));
68 output->type = nmsg_output_type_pres;
69 output->write_fp = _output_pres_write;
71 output->pres = calloc(1,
sizeof(*(output->pres)));
72 if (output->pres == NULL) {
76 output->pres->fp = fdopen(fd,
"w");
77 if (output->pres->fp == NULL) {
82 output->pres->endline = strdup(
"\n");
83 pthread_mutex_init(&output->pres->lock, NULL);
92 output = calloc(1,
sizeof(*output));
95 output->type = nmsg_output_type_callback;
96 output->write_fp = output_write_callback;
98 output->callback = calloc(1,
sizeof(*(output->callback)));
99 if (output->callback == NULL) {
103 output->callback->cb = cb;
104 output->callback->user = user;
111 return (output->flush_fp(output));
118 res = _nmsg_message_serialize(msg);
122 if (output->do_filter ==
true &&
123 (output->filter_vid != msg->np->vid ||
124 output->filter_msgtype != msg->np->msgtype))
129 res = output->write_fp(output, msg);
138 switch ((*output)->type) {
139 case nmsg_output_type_stream:
140 res = _output_nmsg_flush(*output);
141 if ((*output)->stream->random != NULL)
142 nmsg_random_destroy(&((*output)->stream->random));
144 if ((*output)->stream->type == nmsg_stream_type_xs)
145 xs_close((*output)->stream->xs);
147 assert((*output)->stream->type != nmsg_stream_type_xs);
149 if ((*output)->stream->type == nmsg_stream_type_file ||
150 (*output)->stream->type == nmsg_stream_type_sock)
152 if (_nmsg_global_autoclose)
153 close((*output)->stream->fd);
156 free((*output)->stream);
158 case nmsg_output_type_pres:
159 fclose((*output)->pres->fp);
160 free((*output)->pres->endline);
161 free((*output)->pres);
163 case nmsg_output_type_callback:
164 free((*output)->callback);
174 if (output->type == nmsg_output_type_stream) {
175 output->stream->buffered = buffered;
176 }
else if (output->type == nmsg_output_type_pres) {
177 output->pres->flush = !(buffered);
183 if (vid == 0 && msgtype == 0)
184 output->do_filter =
false;
186 output->do_filter =
true;
188 output->filter_vid = vid;
189 output->filter_msgtype = msgtype;
194 const char *vname,
const char *mname)
196 unsigned vid, msgtype;
198 if (vname == NULL || mname == NULL)
215 if (output->type != nmsg_output_type_stream)
217 if (output->stream->rate != NULL)
219 output->stream->rate = rate;
224 if (output->type != nmsg_output_type_stream)
226 output->stream->do_zlib = zlibout;
231 if (output->type == nmsg_output_type_pres) {
232 if (output->pres->endline != NULL)
233 free(output->pres->endline);
234 output->pres->endline = strdup(endline);
240 if (output->type == nmsg_output_type_stream)
241 output->stream->source = source;
246 if (output->type == nmsg_output_type_stream)
247 output->stream->operator =
operator;
252 if (output->type == nmsg_output_type_stream)
253 output->stream->group = group;
257 _output_stop(nmsg_output_t output) {
264 output_open_stream(nmsg_stream_type type,
int fd,
size_t bufsz) {
267 output = output_open_stream_base(type, bufsz);
272 if (type == nmsg_stream_type_file ||
273 type == nmsg_stream_type_sock)
275 output->stream->fd = fd;
282 output_open_stream_base(nmsg_stream_type type,
size_t bufsz) {
286 output = calloc(1,
sizeof(*output));
289 output->type = nmsg_output_type_stream;
290 output->write_fp = _output_nmsg_write;
291 output->flush_fp = _output_nmsg_flush;
294 output->stream = calloc(1,
sizeof(*(output->stream)));
295 if (output->stream == NULL) {
299 pthread_mutex_init(&output->stream->lock, NULL);
300 output->stream->type = type;
301 output->stream->buffered =
true;
304 output->stream->random = nmsg_random_init();
305 if (output->stream->random == NULL) {
306 free(output->stream);
312 if (output->stream->type == nmsg_stream_type_sock ||
313 output->stream->type == nmsg_stream_type_xs)
315 output->stream->do_sequence =
true;
318 nmsg_random_buf(output->stream->random,
319 (uint8_t *) &output->stream->sequence_id,
320 sizeof(output->stream->sequence_id));
328 output->stream->bufsz = bufsz;
332 if (output->stream->c == NULL) {
333 nmsg_random_destroy(&output->stream->random);
334 free(output->stream);
344 output_write_callback(nmsg_output_t output, nmsg_message_t msg) {
345 output->callback->cb(msg, output->callback->user);
nmsg_output_t nmsg_output_open_pres(int fd)
Initialize a new presentation format (ASCII lines) nmsg output.
void nmsg_output_set_rate(nmsg_output_t output, nmsg_rate_t rate)
Limit the payload output rate.
unsigned nmsg_msgmod_vname_to_vid(const char *vname)
Convert a human-readable vendor name to its numeric ID.
void nmsg_output_set_filter_msgtype(nmsg_output_t output, unsigned vid, unsigned msgtype)
Filter an nmsg_output_t for a given vendor ID / message type.
nmsg_output_t nmsg_output_open_xs(void *s, size_t bufsz)
Initialize a new XS socket NMSG output.
void(* nmsg_cb_message)(nmsg_message_t msg, void *user)
Callback function for processing nmsg messages.
void nmsg_output_set_zlibout(nmsg_output_t output, bool zlibout)
Enable or disable zlib compression of output NMSG containers.
void nmsg_output_set_source(nmsg_output_t output, unsigned source)
Set the 'source' field on all output NMSG payloads.
#define NMSG_WBUFSZ_MIN
Minimum number of octets that an nmsg wbuf must hold.
nmsg_res nmsg_output_write(nmsg_output_t output, nmsg_message_t msg)
Write an nmsg message to an nmsg_output_t object.
nmsg_output_t nmsg_output_open_callback(nmsg_cb_message cb, void *user)
Initialize a new nmsg output closure.
nmsg_res nmsg_output_set_filter_msgtype_byname(nmsg_output_t output, const char *vname, const char *mname)
Filter an nmsg_output_t for a given vendor ID / message type.
nmsg_output_t nmsg_output_open_sock(int fd, size_t bufsz)
Initialize a new datagram socket nmsg output.
void nmsg_container_set_sequence(nmsg_container_t c, bool do_sequence)
Enable or disable NMSG container sequence tracking.
void nmsg_output_set_group(nmsg_output_t output, unsigned group)
Set the 'group' field on all output NMSG payloads.
void nmsg_output_set_buffered(nmsg_output_t output, bool buffered)
Make an nmsg_output_t stream or presentation format output buffered or unbuffered.
void nmsg_rate_destroy(nmsg_rate_t *r)
Destroy an nmsg_rate_t object.
#define NMSG_WBUFSZ_MAX
Maximum number of octets that an nmsg wbuf can hold.
void nmsg_container_destroy(nmsg_container_t *c)
Deallocate the resources associated with an nmsg_container_t object.
unsigned nmsg_msgmod_mname_to_msgtype(unsigned vid, const char *mname)
Convert the human-readable name of a message type to a message type ID.
void nmsg_output_set_endline(nmsg_output_t output, const char *endline)
Set the line continuation string for presentation format output.
void nmsg_output_set_operator(nmsg_output_t output, unsigned operator_)
Set the 'operator' field on all output NMSG payloads.
nmsg_res nmsg_output_flush(nmsg_output_t output)
Flush an nmsg_output_t object.
nmsg_container_t nmsg_container_init(size_t bufsz)
Initialize a new NMSG container object.
nmsg_res nmsg_output_close(nmsg_output_t *output)
Close an nmsg_output_t object.
nmsg_output_t nmsg_output_open_file(int fd, size_t bufsz)
Initialize a new byte-stream nmsg output.