GComm  0.2.3
evs_proto.hpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2009 Codership Oy <info@codership.com>
3  */
4 
11 #ifndef GCOMM_EVS_PROTO_HPP
12 #define GCOMM_EVS_PROTO_HPP
13 
14 #include "gcomm/protolay.hpp"
15 #include "gcomm/view.hpp"
16 #include "gcomm/transport.hpp"
17 #include "gcomm/map.hpp"
18 #include "histogram.hpp"
19 #include "profile.hpp"
20 
21 #include "evs_seqno.hpp"
22 #include "evs_node.hpp"
23 #include "evs_consensus.hpp"
24 
25 #include "gu_datetime.hpp"
26 
27 #include <list>
28 #include <deque>
29 #include <vector>
30 #include <limits>
31 
32 #ifndef GCOMM_EVS_MAX_VERSION
33 #define GCOMM_EVS_MAX_VERSION 0
34 #endif // GCOMM_EVS_MAX_VERSION
35 
36 namespace gcomm
37 {
38  namespace evs
39  {
40 
41  class Message;
42  class MessageNodeList;
43  class UserMessage;
44  class DelegateMessage;
45  class GapMessage;
46  class JoinMessage;
47  class InstallMessage;
48  class LeaveMessage;
49  class InputMap;
50  class InputMapMsg;
51  class Proto;
52  std::ostream& operator<<(std::ostream&, const Proto&);
53  }
54 }
55 
56 
60 class gcomm::evs::Proto : public Protolay
61 {
62 public:
// States of a Proto instance. to_string() provides an upper-case label for
// every enumerator except S_MAX.
63  enum State {
64  S_CLOSED,
65  S_JOINING,
66  S_LEAVING,
67  S_GATHER,
68  S_INSTALL,
69  S_OPERATIONAL,
// S_MAX has no label in to_string(); presumably an end-of-range marker
// rather than a real state -- TODO confirm against its users.
70  S_MAX
71  };
72 
// Returns the upper-case label of state s ("CLOSED", "JOINING", "LEAVING",
// "GATHER", "INSTALL" or "OPERATIONAL").
// Any other value, S_MAX included, reaches the default branch and is reported
// through gu_throw_fatal with the message "Invalid state".
73  static std::string to_string(const State s)
74  {
75  switch (s) {
76  case S_CLOSED: return "CLOSED";
77  case S_JOINING: return "JOINING";
78  case S_LEAVING: return "LEAVING";
79  case S_GATHER: return "GATHER";
80  case S_INSTALL: return "INSTALL";
81  case S_OPERATIONAL: return "OPERATIONAL";
82  default:
// There is no return statement after the switch, so this function relies on
// gu_throw_fatal never returning (presumably it throws -- confirm against
// the macro's definition).
83  gu_throw_fatal << "Invalid state";
84  }
85  }
86 
87  friend std::ostream& operator<<(std::ostream&, const Proto&);
88 
92  Proto(gu::Config& conf,
93  const UUID& my_uuid,
94  SegmentId segment,
95  const gu::URI& uri = gu::URI("evs://"),
96  const size_t mtu = std::numeric_limits<size_t>::max());
97  ~Proto();
98 
// Returns a reference to this instance's own UUID (my_uuid_).
99  const UUID& uuid() const { return my_uuid_; }
100 
// Builds an identification string of the form
//   evs::proto(<uuid>, <state label>, <current view id>)
// from uuid(), to_string(state()) and current_view_.id().
// close(bool) uses it as the prefix of its debug log line.
101  std::string self_string() const
102  {
103  std::ostringstream os;
104  os << "evs::proto(" << uuid() << ", " << to_string(state())
105  << ", " << current_view_.id() << ")";
106  return os.str();
107  }
108 
// Returns the current protocol state (state_).
109  State state() const { return state_; }
110 
// Returns the number of entries in known_, the map of known instances.
111  size_t known_size() const { return known_.size(); }
112 
// Returns true when the output message queue (output_) holds no entries.
113  bool is_output_empty() const { return output_.empty(); }
114 
115  std::string stats() const;
116  void reset_stats();
117 
118  bool is_flow_control(const seqno_t, const seqno_t win) const;
119  int send_user(Datagram&,
120  uint8_t,
121  Order,
122  seqno_t,
123  seqno_t,
124  size_t n_aggregated = 1);
// Returns the stored MTU value (mtu_).
125  size_t mtu() const { return mtu_; }
126  size_t aggregate_len() const;
127  int send_user(const seqno_t);
128  void complete_user(const seqno_t);
129  int send_delegate(Datagram&);
130  void send_gap(const UUID&, const ViewId&, const Range, bool commit = false);
131  const JoinMessage& create_join();
132  void send_join(bool tval = true);
133  void set_join(const JoinMessage&, const UUID&);
134  void set_leave(const LeaveMessage&, const UUID&);
135  void send_leave(bool handle = true);
136  void send_install();
137 
138  void resend(const UUID&, const Range);
139  void recover(const UUID&, const UUID&, const Range);
140 
141  void retrans_user(const UUID&, const MessageNodeList&);
142  void retrans_leaves(const MessageNodeList&);
143 
144  void set_inactive(const UUID&);
145  void check_inactive();
146  // Clean up foreign nodes according to install message.
147  void cleanup_foreign(const InstallMessage&);
148  void cleanup_views();
149  void cleanup_joins();
150 
151  size_t n_operational() const;
152 
153  void validate_reg_msg(const UserMessage&);
154  void deliver_finish(const InputMapMsg&);
155  void deliver();
156  void deliver_local(bool trans = false);
157  void deliver_causal(uint8_t user_type, seqno_t seqno, const Datagram&);
158  void validate_trans_msg(const UserMessage&);
159  void deliver_trans();
160  void deliver_reg_view(const InstallMessage&, const View&);
161  void deliver_trans_view(const InstallMessage&, const View&);
162  void deliver_empty_view();
163 
164  void setall_committed(bool val);
165  bool is_all_committed() const;
166  void setall_installed(bool val);
167  bool is_all_installed() const;
168 
169 
170  bool is_representative(const UUID& pid) const;
171 
172  void shift_to(const State, const bool send_j = true);
173 
174 
175  // Message handlers
176 private:
177 
178 
185  seqno_t update_im_safe_seq(const size_t uuid, const seqno_t seq);
186 
191  bool update_im_safe_seqs(const MessageNodeList&);
192  bool is_msg_from_previous_view(const Message&);
193  void check_suspects(const UUID&, const MessageNodeList&);
194  void cross_check_inactives(const UUID&, const MessageNodeList&);
195  void check_unseen();
196  void check_nil_view_id();
197  void handle_foreign(const Message&);
198  void handle_user(const UserMessage&,
199  NodeMap::iterator,
200  const Datagram&);
201  void handle_delegate(const DelegateMessage&,
202  NodeMap::iterator,
203  const Datagram&);
204  void handle_gap(const GapMessage&, NodeMap::iterator);
205  void handle_join(const JoinMessage&, NodeMap::iterator);
206  void handle_leave(const LeaveMessage&, NodeMap::iterator);
207  void handle_install(const InstallMessage&, NodeMap::iterator);
208  void populate_node_list(MessageNodeList*) const;
209 public:
210  static size_t unserialize_message(const UUID&,
211  const Datagram&,
212  Message*);
213  void handle_msg(const Message& msg,
214  const Datagram& dg = Datagram());
215  // Protolay
216  void handle_up(const void*, const Datagram&, const ProtoUpMeta&);
217  int handle_down(Datagram& wb, const ProtoDownMeta& dm);
// Forwards the given view to set_stable_view(). set_stable_view() is not
// declared in this class; presumably it is inherited from the protocol
// layer base -- TODO confirm.
218  void handle_stable_view(const View& view)
219  {
220  set_stable_view(view);
221  }
// Starts joining: shifts to S_JOINING (shift_to()'s send_j argument keeps its
// default of true), then calls send_join() with `first` as its tval argument.
// Both calls are wrapped in gu_trace.
222  void connect(bool first)
223  {
224  gu_trace(shift_to(S_JOINING));
225  gu_trace(send_join(first));
226  }
227 
// Initiates leaving the group.
// In any state other than S_GATHER and S_INSTALL it shifts to S_LEAVING, sends
// a leave message and clears pending_leave_. In S_GATHER or S_INSTALL it only
// sets pending_leave_ so that the leave can be carried out later.
// NOTE(review): the `force` parameter is never read in this body.
228  void close(bool force = false)
229  {
230  // Shifting to S_LEAVING from S_INSTALL is troublesome;
231  // instead, raise a boolean flag to indicate that
232  // shifting to S_LEAVING should be done once S_OPERATIONAL
233  // is reached.
234  //
235  // #760 - pending leave should also be used in S_GATHER:
236  // changing state to S_LEAVING resets timers and may prevent
237  // the remaining nodes from reaching a new group until the
238  // install timer times out.
// state() is streamed as the raw enum value here (unless an operator<< for
// State exists elsewhere); self_string() already includes the state label.
239  log_debug << self_string() << " closing in state " << state();
240  if (state() != S_GATHER && state() != S_INSTALL)
241  {
242  gu_trace(shift_to(S_LEAVING));
243  gu_trace(send_leave());
244  pending_leave_ = false;
245  }
246  else
247  {
// Leave is deferred: only the flag is recorded in these two states.
248  pending_leave_ = true;
249  }
250  }
251 
// Overload taking a node UUID: delegates to set_inactive(uuid).
252  void close(const UUID& uuid)
253  {
254  set_inactive(uuid);
255  }
256 
257  bool set_param(const std::string& key, const std::string& val);
258 
259  // gu::datetime::Date functions do appropriate actions for timer handling
260  // and return next expiration time
261 private:
262 public:
// Timer kinds, used as the value type of TimerList and as the argument of
// next_expiration(). Each enumerator has a correspondingly named handler
// declared in this class: handle_inactivity_timer(), handle_retrans_timer(),
// handle_install_timer() and handle_stats_timer().
263  enum Timer
264  {
265  T_INACTIVITY,
266  T_RETRANS,
267  T_INSTALL,
268  T_STATS
269  };
// MultiMap from gu::datetime::Date to Timer; the class adds no members of its
// own. The timers_ member is of this type.
// Presumably the key is each timer's expiration time -- TODO confirm.
273  class TimerList :
274  public MultiMap<gu::datetime::Date, Timer> { };
275 private:
276  TimerList timers_;
277 public:
278  // These need currently to be public for unit tests
279  void handle_inactivity_timer();
280  void handle_retrans_timer();
281  void handle_install_timer();
282  void handle_stats_timer();
283  gu::datetime::Date next_expiration(const Timer) const;
284  void reset_timers();
285  gu::datetime::Date handle_timers();
286 
291  enum DebugFlags
292  {
293  D_STATE = 1 << 0,
294  D_TIMERS = 1 << 1,
295  D_CONSENSUS = 1 << 2,
296  D_USER_MSGS = 1 << 3,
297  D_DELEGATE_MSGS = 1 << 4,
298  D_GAP_MSGS = 1 << 5,
299  D_JOIN_MSGS = 1 << 6,
300  D_INSTALL_MSGS = 1 << 7,
301  D_LEAVE_MSGS = 1 << 8,
302  D_FOREIGN_MSGS = 1 << 9,
303  D_RETRANS = 1 << 10,
304  D_DELIVERY = 1 << 11
305  };
306 
310  enum InfoFlags
311  {
312  I_VIEWS = 1 << 0,
313  I_STATE = 1 << 1,
314  I_STATISTICS = 1 << 2,
315  I_PROFILING = 1 << 3
316  };
317 private:
318 
319  int version_;
320  static const int max_version_ = GCOMM_EVS_MAX_VERSION;
321  int debug_mask_;
322  int info_mask_;
323  gu::datetime::Date last_stats_report_;
324  bool collect_stats_;
325  Histogram hs_agreed_;
326  Histogram hs_safe_;
327  Histogram hs_local_causal_;
328  long long int send_queue_s_;
329  long long int n_send_queue_s_;
330  std::vector<long long int> sent_msgs_;
331  long long int retrans_msgs_;
332  long long int recovered_msgs_;
333  std::vector<long long int> recvd_msgs_;
334  std::vector<long long int> delivered_msgs_;
335  prof::Profile send_user_prof_;
336  prof::Profile send_gap_prof_;
337  prof::Profile send_join_prof_;
338  prof::Profile send_install_prof_;
339  prof::Profile send_leave_prof_;
340  prof::Profile consistent_prof_;
341  prof::Profile consensus_prof_;
342  prof::Profile shift_to_prof_;
343  prof::Profile input_map_prof_;
344  prof::Profile delivery_prof_;
345  bool delivering_;
346  UUID my_uuid_;
347  SegmentId segment_;
348  //
349  // Known instances
350  NodeMap known_;
351  NodeMap::iterator self_i_;
352  //
353  gu::datetime::Period view_forget_timeout_;
354  gu::datetime::Period inactive_timeout_;
355  gu::datetime::Period suspect_timeout_;
356  gu::datetime::Period inactive_check_period_;
357  gu::datetime::Period retrans_period_;
358  gu::datetime::Period install_timeout_;
359  gu::datetime::Period join_retrans_period_;
360  gu::datetime::Period stats_report_period_;
361  gu::datetime::Period causal_keepalive_period_;
362 
363  gu::datetime::Date last_inactive_check_;
364  gu::datetime::Date last_causal_keepalive_;
365 
366  // Current view id
367  // ViewId current_view;
368  View current_view_;
369  View previous_view_;
370  std::list<std::pair<ViewId, gu::datetime::Date> > previous_views_;
371 
372  // Map containing received messages and aru/safe seqnos
373  InputMap* input_map_;
374  // Helper container for local causal messages
// Record holding a user type, a sequence number and its own Datagram member
// initialised from the constructor argument, together with a timestamp.
// Only read accessors are declared, so the fields are not modified through
// this interface after construction. Elements of causal_queue_ are of this
// type.
375  class CausalMessage
376  {
377  public:
// Stores the three arguments and stamps the record with
// gu::datetime::Date::now() taken at construction.
378  CausalMessage(uint8_t user_type,
379  seqno_t seqno,
380  const Datagram& datagram)
381  :
382  user_type_(user_type),
383  seqno_ (seqno ),
384  datagram_ (datagram ),
385  tstamp_ (gu::datetime::Date::now())
386  { }
// Read-only accessors for the stored fields.
387  uint8_t user_type() const { return user_type_; }
388  seqno_t seqno() const { return seqno_ ; }
389  const Datagram& datagram() const { return datagram_ ; }
390  const gu::datetime::Date& tstamp() const { return tstamp_ ; }
391  private:
392  uint8_t user_type_;
393  seqno_t seqno_;
394  Datagram datagram_;
// Construction time of this record.
395  gu::datetime::Date tstamp_;
396  };
397  // Queue containing local causal messages
398  std::deque<CausalMessage> causal_queue_;
399  // Consensus module
400  Consensus consensus_;
401  // Last received install message
402  InstallMessage* install_message_;
403  // Install attempt counter
404  uint32_t attempt_seq_;
405  // Install timeout counting
406  int max_install_timeouts_;
407  int install_timeout_count_;
408  // Sequence number to maintain membership message FIFO order
409  int64_t fifo_seq_;
410  // Last sent seq
411  seqno_t last_sent_;
412  // Protocol send window size
413  seqno_t send_window_;
414  // User send window size
415  seqno_t user_send_window_;
416  // Output message queue
417  std::deque<std::pair<Datagram, ProtoDownMeta> > output_;
418  std::vector<gu::byte_t> send_buf_;
419  uint32_t max_output_size_;
420  size_t mtu_;
421  bool use_aggregate_;
422  bool self_loopback_;
423  State state_;
424  int shift_to_rfcnt_;
425  bool pending_leave_;
426 
427  // non-copyable
428  Proto(const Proto&);
429  void operator=(const Proto&);
430 };
431 
432 
433 #endif // GCOMM_EVS_PROTO_HPP
Definition: view.hpp:29
Definition: profile.hpp:136
Definition: evs_proto.hpp:294
Definition: evs_proto.hpp:303
Definition: evs_proto.hpp:296
Definition: evs_seqno.hpp:31
Definition: evs_message2.hpp:520
Definition: evs_message2.hpp:465
Class implementing EVS protocol.
Definition: evs_proto.hpp:60
Definition: evs_proto.hpp:314
Definition: evs_proto.hpp:301
Definition: evs_message2.hpp:552
Definition: evs_proto.hpp:300
Definition: evs_input_map2.hpp:72
Definition: evs_proto.hpp:304
Definition: map.hpp:244
Lightweight profiling utility.
Definition: evs_proto.hpp:293
DebugFlags
Flags controlling what debug information is logged if debug logging is turned on. ...
Definition: evs_proto.hpp:291
Definition: evs_proto.hpp:298
Definition: evs_message2.hpp:585
Definition: evs_proto.hpp:302
Definition: evs_proto.hpp:295
Definition: view.hpp:119
Definition: histogram.hpp:14
Definition: evs_node.hpp:137
Definition: evs_message2.hpp:390
Definition: evs_proto.hpp:315
Definition: evs_proto.hpp:312
Definition: evs_proto.hpp:299
InfoFlags
Flags controlling what info log is printed in logs.
Definition: evs_proto.hpp:310
Definition: protolay.hpp:192
Definition: evs_message2.hpp:487
Definition: evs_proto.hpp:273
Protocol layer interface definitions.
Definition: evs_proto.hpp:313
Definition: evs_message2.hpp:113
Definition: protolay.hpp:77
Definition: evs_message2.hpp:121
Transport interface.
Definition: evs_proto.hpp:297
Definition: uuid.hpp:26
Definition: evs_input_map2.hpp:148
Proto(gu::Config &conf, const UUID &my_uuid, SegmentId segment, const gu::URI &uri=gu::URI("evs://"), const size_t mtu=std::numeric_limits< size_t >::max())
Definition: protolay.hpp:168
Datagram container.
Definition: datagram.hpp:151