nmsg  0.9.0
output_frag.c
1 /*
2  * Copyright (c) 2008-2013 by Farsight Security, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /* Import. */
18 
19 #include "private.h"
20 
21 /* Forward. */
22 static void header_serialize(uint8_t *buf, uint8_t flags, uint32_t len);
23 
24 /* Internal functions. */
25 
27 _output_frag_write(nmsg_output_t output) {
28  Nmsg__NmsgFragment nf;
29  int i;
30  nmsg_res res;
31  size_t len, fragpos, fragsz, fraglen, max_fragsz;
32  uint8_t flags = 0, *packed, *frag_packed, *frag_packed_container;
33 
34  assert(output->type == nmsg_output_type_stream);
35 
36 #ifdef HAVE_LIBXS
37  if (output->stream->type == nmsg_stream_type_xs) {
38  /* let XS do fragmentation instead */
39  return (_output_nmsg_write_container(output));
40  }
41 #else /* HAVE_LIBXS */
42  assert(output->stream->type != nmsg_stream_type_xs);
43 #endif /* HAVE_LIBXS */
44 
45  nmsg__nmsg_fragment__init(&nf);
46  max_fragsz = output->stream->bufsz - 32;
47 
48  res = nmsg_container_serialize(output->stream->c,
49  &packed,
50  &len,
51  false, /* do_header */
52  output->stream->do_zlib,
53  output->stream->sequence,
54  output->stream->sequence_id
55  );
56  if (output->stream->do_sequence)
57  output->stream->sequence += 1;
58 
59  if (output->stream->do_zlib)
60  flags |= NMSG_FLAG_ZLIB;
61 
62  if (res != nmsg_res_success)
63  return (res);
64 
65  if (output->stream->do_zlib && len <= max_fragsz) {
66  /* write out the unfragmented NMSG container */
67 
68  if (output->stream->type == nmsg_stream_type_sock) {
69  res = _output_nmsg_write_sock(output, packed, len);
70  } else if (output->stream->type == nmsg_stream_type_file) {
71  res = _output_nmsg_write_file(output, packed, len);
72  } else if (output->stream->type == nmsg_stream_type_xs) {
73  assert(0);
74  }
75  goto frag_out;
76  }
77 
78  /* create and send fragments */
79  flags |= NMSG_FLAG_FRAGMENT;
80  nf.id = nmsg_random_uint32(output->stream->random);
81  nf.last = len / max_fragsz;
82  nf.crc = htonl(my_crc32c(packed, len));
83  nf.has_crc = true;
84  for (fragpos = 0, i = 0;
85  fragpos < len;
86  fragpos += max_fragsz, i++)
87  {
88  /* allocate a buffer large enough to hold one serialized fragment */
89  frag_packed = malloc(NMSG_HDRLSZ_V2 + output->stream->bufsz + 32);
90  if (frag_packed == NULL) {
91  free(packed);
92  res = nmsg_res_memfail;
93  goto frag_out;
94  }
95  frag_packed_container = frag_packed + NMSG_HDRLSZ_V2;
96 
97  /* serialize the fragment */
98  nf.current = i;
99  fragsz = (len - fragpos > max_fragsz) ? max_fragsz : (len - fragpos);
100  nf.fragment.len = fragsz;
101  nf.fragment.data = packed + fragpos;
102  fraglen = nmsg__nmsg_fragment__pack(&nf, frag_packed_container);
103  header_serialize(frag_packed, flags, fraglen);
104  fraglen += NMSG_HDRLSZ_V2;
105 
106  /* send the serialized fragment */
107  if (output->stream->type == nmsg_stream_type_sock) {
108  res = _output_nmsg_write_sock(output, frag_packed, fraglen);
109  } else if (output->stream->type == nmsg_stream_type_file) {
110  res = _output_nmsg_write_file(output, frag_packed, fraglen);
111  } else {
112  assert(0);
113  }
114  }
115  free(packed);
116 
117 frag_out:
118  nmsg_container_destroy(&output->stream->c);
119  output->stream->c = nmsg_container_init(output->stream->bufsz);
120  if (output->stream->c == NULL)
121  return (nmsg_res_memfail);
122  nmsg_container_set_sequence(output->stream->c, output->stream->do_sequence);
123  return (res);
124 }
125 
126 static void
127 header_serialize(uint8_t *buf, uint8_t flags, uint32_t len) {
128  static const char magic[] = NMSG_MAGIC;
129  uint16_t version;
130 
131  memcpy(buf, magic, sizeof(magic));
132  buf += sizeof(magic);
133 
134  version = NMSG_VERSION | (flags << 8);
135  store_net16(buf, version);
136 
137  buf += sizeof(version);
138  store_net32(buf, len);
139 }
nmsg_res
nmsg result code
Definition: res.h:25
success
Definition: res.h:26
nmsg_res nmsg_container_serialize(nmsg_container_t c, uint8_t **pbuf, size_t *buf_len, bool do_header, bool do_zlib, uint32_t sequence, uint64_t sequence_id)
Serialize an NMSG container object, allocating memory as needed and returning a free()able buffer con...
out of memory
Definition: res.h:29
#define NMSG_MAGIC
Four-octet magic sequence seen at the beginning of a serialized NMSG.
Definition: constants.h:27
#define NMSG_HDRLSZ_V2
Number of octets in an NMSG header (magic + version + length).
Definition: constants.h:42
void nmsg_container_set_sequence(nmsg_container_t c, bool do_sequence)
Enable or disable NMSG container sequence tracking.
#define NMSG_FLAG_ZLIB
NMSG container is zlib compressed.
Definition: constants.h:109
void nmsg_container_destroy(nmsg_container_t *c)
Deallocate the resources associated with an nmsg_container_t object.
#define NMSG_FLAG_FRAGMENT
NMSG container is fragmented.
Definition: constants.h:114
nmsg_container_t nmsg_container_init(size_t bufsz)
Initialize a new NMSG container object.
Definition: container.c:33
#define NMSG_VERSION
Current version number of the NMSG serialization format.
Definition: constants.h:32