nmsg  0.9.0
output.c
1 /*
2  * Copyright (c) 2008-2013 by Farsight Security, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /* Import. */
18 
19 #include "private.h"
20 
21 /* Forward. */
22 
23 static nmsg_output_t output_open_stream(nmsg_stream_type, int, size_t);
24 static nmsg_output_t output_open_stream_base(nmsg_stream_type, size_t);
25 static nmsg_res output_write_callback(nmsg_output_t, nmsg_message_t);
26 
27 /* Export. */
28 
29 nmsg_output_t
30 nmsg_output_open_file(int fd, size_t bufsz) {
31  return (output_open_stream(nmsg_stream_type_file, fd, bufsz));
32 }
33 
34 nmsg_output_t
35 nmsg_output_open_sock(int fd, size_t bufsz) {
36  return (output_open_stream(nmsg_stream_type_sock, fd, bufsz));
37 }
38 
39 #ifdef HAVE_LIBXS
40 nmsg_output_t
41 nmsg_output_open_xs(void *s, size_t bufsz) {
42  struct nmsg_output *output;
43 
44  output = output_open_stream_base(nmsg_stream_type_xs, bufsz);
45  if (output == NULL)
46  return (output);
47 
48  output->stream->xs = s;
49 
50  return (output);
51 }
52 #else /* HAVE_LIBXS */
53 nmsg_output_t
54 nmsg_output_open_xs(void *s __attribute__((unused)),
55  size_t bufsz __attribute__((unused)))
56 {
57  return (NULL);
58 }
59 #endif /* HAVE_LIBXS */
60 
61 nmsg_output_t
63  struct nmsg_output *output;
64 
65  output = calloc(1, sizeof(*output));
66  if (output == NULL)
67  return (NULL);
68  output->type = nmsg_output_type_pres;
69  output->write_fp = _output_pres_write;
70 
71  output->pres = calloc(1, sizeof(*(output->pres)));
72  if (output->pres == NULL) {
73  free(output);
74  return (NULL);
75  }
76  output->pres->fp = fdopen(fd, "w");
77  if (output->pres->fp == NULL) {
78  free(output->pres);
79  free(output);
80  return (NULL);
81  }
82  output->pres->endline = strdup("\n");
83  pthread_mutex_init(&output->pres->lock, NULL);
84 
85  return (output);
86 }
87 
88 nmsg_output_t
90  struct nmsg_output *output;
91 
92  output = calloc(1, sizeof(*output));
93  if (output == NULL)
94  return (NULL);
95  output->type = nmsg_output_type_callback;
96  output->write_fp = output_write_callback;
97 
98  output->callback = calloc(1, sizeof(*(output->callback)));
99  if (output->callback == NULL) {
100  free(output);
101  return (NULL);
102  }
103  output->callback->cb = cb;
104  output->callback->user = user;
105 
106  return (output);
107 }
108 
109 nmsg_res
110 nmsg_output_flush(nmsg_output_t output) {
111  return (output->flush_fp(output));
112 }
113 
114 nmsg_res
115 nmsg_output_write(nmsg_output_t output, nmsg_message_t msg) {
116  nmsg_res res;
117 
118  res = _nmsg_message_serialize(msg);
119  if (res != nmsg_res_success)
120  return (res);
121 
122  if (output->do_filter == true &&
123  (output->filter_vid != msg->np->vid ||
124  output->filter_msgtype != msg->np->msgtype))
125  {
126  return (nmsg_res_success);
127  }
128 
129  res = output->write_fp(output, msg);
130  return (res);
131 }
132 
133 nmsg_res
134 nmsg_output_close(nmsg_output_t *output) {
135  nmsg_res res;
136 
137  res = nmsg_res_success;
138  switch ((*output)->type) {
139  case nmsg_output_type_stream:
140  res = _output_nmsg_flush(*output);
141  if ((*output)->stream->random != NULL)
142  nmsg_random_destroy(&((*output)->stream->random));
143 #ifdef HAVE_LIBXS
144  if ((*output)->stream->type == nmsg_stream_type_xs)
145  xs_close((*output)->stream->xs);
146 #else /* HAVE_LIBXS */
147  assert((*output)->stream->type != nmsg_stream_type_xs);
148 #endif /* HAVE_LIBXS */
149  if ((*output)->stream->type == nmsg_stream_type_file ||
150  (*output)->stream->type == nmsg_stream_type_sock)
151  {
152  if (_nmsg_global_autoclose)
153  close((*output)->stream->fd);
154  }
155  nmsg_container_destroy(&(*output)->stream->c);
156  free((*output)->stream);
157  break;
158  case nmsg_output_type_pres:
159  fclose((*output)->pres->fp);
160  free((*output)->pres->endline);
161  free((*output)->pres);
162  break;
163  case nmsg_output_type_callback:
164  free((*output)->callback);
165  break;
166  }
167  free(*output);
168  *output = NULL;
169  return (res);
170 }
171 
172 void
173 nmsg_output_set_buffered(nmsg_output_t output, bool buffered) {
174  if (output->type == nmsg_output_type_stream) {
175  output->stream->buffered = buffered;
176  } else if (output->type == nmsg_output_type_pres) {
177  output->pres->flush = !(buffered);
178  }
179 }
180 
181 void
182 nmsg_output_set_filter_msgtype(nmsg_output_t output, unsigned vid, unsigned msgtype) {
183  if (vid == 0 && msgtype == 0)
184  output->do_filter = false;
185  else
186  output->do_filter = true;
187 
188  output->filter_vid = vid;
189  output->filter_msgtype = msgtype;
190 }
191 
192 nmsg_res
194  const char *vname, const char *mname)
195 {
196  unsigned vid, msgtype;
197 
198  if (vname == NULL || mname == NULL)
199  return (nmsg_res_failure);
200 
201  vid = nmsg_msgmod_vname_to_vid(vname);
202  if (vid == 0)
203  return (nmsg_res_failure);
204  msgtype = nmsg_msgmod_mname_to_msgtype(vid, mname);
205  if (msgtype == 0)
206  return (nmsg_res_failure);
207 
208  nmsg_output_set_filter_msgtype(output, vid, msgtype);
209 
210  return (nmsg_res_success);
211 }
212 
213 void
214 nmsg_output_set_rate(nmsg_output_t output, nmsg_rate_t rate) {
215  if (output->type != nmsg_output_type_stream)
216  return;
217  if (output->stream->rate != NULL)
218  nmsg_rate_destroy(&output->stream->rate);
219  output->stream->rate = rate;
220 }
221 
222 void
223 nmsg_output_set_zlibout(nmsg_output_t output, bool zlibout) {
224  if (output->type != nmsg_output_type_stream)
225  return;
226  output->stream->do_zlib = zlibout;
227 }
228 
229 void
230 nmsg_output_set_endline(nmsg_output_t output, const char *endline) {
231  if (output->type == nmsg_output_type_pres) {
232  if (output->pres->endline != NULL)
233  free(output->pres->endline);
234  output->pres->endline = strdup(endline);
235  }
236 }
237 
238 void
239 nmsg_output_set_source(nmsg_output_t output, unsigned source) {
240  if (output->type == nmsg_output_type_stream)
241  output->stream->source = source;
242 }
243 
244 void
245 nmsg_output_set_operator(nmsg_output_t output, unsigned operator) {
246  if (output->type == nmsg_output_type_stream)
247  output->stream->operator = operator;
248 }
249 
250 void
251 nmsg_output_set_group(nmsg_output_t output, unsigned group) {
252  if (output->type == nmsg_output_type_stream)
253  output->stream->group = group;
254 }
255 
256 void
257 _output_stop(nmsg_output_t output) {
258  output->stop = true;
259 }
260 
261 /* Private functions. */
262 
263 static nmsg_output_t
264 output_open_stream(nmsg_stream_type type, int fd, size_t bufsz) {
265  struct nmsg_output *output;
266 
267  output = output_open_stream_base(type, bufsz);
268  if (output == NULL)
269  return (output);
270 
271  /* fd */
272  if (type == nmsg_stream_type_file ||
273  type == nmsg_stream_type_sock)
274  {
275  output->stream->fd = fd;
276  }
277 
278  return (output);
279 }
280 
281 static nmsg_output_t
282 output_open_stream_base(nmsg_stream_type type, size_t bufsz) {
283  struct nmsg_output *output;
284 
285  /* nmsg_output */
286  output = calloc(1, sizeof(*output));
287  if (output == NULL)
288  return (NULL);
289  output->type = nmsg_output_type_stream;
290  output->write_fp = _output_nmsg_write;
291  output->flush_fp = _output_nmsg_flush;
292 
293  /* nmsg_stream_output */
294  output->stream = calloc(1, sizeof(*(output->stream)));
295  if (output->stream == NULL) {
296  free(output);
297  return (NULL);
298  }
299  pthread_mutex_init(&output->stream->lock, NULL);
300  output->stream->type = type;
301  output->stream->buffered = true;
302 
303  /* seed the rng, needed for fragment and sequence IDs */
304  output->stream->random = nmsg_random_init();
305  if (output->stream->random == NULL) {
306  free(output->stream);
307  free(output);
308  return (NULL);
309  }
310 
311  /* enable container sequencing */
312  if (output->stream->type == nmsg_stream_type_sock ||
313  output->stream->type == nmsg_stream_type_xs)
314  {
315  output->stream->do_sequence = true;
316 
317  /* generate sequence ID */
318  nmsg_random_buf(output->stream->random,
319  (uint8_t *) &output->stream->sequence_id,
320  sizeof(output->stream->sequence_id));
321  }
322 
323  /* bufsz */
324  if (bufsz < NMSG_WBUFSZ_MIN)
325  bufsz = NMSG_WBUFSZ_MIN;
326  if (bufsz > NMSG_WBUFSZ_MAX)
327  bufsz = NMSG_WBUFSZ_MAX;
328  output->stream->bufsz = bufsz;
329 
330  /* nmsg container */
331  output->stream->c = nmsg_container_init(bufsz);
332  if (output->stream->c == NULL) {
333  nmsg_random_destroy(&output->stream->random);
334  free(output->stream);
335  free(output);
336  return (NULL);
337  }
338  nmsg_container_set_sequence(output->stream->c, output->stream->do_sequence);
339 
340  return (output);
341 }
342 
343 static nmsg_res
344 output_write_callback(nmsg_output_t output, nmsg_message_t msg) {
345  output->callback->cb(msg, output->callback->user);
346  return (nmsg_res_success);
347 }
nmsg_output_t nmsg_output_open_pres(int fd)
Initialize a new presentation format (ASCII lines) nmsg output.
Definition: output.c:62
nmsg_res
nmsg result code
Definition: res.h:25
success
Definition: res.h:26
void nmsg_output_set_rate(nmsg_output_t output, nmsg_rate_t rate)
Limit the payload output rate.
Definition: output.c:214
unsigned nmsg_msgmod_vname_to_vid(const char *vname)
Convert a human-readable vendor name to its numeric ID.
void nmsg_output_set_filter_msgtype(nmsg_output_t output, unsigned vid, unsigned msgtype)
Filter an nmsg_output_t for a given vendor ID / message type.
Definition: output.c:182
nmsg_output_t nmsg_output_open_xs(void *s, size_t bufsz)
Initialize a new XS socket NMSG output.
void(* nmsg_cb_message)(nmsg_message_t msg, void *user)
Callback function for processing nmsg messages.
Definition: nmsg.h:69
void nmsg_output_set_zlibout(nmsg_output_t output, bool zlibout)
Enable or disable zlib compression of output NMSG containers.
Definition: output.c:223
void nmsg_output_set_source(nmsg_output_t output, unsigned source)
Set the 'source' field on all output NMSG payloads.
Definition: output.c:239
#define NMSG_WBUFSZ_MIN
Minimum number of octets that an nmsg wbuf must hold.
Definition: constants.h:62
nmsg_res nmsg_output_write(nmsg_output_t output, nmsg_message_t msg)
Write an nmsg message to an nmsg_output_t object.
Definition: output.c:115
nmsg_output_t nmsg_output_open_callback(nmsg_cb_message cb, void *user)
Initialize a new nmsg output closure.
Definition: output.c:89
nmsg_res nmsg_output_set_filter_msgtype_byname(nmsg_output_t output, const char *vname, const char *mname)
Filter an nmsg_output_t for a given vendor ID / message type.
Definition: output.c:193
nmsg_output_t nmsg_output_open_sock(int fd, size_t bufsz)
Initialize a new datagram socket nmsg output.
Definition: output.c:35
void nmsg_container_set_sequence(nmsg_container_t c, bool do_sequence)
Enable or disable NMSG container sequence tracking.
void nmsg_output_set_group(nmsg_output_t output, unsigned group)
Set the 'group' field on all output NMSG payloads.
Definition: output.c:251
void nmsg_output_set_buffered(nmsg_output_t output, bool buffered)
Make an nmsg_output_t socket output buffered or unbuffered.
Definition: output.c:173
generic failure
Definition: res.h:27
void nmsg_rate_destroy(nmsg_rate_t *r)
Destroy an nmsg_rate_t object.
Definition: rate.c:52
#define NMSG_WBUFSZ_MAX
Maximum number of octets that an nmsg wbuf can hold.
Definition: constants.h:67
void nmsg_container_destroy(nmsg_container_t *c)
Deallocate the resources associated with an nmsg_container_t object.
unsigned nmsg_msgmod_mname_to_msgtype(unsigned vid, const char *mname)
Convert the human-readable name of a message type to a message type ID.
void nmsg_output_set_endline(nmsg_output_t output, const char *endline)
Set the line continuation string for presentation format output.
Definition: output.c:230
void nmsg_output_set_operator(nmsg_output_t output, unsigned operator_)
Set the 'operator' field on all output NMSG payloads.
Definition: output.c:245
nmsg_res nmsg_output_flush(nmsg_output_t output)
Flush an nmsg_output_t object.
Definition: output.c:110
nmsg_container_t nmsg_container_init(size_t bufsz)
Initialize a new NMSG container object.
Definition: container.c:33
nmsg_res nmsg_output_close(nmsg_output_t *output)
Close an nmsg_output_t object.
Definition: output.c:134
nmsg_output_t nmsg_output_open_file(int fd, size_t bufsz)
Initialize a new byte-stream nmsg output.
Definition: output.c:30