nmsg  0.9.0
brate.c
1 /*
2  * Copyright (c) 2012 by Farsight Security, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /* Import. */
18 
19 #include "private.h"
20 
21 /* Macros. */
22 
23 #define QUANTUM 0.1
24 
25 /* Data structures. */
26 
27 struct nmsg_brate {
28  size_t qu;
29  size_t target_byte_rate;
30  double t_time;
31  struct timespec container_zero_time;
32  struct timespec container_last_time;
33 };
34 
35 /* Export. */
36 
37 struct nmsg_brate *
38 _nmsg_brate_init(size_t target_byte_rate) {
39  struct nmsg_brate *b;
40 
41  b = calloc(1, sizeof(*b));
42  if (b == NULL)
43  return (NULL);
44  b->target_byte_rate = target_byte_rate;
45  nmsg_timespec_get(&b->container_last_time);
46 
47  return (b);
48 }
49 
50 void
51 _nmsg_brate_destroy(struct nmsg_brate **b) {
52  if (*b != NULL) {
53  free(*b);
54  *b = NULL;
55  }
56 }
57 
58 void
59 _nmsg_brate_sleep(struct nmsg_brate *b,
60  size_t container_sz,
61  size_t n_payloads,
62  size_t n)
63 {
64  struct timespec now;
65 
66  /* get current time */
67  nmsg_timespec_get(&now);
68 
69  if (n == 0) {
70  /* new container */
71  double c_time;
72  struct timespec container_time = now;
73 
74  /* calculate c_time, the overhead taken by the deserialization
75  * of the container. if this is the very first container
76  * processed since _nmsg_brate_init() was called, this value
77  * will also include some additional startup overhead.
78  */
79  nmsg_timespec_sub(&b->container_last_time, &container_time);
80  c_time = nmsg_timespec_to_double(&container_time);
81 
82  /* calculate t_time, the theoretical amount of time that should
83  * be spent on this container to maintain the target byte rate.
84  * since container sizes can vary especially if this is the last
85  * container in a file, this is a per-container value.
86  */
87  b->t_time = (container_sz + 0.0) / (b->target_byte_rate);
88 
89  /* since c_time was already consumed in overhead, subtract it
90  * from t_time, if it is smaller.
91  */
92  if (c_time < b->t_time)
93  b->t_time -= c_time;
94 
95  /* calculate qu, the payload quantum. */
96  b->qu = QUANTUM * n_payloads;
97  if (b->qu < 1)
98  b->qu = 1;
99 
100  /* store the container zero time, which will be used to
101  * calculate the elapsed time every qu payloads.
102  */
103  b->container_zero_time = now;
104  } else if ((n % b->qu) == 0 || n == n_payloads - 1) {
105  double frac;
106  double e_time;
107  double s_time;
108  double t_time_frac;
109  struct timespec ts;
110 
111  /* calculate frac, the fraction of the container that has been
112  * processed so far. note that this is only an estimate, since
113  * it is assumed that payloads are of average size, on average.
114  */
115  frac = (n + 0.0) / n_payloads;
116 
117  /* calculate e_time, the time elapsed from the start of this
118  * container to the current payload.
119  */
120  ts = now;
121  nmsg_timespec_sub(&b->container_zero_time, &ts);
122  e_time = nmsg_timespec_to_double(&ts);
123 
124  /* calculate t_time_frac, the part of t_time that should be
125  * consumed in order to maintain the target byte rate.
126  */
127  t_time_frac = frac * b->t_time;
128 
129  /* calculate s_time, the amount of time to sleep (if positive),
130  * in order to maintain the target byte rate, by subtracting the
131  * elapsed time since the start of the container from
132  * t_time_frac.
133  */
134  s_time = t_time_frac - e_time;
135 
136  /* do the actual sleep, if possible. */
137  if (s_time > 0.0) {
138  nmsg_timespec_from_double(s_time, &ts);
139  nmsg_timespec_sleep(&ts);
140  }
141 
142  if (n == n_payloads - 1) {
143  /* this is the last payload of the container. save the
144  * current time so that the overhead taken by
145  * deserialization of the next container can be
146  * accounted for.
147  */
148  nmsg_timespec_get(&b->container_last_time);
149  }
150  }
151 }
void nmsg_timespec_get(struct timespec *ts)
Get the current time.
Definition: timespec.c:24
void nmsg_timespec_from_double(double seconds, struct timespec *ts)
Convert floating point number of seconds to timespec.
Definition: timespec.c:69
double nmsg_timespec_to_double(const struct timespec *ts)
Convert timespec to floating point representation.
Definition: timespec.c:64
void nmsg_timespec_sleep(const struct timespec *ts)
Sleep.
Definition: timespec.c:36
void nmsg_timespec_sub(const struct timespec *a, struct timespec *b)
Subtract timespec b from a, placing result in b.
Definition: timespec.c:54