nmsg  0.9.0
input_seqsrc.c
1 /*
2  * Copyright (c) 2011-2013 by Farsight Security, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /* Import. */
18 
19 #include "private.h"
20 
21 /* Macros. */
22 
23 #define IDFMT "%016" PRIx64
24 
25 /* Forward. */
26 
27 static void reset_seqsrc(struct nmsg_seqsrc *, const char *);
28 
29 /* Internal functions. */
30 
31 void
32 _input_seqsrc_destroy(nmsg_input_t input) {
33  struct nmsg_seqsrc *seqsrc, *seqsrc_next;
34 
35  seqsrc = ISC_LIST_HEAD(input->stream->seqsrcs);
36  while (seqsrc != NULL) {
37  _nmsg_dprintf(5, "%s: source id= " IDFMT ": "
38  "count=%" PRIu64 " dropped=%" PRIu64 " (%.4f)\n",
39  __func__,
40  seqsrc->key.sequence_id,
41  seqsrc->count, seqsrc->count_dropped,
42  (seqsrc->count_dropped) /
43  (seqsrc->count_dropped + seqsrc->count + 1.0)
44  );
45  seqsrc_next = ISC_LIST_NEXT(seqsrc, link);
46  free(seqsrc);
47  seqsrc = seqsrc_next;
48  }
49 
50  if (_nmsg_global_debug >= 4 && input->stream->count_recv > 0) {
51  double frac = (input->stream->count_drop + 0.0) /
52  (input->stream->count_recv + input->stream->count_drop + 0.0);
53  _nmsg_dprintf(4,
54  "%s: input=%p count_recv=%" PRIu64 " count_drop=%" PRIu64 " (%.4f)\n",
55  __func__,
56  input,
57  input->stream->count_recv,
58  input->stream->count_drop,
59  frac
60  );
61  }
62 }
63 
64 size_t
65 _input_seqsrc_update(nmsg_input_t input, struct nmsg_seqsrc *seqsrc, Nmsg__Nmsg *nmsg) {
66  size_t drop = 0;
67 
68  if (!(input->type == nmsg_input_type_stream &&
69  nmsg != NULL &&
70  nmsg->has_sequence &&
71  nmsg->has_sequence_id))
72  {
73  return (drop);
74  }
75 
76  if (seqsrc->sequence_id != nmsg->sequence_id) {
77  seqsrc->sequence_id = nmsg->sequence_id;
78  if (!seqsrc->init) {
79  reset_seqsrc(seqsrc, "sequence id mismatch");
80  seqsrc->init = true;
81  }
82  }
83 
84  seqsrc->count += 1;
85 
86  if (seqsrc->sequence != nmsg->sequence) {
87  int64_t delta = ((int64_t)(nmsg->sequence)) -
88  ((int64_t)(seqsrc->sequence));
89  delta %= 4294967296;
90  if (delta < 0)
91  delta += 4294967296;
92 
93  if (seqsrc->init) {
94  /* don't count the delta as a drop, since the seqsrc
95  * has just been initialized */
96  goto out;
97  }
98 
99  if (delta > 1048576) {
100  /* don't count the delta as a drop, since the delta
101  * is implausibly large */
102  reset_seqsrc(seqsrc, "implausibly large delta");
103  goto out;
104  }
105 
106  /* count the delta as a drop */
107  drop = delta;
108  seqsrc->count_dropped += delta;
109 
110  _nmsg_dprintf(5,
111  "%s: source id= " IDFMT ": expected sequence (%u) != "
112  "wire sequence (%u), delta %" PRIu64 ", drop fraction %.4f\n",
113  __func__,
114  seqsrc->key.sequence_id,
115  seqsrc->sequence,
116  nmsg->sequence,
117  delta,
118  (seqsrc->count_dropped) /
119  (seqsrc->count_dropped + seqsrc->count + 1.0)
120  );
121  }
122 out:
123  seqsrc->init = false;
124  seqsrc->sequence = nmsg->sequence + 1;
125  return (drop);
126 }
127 
128 struct nmsg_seqsrc *
129 _input_seqsrc_get(nmsg_input_t input, Nmsg__Nmsg *nmsg) {
130  struct nmsg_seqsrc *seqsrc, *seqsrc_next;
131  struct sockaddr_in *sai;
132  struct sockaddr_in6 *sai6;
133  struct sockaddr_storage *addr_ss = &input->stream->addr_ss;
134 
135  seqsrc = ISC_LIST_HEAD(input->stream->seqsrcs);
136  while (seqsrc != NULL) {
137  seqsrc_next = ISC_LIST_NEXT(seqsrc, link);
138 
139  if (nmsg->sequence_id == seqsrc->key.sequence_id &&
140  addr_ss->ss_family == AF_INET &&
141  seqsrc->key.af == AF_INET)
142  {
143  sai = (struct sockaddr_in *) addr_ss;
144  if (sai->sin_port == seqsrc->key.port &&
145  memcmp(&sai->sin_addr.s_addr, seqsrc->key.ip4, 4) == 0)
146  {
147  break;
148  }
149  } else if (nmsg->sequence_id == seqsrc->key.sequence_id &&
150  addr_ss->ss_family == AF_INET6 &&
151  seqsrc->key.af == AF_INET6)
152  {
153  sai6 = (struct sockaddr_in6 *) addr_ss;
154  if (sai6->sin6_port == seqsrc->key.port &&
155  memcmp(sai6->sin6_addr.s6_addr, seqsrc->key.ip6, 16) == 0)
156  {
157  break;
158  }
159  } else if (nmsg->sequence_id == seqsrc->key.sequence_id &&
160  seqsrc->key.af == AF_UNSPEC)
161  {
162  break;
163  }
164 
165  if (seqsrc->last < input->stream->now.tv_sec - NMSG_SEQSRC_GC_INTERVAL) {
166  _nmsg_dprintf(5,
167  "%s: freeing old source id= " IDFMT ": "
168  "count= %" PRIu64 " count_dropped= %" PRIu64 "\n",
169  __func__, seqsrc->key.sequence_id,
170  seqsrc->count, seqsrc->count_dropped
171  );
172 
173  ISC_LIST_UNLINK(input->stream->seqsrcs, seqsrc, link);
174  free(seqsrc);
175  }
176 
177  seqsrc = seqsrc_next;
178  }
179 
180  if (seqsrc == NULL) {
181  seqsrc = calloc(1, sizeof(*seqsrc));
182  assert(seqsrc != NULL);
183  seqsrc->init = true;
184 
185  if (input->stream->type == nmsg_stream_type_sock) {
186  seqsrc->key.sequence_id = nmsg->sequence_id;
187  seqsrc->key.af = addr_ss->ss_family;
188  if (seqsrc->key.af == AF_INET) {
189  sai = (struct sockaddr_in *) addr_ss;
190  seqsrc->key.port = sai->sin_port;
191  memcpy(seqsrc->key.ip4, &sai->sin_addr.s_addr, 4);
192  inet_ntop(AF_INET,
193  seqsrc->key.ip4,
194  seqsrc->addr_str,
195  sizeof(seqsrc->addr_str));
196  } else if (seqsrc->key.af == AF_INET6) {
197  sai6 = (struct sockaddr_in6 *) addr_ss;
198  seqsrc->key.port = sai6->sin6_port;
199  memcpy(seqsrc->key.ip6, sai6->sin6_addr.s6_addr, 16);
200  inet_ntop(AF_INET6,
201  seqsrc->key.ip6,
202  seqsrc->addr_str,
203  sizeof(seqsrc->addr_str));
204  }
205 #ifdef HAVE_LIBXS
206  } else if (input->stream->type == nmsg_stream_type_xs) {
207  seqsrc->key.sequence_id = nmsg->sequence_id;
208  seqsrc->key.af = AF_UNSPEC;
209  }
210 #else /* HAVE_LIBXS */
211  }
212 #endif /* HAVE_LIBXS */
213 
214  ISC_LINK_INIT(seqsrc, link);
215  ISC_LIST_APPEND(input->stream->seqsrcs, seqsrc, link);
216  _nmsg_dprintf(5, "%s: initialized new seqsrc id= " IDFMT "\n",
217  __func__, seqsrc->key.sequence_id);
218  } else {
219  if (seqsrc != ISC_LIST_HEAD(input->stream->seqsrcs)) {
220  ISC_LIST_UNLINK(input->stream->seqsrcs, seqsrc, link);
221  ISC_LIST_PREPEND(input->stream->seqsrcs, seqsrc, link);
222  }
223  }
224 
225  seqsrc->last = input->stream->now.tv_sec;
226  return (seqsrc);
227 }
228 
229 /* Private functions. */
230 
231 static void
232 reset_seqsrc(struct nmsg_seqsrc *seqsrc, const char *why) {
233  _nmsg_dprintf(5,
234  "%s: resetting source id= " IDFMT ": %s: "
235  "count= %" PRIu64 " count_dropped= %" PRIu64 "\n",
236  __func__,
237  seqsrc->key.sequence_id,
238  why,
239  seqsrc->count,
240  seqsrc->count_dropped
241  );
242  seqsrc->sequence = 0;
243  seqsrc->count = 0;
244  seqsrc->count_dropped = 0;
245 }
NMSG payloads from file or socket.
Definition: input.h:57