nmsg  0.9.0
output_nmsg.c
1 /*
2  * Copyright (c) 2008-2013 by Farsight Security, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /* Import. */
18 
19 #include "private.h"
20 
21 /* Forward. */
22 
23 #ifdef HAVE_LIBXS
/* Deallocation callback passed to xs_msg_init_data(); defined at end of file. */
static void free_wrapper(void *ptr, void *hint);
25 #endif
26 
27 /* Internal functions. */
28 
30 _output_nmsg_flush(nmsg_output_t output) {
32 
33  pthread_mutex_lock(&output->stream->lock);
34  if (nmsg_container_get_num_payloads(output->stream->c) > 0) {
35  res = _output_nmsg_write_container(output);
36  if (output->stream->rate != NULL)
37  nmsg_rate_sleep(output->stream->rate);
38  }
39  pthread_mutex_unlock(&output->stream->lock);
40 
41  return (res);
42 }
43 
45 _output_nmsg_write(nmsg_output_t output, nmsg_message_t msg) {
46  Nmsg__NmsgPayload *np;
47  nmsg_res res;
48  bool did_write = false;
49 
50  assert(msg->np != NULL);
51  np = msg->np;
52 
53  /* set source, output, group if necessary */
54  if (output->stream->source != 0) {
55  np->source = output->stream->source;
56  np->has_source = 1;
57  }
58  if (output->stream->operator != 0) {
59  np->operator_ = output->stream->operator;
60  np->has_operator_ = 1;
61  }
62  if (output->stream->group != 0) {
63  np->group = output->stream->group;
64  np->has_group = 1;
65  }
66 
67  pthread_mutex_lock(&output->stream->lock);
68 
69  res = nmsg_container_add(output->stream->c, msg);
70 
71  if (res == nmsg_res_container_full) {
72  res = _output_nmsg_write_container(output);
73  if (res != nmsg_res_success)
74  goto out;
75  res = nmsg_container_add(output->stream->c, msg);
76  if (res == nmsg_res_container_overfull)
77  res = _output_frag_write(output);
78  did_write = true;
79  } else if (res == nmsg_res_success && output->stream->buffered == false) {
80  res = _output_nmsg_write_container(output);
81  did_write = true;
82  } else if (res == nmsg_res_container_overfull) {
83  res = _output_frag_write(output);
84  did_write = true;
85  }
86 
87 out:
88  if (did_write && output->stream->rate != NULL)
89  nmsg_rate_sleep(output->stream->rate);
90  pthread_mutex_unlock(&output->stream->lock);
91 
92  return (res);
93 }
94 
96 _output_nmsg_write_container(nmsg_output_t output) {
97  nmsg_res res;
98  size_t buf_len;
99  uint8_t *buf;
100 
101  res = nmsg_container_serialize(output->stream->c,
102  &buf,
103  &buf_len,
104  true, /* do_header */
105  output->stream->do_zlib,
106  output->stream->sequence,
107  output->stream->sequence_id
108  );
109  if (output->stream->do_sequence)
110  output->stream->sequence += 1;
111 
112  if (res != nmsg_res_success)
113  goto out;
114 
115  if (output->stream->type == nmsg_stream_type_sock) {
116  res = _output_nmsg_write_sock(output, buf, buf_len);
117  } else if (output->stream->type == nmsg_stream_type_file) {
118  res = _output_nmsg_write_file(output, buf, buf_len);
119  } else if (output->stream->type == nmsg_stream_type_xs) {
120 #ifdef HAVE_LIBXS
121  res = _output_nmsg_write_xs(output, buf, buf_len);
122 #else /* HAVE_LIBXS */
123  assert(output->stream->type != nmsg_stream_type_xs);
124 #endif /* HAVE_LIBXS */
125  } else {
126  assert(0);
127  }
128 
129 out:
130  nmsg_container_destroy(&output->stream->c);
131  output->stream->c = nmsg_container_init(output->stream->bufsz);
132  if (output->stream->c == NULL)
133  return (nmsg_res_memfail);
134  nmsg_container_set_sequence(output->stream->c, output->stream->do_sequence);
135  return (res);
136 }
137 
138 nmsg_res
139 _output_nmsg_write_sock(nmsg_output_t output, uint8_t *buf, size_t len) {
140  ssize_t bytes_written;
141 
142  bytes_written = write(output->stream->fd, buf, len);
143  if (bytes_written < 0) {
144  _nmsg_dprintf(1, "%s: write() failed: %s\n", __func__, strerror(errno));
145  free(buf);
146  return (nmsg_res_errno);
147  }
148  free(buf);
149  assert((size_t) bytes_written == len);
150  return (nmsg_res_success);
151 }
152 
#ifdef HAVE_LIBXS
/*
 * Send a serialized container over the stream's XS socket.
 *
 * buf is wrapped in an XS message with free_wrapper() registered as its
 * deallocation callback; if the message cannot be initialized buf is freed
 * here instead. The socket is then polled for writability in
 * NMSG_RBUF_TIMEOUT millisecond slices so that output->stop is checked
 * between polls.
 *
 * Returns nmsg_res_success once xs_sendmsg() reports a positive result,
 * nmsg_res_failure if message initialization or xs_sendmsg() fails, or
 * nmsg_res_stop if output->stop is set before the message could be sent.
 */
