GCS  0.2.3
gcs_sm.h
1 /*
2  * Copyright (C) 2010-2013 Codership Oy <info@codership.com>
3  *
4  * $Id: gcs_sm.h 3473 2014-02-28 02:03:19Z alex $
5  */
6 
11 #ifndef _gcs_sm_h_
12 #define _gcs_sm_h_
13 
14 #include <galerautils.h>
15 #include <errno.h>
16 
/* Upper bound on the number of users that may be inside the monitor at the
 * same time. Without GCS_SM_CONCURRENCY it is the constant 1; with it, the
 * value is read from the `cc` field of a `sm` pointer that must be in scope
 * wherever the macro is expanded. */
#ifndef GCS_SM_CONCURRENCY
#define GCS_SM_CC 1
#else
#define GCS_SM_CC sm->cc
#endif /* GCS_SM_CONCURRENCY */
22 
23 typedef struct gcs_sm_user
24 {
25  gu_cond_t* cond;
26  bool wait;
27 }
29 
/* Counters the monitor keeps about pauses and queueing. */
typedef struct gcs_sm_stats
{
    long long sample_start;   // beginning of the sample period
    long long pause_start;    // start of the pause
    long long paused_ns;      // total nanoseconds paused
    long long paused_sample;  // paused_ns at the beginning of the sample
    long long send_q_samples; // incremented on every successful schedule
    long long send_q_len;     // running sum of (users - 1), added whenever a
                              // scheduled user has to wait
}
gcs_sm_stats_t;
40 
41 typedef struct gcs_sm
42 {
43  gcs_sm_stats_t stats;
44  gu_mutex_t lock;
45 #ifdef GCS_SM_GRAB_RELEASE
46  gu_cond_t cond;
47  long cond_wait;
48 #endif /* GCS_SM_GRAB_RELEASE */
49  unsigned long wait_q_len;
50  unsigned long wait_q_mask;
51  unsigned long wait_q_head;
52  unsigned long wait_q_tail;
53  long users;
54  long entered;
55  long ret;
56 #ifdef GCS_SM_CONCURRENCY
57  long cc;
58 #endif /* GCS_SM_CONCURRENCY */
59  bool pause;
60  gcs_sm_user_t wait_q[];
61 }
62 gcs_sm_t;
63 
70 extern gcs_sm_t*
71 gcs_sm_create (long len, long n);
72 
77 extern long
78 gcs_sm_close (gcs_sm_t* sm);
79 
83 extern long
84 gcs_sm_open (gcs_sm_t* sm);
85 
89 extern void
90 gcs_sm_destroy (gcs_sm_t* sm);
91 
/* Moves a wait queue cursor one slot forward, wrapping it with
 * sm->wait_q_mask. A `sm` pointer must be in scope at the expansion site;
 * `cursor` is evaluated twice and therefore must have no side effects. */
#define GCS_SM_INCREMENT(cursor) \
    ((cursor) = (((cursor) + 1) & sm->wait_q_mask))
