/* printf conversion fragment for a 64-bit id: 16 zero-padded lowercase hex digits. */
23 #define IDFMT "%016" PRIx64
/*
 * Forward declaration of the file-local reset helper (defined further down).
 * Call sites in this file pass the source to reset and a short reason string
 * such as "sequence id mismatch".
 */
27 static void reset_seqsrc(
struct nmsg_seqsrc *,
const char *);
/*
 * Walks the list input->stream->seqsrcs starting at its head and, for every
 * source, issues an _nmsg_dprintf call (first argument 5) reporting the
 * source id, its message count, its dropped count and the resulting drop
 * fraction. After the walk, when _nmsg_global_debug is at least 4 and the
 * stream has a non-zero count_recv, the stream-wide count_recv / count_drop
 * totals and their drop fraction are reported as well.
 */
32 _input_seqsrc_destroy(nmsg_input_t input) {
35 seqsrc = ISC_LIST_HEAD(input->stream->seqsrcs);
36 while (seqsrc != NULL) {
37 _nmsg_dprintf(5,
"%s: source id= " IDFMT
": "
38 "count=%" PRIu64
" dropped=%" PRIu64
" (%.4f)\n",
40 seqsrc->key.sequence_id,
41 seqsrc->count, seqsrc->count_dropped,
/* Adding 1.0 makes this a floating-point division and keeps the divisor
 * from being zero when both counters are still 0. */
42 (seqsrc->count_dropped) /
43 (seqsrc->count_dropped + seqsrc->count + 1.0)
/* Remember the successor of the current node. */
45 seqsrc_next = ISC_LIST_NEXT(seqsrc, link);
/* Stream summary: the count_recv > 0 test means the divisor computed below
 * is non-zero even when count_drop is 0. */
50 if (_nmsg_global_debug >= 4 && input->stream->count_recv > 0) {
/* "+ 0.0" converts the integer counters to double before dividing. */
51 double frac = (input->stream->count_drop + 0.0) /
52 (input->stream->count_recv + input->stream->count_drop + 0.0);
54 "%s: input=%p count_recv=%" PRIu64
" count_drop=%" PRIu64
" (%.4f)\n",
57 input->stream->count_recv,
58 input->stream->count_drop,
/*
 * Reconciles the tracking state in *seqsrc with the sequence fields of the
 * message *nmsg.
 *
 *  - If the stored sequence_id differs from the one in the message, the
 *    stored value is overwritten and reset_seqsrc() is invoked with the
 *    reason "sequence id mismatch".
 *  - If the expected sequence number (seqsrc->sequence) differs from the one
 *    on the wire (nmsg->sequence), the signed 64-bit difference
 *    wire - expected is computed. A difference above 1048576 leads to a
 *    reset_seqsrc() call with the reason "implausibly large delta";
 *    the difference is added to seqsrc->count_dropped and a message with
 *    both sequence numbers, the delta and the drop fraction is formatted.
 *  - seqsrc->init is cleared and the next expected sequence number becomes
 *    nmsg->sequence + 1.
 */
65 _input_seqsrc_update(nmsg_input_t input,
struct nmsg_seqsrc *seqsrc, Nmsg__Nmsg *nmsg) {
71 nmsg->has_sequence_id))
/* The sender's sequence_id changed: adopt the new id. */
76 if (seqsrc->sequence_id != nmsg->sequence_id) {
77 seqsrc->sequence_id = nmsg->sequence_id;
79 reset_seqsrc(seqsrc,
"sequence id mismatch");
/* Gap detection: compare what we expected with what arrived. */
86 if (seqsrc->sequence != nmsg->sequence) {
/* Both operands are widened to int64_t first, so the subtraction is done
 * in signed 64-bit arithmetic. */
87 int64_t delta = ((int64_t)(nmsg->sequence)) -
88 ((int64_t)(seqsrc->sequence));
/* 1048576 == 2^20; larger jumps are treated as implausible rather than
 * as that many dropped messages. */
99 if (delta > 1048576) {
102 reset_seqsrc(seqsrc,
"implausibly large delta");
108 seqsrc->count_dropped += delta;
/* NOTE(review): delta is int64_t but the format below uses PRIu64; PRId64
 * would match the declared type -- confirm delta cannot be negative here. */
111 "%s: source id= " IDFMT
": expected sequence (%u) != "
112 "wire sequence (%u), delta %" PRIu64
", drop fraction %.4f\n",
114 seqsrc->key.sequence_id,
/* Same drop-fraction formula as elsewhere in this file: the 1.0 forces
 * floating-point division and a non-zero divisor. */
118 (seqsrc->count_dropped) /
119 (seqsrc->count_dropped + seqsrc->count + 1.0)
123 seqsrc->init =
false;
/* The next message from this source is expected to carry the following
 * sequence number. */
124 seqsrc->sequence = nmsg->sequence + 1;
/*
 * Searches input->stream->seqsrcs for the entry that corresponds to the
 * message *nmsg and the address held in input->stream->addr_ss, and sets up
 * a new entry when the search ends with seqsrc == NULL.
 *
 * An entry is tested against three cases, all of which require
 * nmsg->sequence_id to equal the entry's key.sequence_id:
 *   - AF_INET:   key.port and key.ip4 are compared with the sockaddr_in;
 *   - AF_INET6:  key.port and key.ip6 are compared with the sockaddr_in6;
 *   - AF_UNSPEC: the entry's key.af is AF_UNSPEC (no address comparison).
 *
 * During the walk, entries whose `last` timestamp is older than
 * now.tv_sec - NMSG_SEQSRC_GC_INTERVAL are reported ("freeing old source")
 * and unlinked from the list.
 *
 * A new entry is zero-allocated with calloc(), its key is filled in
 * according to input->stream->type, and it is appended to the list. An entry
 * that is not at the head of the list is moved there, and its `last` field
 * is set to the stream's current now.tv_sec.
 */
129 _input_seqsrc_get(nmsg_input_t input, Nmsg__Nmsg *nmsg) {
131 struct sockaddr_in *sai;
132 struct sockaddr_in6 *sai6;
133 struct sockaddr_storage *addr_ss = &input->stream->addr_ss;
135 seqsrc = ISC_LIST_HEAD(input->stream->seqsrcs);
136 while (seqsrc != NULL) {
/* Fetch the successor first: the current node may be unlinked further
 * down in this iteration. */
137 seqsrc_next = ISC_LIST_NEXT(seqsrc, link);
/* Case 1: IPv4 -- both the stream address and the stored key are AF_INET. */
139 if (nmsg->sequence_id == seqsrc->key.sequence_id &&
140 addr_ss->ss_family == AF_INET &&
141 seqsrc->key.af == AF_INET)
143 sai = (
struct sockaddr_in *) addr_ss;
/* Port is compared as stored in the sockaddr; the address as 4 raw bytes. */
144 if (sai->sin_port == seqsrc->key.port &&
145 memcmp(&sai->sin_addr.s_addr, seqsrc->key.ip4, 4) == 0)
149 }
else if (nmsg->sequence_id == seqsrc->key.sequence_id &&
150 addr_ss->ss_family == AF_INET6 &&
151 seqsrc->key.af == AF_INET6)
153 sai6 = (
struct sockaddr_in6 *) addr_ss;
/* IPv6 variant of the same comparison: port plus 16 raw address bytes. */
154 if (sai6->sin6_port == seqsrc->key.port &&
155 memcmp(sai6->sin6_addr.s6_addr, seqsrc->key.ip6, 16) == 0)
159 }
else if (nmsg->sequence_id == seqsrc->key.sequence_id &&
160 seqsrc->key.af == AF_UNSPEC)
/* Garbage collection: this entry has not been touched within the last
 * NMSG_SEQSRC_GC_INTERVAL seconds of stream time. */
165 if (seqsrc->last < input->stream->now.tv_sec - NMSG_SEQSRC_GC_INTERVAL) {
167 "%s: freeing old source id= " IDFMT
": "
168 "count= %" PRIu64
" count_dropped= %" PRIu64
"\n",
169 __func__, seqsrc->key.sequence_id,
170 seqsrc->count, seqsrc->count_dropped
173 ISC_LIST_UNLINK(input->stream->seqsrcs, seqsrc, link);
177 seqsrc = seqsrc_next;
/* No entry selected: build a fresh one. calloc() leaves every field zero. */
180 if (seqsrc == NULL) {
181 seqsrc = calloc(1,
sizeof(*seqsrc));
/* NOTE(review): assert() is compiled out when NDEBUG is defined, in which
 * case a failed allocation would not be caught here -- confirm build flags. */
182 assert(seqsrc != NULL);
/* Socket streams key the entry on sequence_id plus address family, port
 * and IP address taken from addr_ss. */
185 if (input->stream->type == nmsg_stream_type_sock) {
186 seqsrc->key.sequence_id = nmsg->sequence_id;
187 seqsrc->key.af = addr_ss->ss_family;
188 if (seqsrc->key.af == AF_INET) {
189 sai = (
struct sockaddr_in *) addr_ss;
/* Port and address are copied verbatim, without byte-order conversion. */
190 seqsrc->key.port = sai->sin_port;
191 memcpy(seqsrc->key.ip4, &sai->sin_addr.s_addr, 4);
195 sizeof(seqsrc->addr_str));
196 }
else if (seqsrc->key.af == AF_INET6) {
197 sai6 = (
struct sockaddr_in6 *) addr_ss;
198 seqsrc->key.port = sai6->sin6_port;
199 memcpy(seqsrc->key.ip6, sai6->sin6_addr.s6_addr, 16);
203 sizeof(seqsrc->addr_str));
206 }
else if (input->stream->type == nmsg_stream_type_xs) {
/* XS streams are keyed on sequence_id only; the family is AF_UNSPEC. */
207 seqsrc->key.sequence_id = nmsg->sequence_id;
208 seqsrc->key.af = AF_UNSPEC;
214 ISC_LINK_INIT(seqsrc, link);
215 ISC_LIST_APPEND(input->stream->seqsrcs, seqsrc, link);
216 _nmsg_dprintf(5,
"%s: initialized new seqsrc id= " IDFMT
"\n",
217 __func__, seqsrc->key.sequence_id);
/* Move-to-front: put this entry at the head of the list, where the walk
 * above starts. */
219 if (seqsrc != ISC_LIST_HEAD(input->stream->seqsrcs)) {
220 ISC_LIST_UNLINK(input->stream->seqsrcs, seqsrc, link);
221 ISC_LIST_PREPEND(input->stream->seqsrcs, seqsrc, link);
/* Refresh the timestamp that the garbage-collection test compares against. */
225 seqsrc->last = input->stream->now.tv_sec;
/*
 * Resets the tracking state of *seqsrc. A "resetting source" message is
 * formatted from the source id and its counters, after which
 * seqsrc->sequence and seqsrc->count_dropped are set to 0.
 *
 * `why` is a short reason supplied by the caller (e.g. "sequence id
 * mismatch"); presumably it fills the second %s of the message format --
 * TODO confirm.
 */
232 reset_seqsrc(
struct nmsg_seqsrc *seqsrc,
const char *why) {
234 "%s: resetting source id= " IDFMT
": %s: "
235 "count= %" PRIu64
" count_dropped= %" PRIu64
"\n",
237 seqsrc->key.sequence_id,
240 seqsrc->count_dropped
242 seqsrc->sequence = 0;
244 seqsrc->count_dropped = 0;