/*
 * Forward declaration of free_wrapper(). The function is defined further
 * down in this file and is passed to xs_msg_init_data() by
 * _output_nmsg_write_xs(). Parameter names are omitted in this prototype.
 */
24 static void free_wrapper(
void *,
void *);
/*
 * Flush an NMSG output: with the stream lock held, write out the output's
 * current container through _output_nmsg_write_container().
 *
 * NOTE(review): this excerpt omits several lines of the function (the
 * declaration of res, the statement guarded by the rate check, and the
 * return), so only the visible steps are described here.
 */
30 _output_nmsg_flush(nmsg_output_t output) {
/* Take the stream lock for the duration of the container write. */
33 pthread_mutex_lock(&output->stream->lock);
35 res = _output_nmsg_write_container(output);
/*
 * Something is done only when a rate object is configured on the stream.
 * The guarded statement is not visible here -- presumably a call to
 * nmsg_rate_sleep(); confirm against the full file.
 */
36 if (output->stream->rate != NULL)
39 pthread_mutex_unlock(&output->stream->lock);
/*
 * Write a single message to an NMSG output stream.
 *
 * Stream-wide source, operator and group values are copied onto the
 * message payload when they are nonzero. Then, with the stream lock held,
 * the "container full" and "container overfull" result codes are handled
 * by writing out the current container or by calling the fragmenting
 * writer, _output_frag_write().
 *
 * NOTE(review): this excerpt omits lines of the function (the declaration
 * of res, the assignment of np, the call that produces res, closing
 * braces and the return). The comments below describe only what is
 * visible.
 */
45 _output_nmsg_write(nmsg_output_t output, nmsg_message_t msg) {
46 Nmsg__NmsgPayload *np;
/*
 * Checked together with the rate object before the lock is released.
 * Presumably set to true once data has actually been written out; the
 * assignments are not visible in this excerpt -- confirm.
 */
48 bool did_write =
false;
/* The message must already carry a payload. */
50 assert(msg->np != NULL);
/*
 * Override payload fields with the stream's values, but only for values
 * that are nonzero on the stream.
 * NOTE(review): np is presumably msg->np; the assignment is not visible.
 */
54 if (output->stream->source != 0) {
55 np->source = output->stream->source;
58 if (output->stream->operator != 0) {
59 np->operator_ = output->stream->operator;
/* Set the companion has_operator_ flag alongside the value. */
60 np->has_operator_ = 1;
62 if (output->stream->group != 0) {
63 np->group = output->stream->group;
/* Everything from here to the unlock below runs under the stream lock. */
67 pthread_mutex_lock(&output->stream->lock);
/*
 * NOTE(review): the call that sets res is not visible; presumably
 * nmsg_container_add() -- confirm.
 */
/* Container is full: write out the current container. */
71 if (res == nmsg_res_container_full) {
72 res = _output_nmsg_write_container(output);
/* Overfull result: hand off to the fragmenting writer. */
76 if (res == nmsg_res_container_overfull)
77 res = _output_frag_write(output);
80 res = _output_nmsg_write_container(output);
82 }
else if (res == nmsg_res_container_overfull) {
/* Overfull without having been full first: fragment directly. */
83 res = _output_frag_write(output);
/*
 * The rate-related statement (not visible here) runs only when did_write
 * is true and the stream has a rate object.
 */
88 if (did_write && output->stream->rate != NULL)
90 pthread_mutex_unlock(&output->stream->lock);
/*
 * Write out the output's current container, dispatching on the stream
 * type to the socket, file or XS writer.
 *
 * NOTE(review): this excerpt omits lines of the function (local
 * declarations, the head of the call that receives the arguments below,
 * error checks, preprocessor conditionals and the tail of the function).
 * The comments below describe only what is visible.
 */
96 _output_nmsg_write_container(nmsg_output_t output) {
/*
 * Trailing arguments of a call whose head is not visible -- presumably
 * nmsg_container_serialize(), whose last three parameters are do_zlib,
 * sequence and sequence_id; confirm against the full file.
 */
105 output->stream->do_zlib,
106 output->stream->sequence,
107 output->stream->sequence_id
/* The sequence counter advances only when sequencing is enabled. */
109 if (output->stream->do_sequence)
110 output->stream->sequence += 1;
/* Pick the writer that matches the stream type. */
115 if (output->stream->type == nmsg_stream_type_sock) {
116 res = _output_nmsg_write_sock(output, buf, buf_len);
117 }
else if (output->stream->type == nmsg_stream_type_file) {
118 res = _output_nmsg_write_file(output, buf, buf_len);
119 }
else if (output->stream->type == nmsg_stream_type_xs) {
121 res = _output_nmsg_write_xs(output, buf, buf_len);
/*
 * NOTE(review): this assert contradicts the branch condition above, so it
 * presumably belongs to an alternative preprocessor branch (a build
 * without XS support). The conditionals are not visible -- confirm.
 */
123 assert(output->stream->type != nmsg_stream_type_xs);
/* What happens when the stream has no container is not visible here. */
132 if (output->stream->c == NULL)
/*
 * Write a buffer to the stream's file descriptor with a single write()
 * call.
 *
 * output: output whose stream->fd is written to.
 * buf, len: bytes to write and their count.
 *
 * On a negative write() result the error is logged and nmsg_res_errno is
 * returned. The success return and closing braces are not visible in
 * this excerpt.
 */
139 _output_nmsg_write_sock(nmsg_output_t output, uint8_t *buf,
size_t len) {
140 ssize_t bytes_written;
142 bytes_written = write(output->stream->fd, buf, len);
143 if (bytes_written < 0) {
144 _nmsg_dprintf(1,
"%s: write() failed: %s\n", __func__, strerror(errno));
146 return (nmsg_res_errno);
/*
 * No retry for short writes: the single write() is asserted to have
 * written the whole buffer.
 */
149 assert((
size_t) bytes_written == len);
/*
 * Send a buffer over the stream's XS socket.
 *
 * output: output whose stream->xs socket is used.
 * buf, len: bytes to send and their count.
 *
 * NOTE(review): this excerpt omits lines of the function (declarations of
 * xmsg and ret, the polling call, loop structure, error returns and the
 * tail). The comments below describe only what is visible.
 */
155 _output_nmsg_write_xs(nmsg_output_t output, uint8_t *buf,
size_t len) {
/*
 * Build the XS message directly around buf, passing free_wrapper as the
 * callback argument -- presumably the deallocation callback libxs invokes
 * once it is done with buf; confirm against the libxs documentation.
 * A nonzero result takes the branch below (its body is not visible).
 */
159 if (xs_msg_init_data(&xmsg, buf, len, free_wrapper, NULL)) {
/* Single poll item: the stream's XS socket, watched for XS_POLLOUT. */
166 xs_pollitem_t xitems[1];
167 xitems[0].socket = output->stream->xs;
168 xitems[0].events = XS_POLLOUT;
/* Send the message with flags set to 0. */
171 ret = xs_sendmsg(output->stream->xs, &xmsg, 0);
/* Failure path: log the errno text (the condition is not visible). */
176 _nmsg_dprintf(1,
"%s: xs_sendmsg() failed: %s\n",
177 __func__, strerror(errno));
/*
 * Write a buffer to the stream's file descriptor, tracking progress so
 * that a short write can be followed by another write of the remainder.
 *
 * output: output whose stream->fd is written to.
 * buf, len: bytes to write and their count.
 *
 * On a write() failure other than EINTR the error is logged and
 * nmsg_res_errno is returned.
 *
 * NOTE(review): the enclosing loop, the statement taken on EINTR, and the
 * success return are not visible in this excerpt.
 */
193 _output_nmsg_write_file(nmsg_output_t output, uint8_t *buf,
size_t len) {
194 ssize_t bytes_written;
/* Cursor into buf; advanced past the bytes already written. */
195 const uint8_t *ptr = buf;
198 bytes_written = write(output->stream->fd, ptr, len);
/*
 * An interrupted write is singled out before the generic error check.
 * The guarded statement is not visible -- presumably a retry; confirm.
 */
199 if (bytes_written < 0 && errno == EINTR)
201 if (bytes_written < 0) {
202 _nmsg_dprintf(1,
"%s: write() failed: %s\n", __func__, strerror(errno));
204 return (nmsg_res_errno);
/* Account for the bytes written: move the cursor, shrink the remainder. */
206 ptr += bytes_written;
207 len -= bytes_written;
/*
 * Two-argument callback that _output_nmsg_write_xs() passes to
 * xs_msg_init_data().
 *
 * ptr:  first callback argument.
 * hint: second callback argument; marked unused.
 *
 * NOTE(review): the body is not visible in this excerpt -- presumably it
 * releases ptr with free(); confirm against the full file.
 */
217 free_wrapper(
void *ptr,
void *hint __attribute__((unused))) {
nmsg_res nmsg_container_serialize(nmsg_container_t c, uint8_t **pbuf, size_t *buf_len, bool do_header, bool do_zlib, uint32_t sequence, uint64_t sequence_id)
Serialize an NMSG container object, allocating memory as needed and returning a free()able buffer containing the serialized container.
#define NMSG_RBUF_TIMEOUT
Number of milliseconds to wait for data on an nmsg socket before returning nmsg_res_again.
void nmsg_rate_sleep(nmsg_rate_t r)
Sleep if necessary to maintain the target rate limit.
void nmsg_container_set_sequence(nmsg_container_t c, bool do_sequence)
Enable or disable NMSG container sequence tracking.
nmsg_res nmsg_container_add(nmsg_container_t c, nmsg_message_t msg)
Add an NMSG message object to an NMSG container object.
void nmsg_container_destroy(nmsg_container_t *c)
Deallocate the resources associated with an nmsg_container_t object.
size_t nmsg_container_get_num_payloads(nmsg_container_t c)
Get the current number of payloads in the NMSG container object.
nmsg_container_t nmsg_container_init(size_t bufsz)
Initialize a new NMSG container object.