93 
94 static inline void
95 _gcs_sm_wake_up_next (gcs_sm_t* sm)
96 {
97  long woken = sm->entered;
98 
99  assert (woken >= 0);
100  assert (woken <= GCS_SM_CC);
101 
102  while (woken < GCS_SM_CC && sm->users > 0) {
103  if (gu_likely(sm->wait_q[sm->wait_q_head].wait)) {
104  assert (NULL != sm->wait_q[sm->wait_q_head].cond);
105  // gu_debug ("Waking up: %lu", sm->wait_q_head);
106  gu_cond_signal (sm->wait_q[sm->wait_q_head].cond);
107  woken++;
108  }
109  else { /* skip interrupted */
110  assert (NULL == sm->wait_q[sm->wait_q_head].cond);
111  gu_debug ("Skipping interrupted: %lu", sm->wait_q_head);
112  sm->users--;
113  GCS_SM_INCREMENT(sm->wait_q_head);
114  }
115  }
116 
117  assert (woken <= GCS_SM_CC);
118  assert (sm->users >= 0);
119 }
120 
121 /* wake up whoever might be waiting there */
122 static inline void
123 _gcs_sm_wake_up_waiters (gcs_sm_t* sm)
124 {
125 #ifdef GCS_SM_GRAB_RELEASE
126  if (gu_unlikely(sm->cond_wait)) {
127  assert (sm->cond_wait > 0);
128  sm->cond_wait--;
129  gu_cond_signal (&sm->cond);
130  } else
131 #endif /* GCS_SM_GRAB_RELEASE */
132  if (!sm->pause) {
133  _gcs_sm_wake_up_next(sm);
134  }
135  else {
136  /* gcs_sm_continue() will do the rest */
137  }
138 }
139 
140 static inline void
141 _gcs_sm_leave_common (gcs_sm_t* sm)
142 {
143  assert (sm->entered < GCS_SM_CC);
144 
145  assert (sm->users > 0);
146  sm->users--;
147  assert (false == sm->wait_q[sm->wait_q_head].wait);
148  assert (NULL == sm->wait_q[sm->wait_q_head].cond);
149  GCS_SM_INCREMENT(sm->wait_q_head);
150 
151  _gcs_sm_wake_up_waiters (sm);
152 }
153 
154 static inline bool
155 _gcs_sm_enqueue_common (gcs_sm_t* sm, gu_cond_t* cond)
156 {
157  unsigned long tail = sm->wait_q_tail;
158 
159  sm->wait_q[tail].cond = cond;
160  sm->wait_q[tail].wait = true;
161  gu_cond_wait (cond, &sm->lock);
162  assert(tail == sm->wait_q_head || false == sm->wait_q[tail].wait);
163  assert(sm->wait_q[tail].cond == cond || false == sm->wait_q[tail].wait);
164  sm->wait_q[tail].cond = NULL;
165  bool ret = sm->wait_q[tail].wait;
166  sm->wait_q[tail].wait = false;
167  return ret;
168 }
169 
/* True when a freshly scheduled user cannot enter right away: somebody else
 * is already scheduled ahead of it, the concurrency limit is reached (only
 * with GCS_SM_CONCURRENCY), or the monitor is paused. Expects `sm` in
 * scope. */
#ifndef GCS_SM_CONCURRENCY
#define GCS_SM_HAS_TO_WAIT (sm->users > 1 || sm->pause)
#else
#define GCS_SM_HAS_TO_WAIT \
    (sm->users > (sm->entered + 1) || sm->entered >= GCS_SM_CC || sm->pause)
