22 static void header_serialize(uint8_t *buf, uint8_t flags, uint32_t len);
27 _output_frag_write(nmsg_output_t output) {
28 Nmsg__NmsgFragment nf;
31 size_t len, fragpos, fragsz, fraglen, max_fragsz;
32 uint8_t flags = 0, *packed, *frag_packed, *frag_packed_container;
34 assert(output->type == nmsg_output_type_stream);
37 if (output->stream->type == nmsg_stream_type_xs) {
39 return (_output_nmsg_write_container(output));
42 assert(output->stream->type != nmsg_stream_type_xs);
45 nmsg__nmsg_fragment__init(&nf);
46 max_fragsz = output->stream->bufsz - 32;
52 output->stream->do_zlib,
53 output->stream->sequence,
54 output->stream->sequence_id
56 if (output->stream->do_sequence)
57 output->stream->sequence += 1;
59 if (output->stream->do_zlib)
65 if (output->stream->do_zlib && len <= max_fragsz) {
68 if (output->stream->type == nmsg_stream_type_sock) {
69 res = _output_nmsg_write_sock(output, packed, len);
70 }
else if (output->stream->type == nmsg_stream_type_file) {
71 res = _output_nmsg_write_file(output, packed, len);
72 }
else if (output->stream->type == nmsg_stream_type_xs) {
80 nf.id = nmsg_random_uint32(output->stream->random);
81 nf.last = len / max_fragsz;
82 nf.crc = htonl(my_crc32c(packed, len));
84 for (fragpos = 0, i = 0;
86 fragpos += max_fragsz, i++)
90 if (frag_packed == NULL) {
99 fragsz = (len - fragpos > max_fragsz) ? max_fragsz : (len - fragpos);
100 nf.fragment.len = fragsz;
101 nf.fragment.data = packed + fragpos;
102 fraglen = nmsg__nmsg_fragment__pack(&nf, frag_packed_container);
103 header_serialize(frag_packed, flags, fraglen);
107 if (output->stream->type == nmsg_stream_type_sock) {
108 res = _output_nmsg_write_sock(output, frag_packed, fraglen);
109 }
else if (output->stream->type == nmsg_stream_type_file) {
110 res = _output_nmsg_write_file(output, frag_packed, fraglen);
120 if (output->stream->c == NULL)
127 header_serialize(uint8_t *buf, uint8_t flags, uint32_t len) {
131 memcpy(buf, magic,
sizeof(magic));
132 buf +=
sizeof(magic);
135 store_net16(buf, version);
137 buf +=
sizeof(version);
138 store_net32(buf, len);
nmsg_res nmsg_container_serialize(nmsg_container_t c, uint8_t **pbuf, size_t *buf_len, bool do_header, bool do_zlib, uint32_t sequence, uint64_t sequence_id)
Serialize an NMSG container object, allocating memory as needed and returning a free()able buffer con...
#define NMSG_MAGIC
Four-octet magic sequence seen at the beginning of a serialized NMSG.
#define NMSG_HDRLSZ_V2
Number of octets in an NMSG header (magic + version + length).
void nmsg_container_set_sequence(nmsg_container_t c, bool do_sequence)
Enable or disable NMSG container sequence tracking.
#define NMSG_FLAG_ZLIB
NMSG container is zlib compressed.
void nmsg_container_destroy(nmsg_container_t *c)
Deallocate the resources associated with an nmsg_container_t object.
#define NMSG_FLAG_FRAGMENT
NMSG container is fragmented.
nmsg_container_t nmsg_container_init(size_t bufsz)
Initialize a new NMSG container object.
#define NMSG_VERSION
Current version number of the NMSG serialization format.