nmsg_res
_output_nmsg_write_xs(nmsg_output_t output, uint8_t *buf, size_t len) {
	/* Only the failure and stop paths assign res; default to success. */
	nmsg_res res = nmsg_res_success;
	xs_msg_t xmsg;

	if (xs_msg_init_data(&xmsg, buf, len, free_wrapper, NULL)) {
		free(buf);
		return (nmsg_res_failure);
	}

	for (;;) {
		int ret;
		xs_pollitem_t xitems[1];
		xitems[0].socket = output->stream->xs;
		xitems[0].events = XS_POLLOUT;
		ret = xs_poll(xitems, 1, NMSG_RBUF_TIMEOUT);
		if (ret > 0) {
			ret = xs_sendmsg(output->stream->xs, &xmsg, 0);
			if (ret > 0) {
				break;
			} else {
				res = nmsg_res_failure;
				_nmsg_dprintf(1, "%s: xs_sendmsg() failed: %s\n",
					      __func__, strerror(errno));
				break;
			}
		}
		/* Poll timed out or failed: give up only if asked to stop. */
		if (output->stop) {
			res = nmsg_res_stop;
			break;
		}
	}

	xs_msg_close(&xmsg);
	return (res);
}
#endif /* HAVE_LIBXS */
191 
192 nmsg_res
193 _output_nmsg_write_file(nmsg_output_t output, uint8_t *buf, size_t len) {
194  ssize_t bytes_written;
195  const uint8_t *ptr = buf;
196 
197  while (len) {
198  bytes_written = write(output->stream->fd, ptr, len);
199  if (bytes_written < 0 && errno == EINTR)
200  continue;
201  if (bytes_written < 0) {
202  _nmsg_dprintf(1, "%s: write() failed: %s\n", __func__, strerror(errno));
203  free(buf);
204  return (nmsg_res_errno);
205  }
206  ptr += bytes_written;
207  len -= bytes_written;
208  }
209  free(buf);
210  return (nmsg_res_success);
211 }
212 
213 /* Private functions. */
214 
215 #ifdef HAVE_LIBXS
/*
 * Two-argument adapter around free(), matching the callback shape passed
 * to xs_msg_init_data(). The second argument is ignored.
 */
static void
free_wrapper(void *data, void *hint) {
	(void) hint;
	free(data);
}
220 #endif
nmsg_res
nmsg result code
Definition: res.h:25
success
Definition: res.h:26
nmsg_res nmsg_container_serialize(nmsg_container_t c, uint8_t **pbuf, size_t *buf_len, bool do_header, bool do_zlib, uint32_t sequence, uint64_t sequence_id)
Serialize an NMSG container object, allocating memory as needed and returning a free()able buffer con...
#define NMSG_RBUF_TIMEOUT
Number of milliseconds to wait for data on an nmsg socket before returning nmsg_res_again.
Definition: constants.h:92
void nmsg_rate_sleep(nmsg_rate_t r)
Sleep if necessary to maintain the target rate limit.
Definition: rate.c:61
out of memory
Definition: res.h:29
void nmsg_container_set_sequence(nmsg_container_t c, bool do_sequence)
Enable or disable NMSG container sequence tracking.
processing should stop
Definition: res.h:34
generic failure
Definition: res.h:27
nmsg_res nmsg_container_add(nmsg_container_t c, nmsg_message_t msg)
Add an NMSG message object to an NMSG container object.
void nmsg_container_destroy(nmsg_container_t *c)
Deallocate the resources associated with an nmsg_container_t object.
size_t nmsg_container_get_num_payloads(nmsg_container_t c)
Get the current number of payloads in the NMSG container object.
nmsg_container_t nmsg_container_init(size_t bufsz)
Initialize a new NMSG container object.
Definition: container.c:33