#endif /* GCS_SM_CONCURRENCY */
176 
185 static inline long
186 gcs_sm_schedule (gcs_sm_t* sm)
187 {
188  if (gu_unlikely(gu_mutex_lock (&sm->lock))) abort();
189 
190  long ret = sm->ret;
191 
192  if (gu_likely((sm->users < (long)sm->wait_q_len) && (0 == ret))) {
193 
194  sm->users++;
195  GCS_SM_INCREMENT(sm->wait_q_tail); /* even if we don't queue, cursor
196  * needs to be advanced */
197  sm->stats.send_q_samples++;
198 
199  if (GCS_SM_HAS_TO_WAIT) {
200  ret = sm->wait_q_tail + 1; // waiter handle
201 
202  /* here we want to distinguish between FC pause and real queue */
203  sm->stats.send_q_len += sm->users - 1;
204  }
205 
206  return ret; // success
207  }
208  else if (0 == ret) {
209  assert (sm->users == (long)sm->wait_q_len);
210  ret = -EAGAIN;
211  }
212 
213  assert(ret < 0);
214 
215  gu_mutex_unlock (&sm->lock);
216 
217  return ret;
218 }
219 
231 static inline long
232 gcs_sm_enter (gcs_sm_t* sm, gu_cond_t* cond, bool scheduled)
233 {
234  long ret = 0; /* if scheduled and no queue */
235 
236  if (gu_likely (scheduled || (ret = gcs_sm_schedule(sm)) >= 0)) {
237 
238  if (GCS_SM_HAS_TO_WAIT) {
239  if (gu_likely(_gcs_sm_enqueue_common (sm, cond))) {
240  ret = sm->ret;
241  }
242  else {
243  ret = -EINTR;
244  }
245  }
246 
247  assert (ret <= 0);
248 
249  if (gu_likely(0 == ret)) {
250  assert(sm->users > 0);
251  assert(sm->entered < GCS_SM_CC);
252  sm->entered++;
253  }
254  else {
255  if (gu_likely(-EINTR == ret)) {
256  /* was interrupted, will be handled by someone else */
257  }
258  else {
259  /* monitor is closed, wake up others */
260  assert(sm->users > 0);
261  _gcs_sm_leave_common(sm);
262  }
263  }
264 
265  gu_mutex_unlock (&sm->lock);
266  }
267 
268  return ret;
269 }
270 
271 static inline void
272 gcs_sm_leave (gcs_sm_t* sm)
273 {
274  if (gu_unlikely(gu_mutex_lock (&sm->lock))) abort();
275 
276  sm->entered--;
277  assert(sm->entered >= 0);
278 
279  _gcs_sm_leave_common(sm);
280 
281  gu_mutex_unlock (&sm->lock);
282 }
283 
284 static inline void
285 gcs_sm_pause (gcs_sm_t* sm)
286 {
287  if (gu_unlikely(gu_mutex_lock (&sm->lock))) abort();
288 
289  /* don't pause closed monitor */
290  if (gu_likely(0 == sm->ret) && !sm->pause) {
291  sm->stats.pause_start = gu_time_monotonic();
292  sm->pause = true;
293  }
294 
295  gu_mutex_unlock (&sm->lock);
296 }
297 
298 static inline void
299 _gcs_sm_continue_common (gcs_sm_t* sm)
300 {
301  sm->pause = false;
302 
303  _gcs_sm_wake_up_next(sm); /* wake up next waiter if any */
304 }
305 
306 static inline void
307 gcs_sm_continue (gcs_sm_t* sm)
308 {
309  if (gu_unlikely(gu_mutex_lock (&sm->lock))) abort();
310 
311  if (gu_likely(sm->pause)) {
312  _gcs_sm_continue_common (sm);
313 
314  sm->stats.paused_ns += gu_time_monotonic() - sm->stats.pause_start;
315  }
316  else {
317  gu_debug ("Trying to continue unpaused monitor");
318  }
319 
320  gu_mutex_unlock (&sm->lock);
321 }
322 
331 static inline long
332 gcs_sm_interrupt (gcs_sm_t* sm, long handle)
333 {
334  assert (handle > 0);
335  long ret;
336 
337  if (gu_unlikely(gu_mutex_lock (&sm->lock))) abort();
338 
339  handle--;
340 
341  if (gu_likely(sm->wait_q[handle].wait)) {
342  assert (sm->wait_q[handle].cond != NULL);
343  sm->wait_q[handle].wait = false;
344  gu_cond_signal (sm->wait_q[handle].cond);
345  sm->wait_q[handle].cond = NULL;
346  ret = 0;
347  if (!sm->pause && handle == (long)sm->wait_q_head) {
348  /* gcs_sm_interrupt() was called right after the waiter was
349  * signaled by gcs_sm_continue() or gcs_sm_leave() but before
350  * the waiter has woken up. Wake up the next waiter */
351  _gcs_sm_wake_up_next(sm);
352  }
353  }
354  else {
355  ret = -ESRCH;
356  }
357 
358  gu_mutex_unlock (&sm->lock);
359 
360  return ret;
361 }
362 
373 extern void
374 gcs_sm_stats_get (gcs_sm_t* sm,
375  int* q_len,
376  double* q_len_avg,
377  long long* paused_ns,
378  double* paused_avg);
379 
381 extern void
382 gcs_sm_stats_flush(gcs_sm_t* sm);
383 
384 #ifdef GCS_SM_GRAB_RELEASE
385 
387 static inline long
388 gcs_sm_grab (gcs_sm_t* sm)
389 {
390  long ret;
391 
392  if (gu_unlikely(gu_mutex_lock (&sm->lock))) abort();
393 
394  while (!(ret = sm->ret) && sm->entered >= GCS_SM_CC) {
395  sm->cond_wait++;
396  gu_cond_wait (&sm->cond, &sm->lock);
397  }
398 
399  if (ret) {
400  assert (ret < 0);
401  _gcs_sm_wake_up_waiters (sm);
402  }
403  else {
404  assert (sm->entered < GCS_SM_CC);
405  sm->entered++;
406  }
407 
408  gu_mutex_unlock (&sm->lock);
409 
410  return ret;
411 }
412 
414 static inline void
415 gcs_sm_release (gcs_sm_t* sm)
416 {
417  if (gu_unlikely(gu_mutex_lock (&sm->lock))) abort();
418 
419  sm->entered--;
420  _gcs_sm_wake_up_waiters (sm);
421 
422  gu_mutex_unlock (&sm->lock);
423 }
424 #endif /* GCS_SM_GRAB_RELEASE */
425 
426 #endif /* _gcs_sm_h_ */
Definition: gcs_sm.h:30
Definition: gcs_sm.h:23
Definition: gcs_sm.h:41