14 #include <galerautils.h>
/* When GCS_SM_CONCURRENCY is defined, GCS_SM_CC expands to sm->cc, so every
 * use site needs a variable named `sm` in scope.
 * NOTE(review): the alternative definition (the #else branch) is not visible
 * in this listing. */
17 #ifdef GCS_SM_CONCURRENCY
18 #define GCS_SM_CC sm->cc
32 long long sample_start;
33 long long pause_start;
35 long long paused_sample;
36 long long send_q_samples;
45 #ifdef GCS_SM_GRAB_RELEASE
49 unsigned long wait_q_len;
50 unsigned long wait_q_mask;
51 unsigned long wait_q_head;
52 unsigned long wait_q_tail;
56 #ifdef GCS_SM_CONCURRENCY
71 gcs_sm_create (
long len,
long n);
/* Advance `cursor` by one slot and wrap it by AND-ing with sm->wait_q_mask.
 * The macro expands to an assignment and mentions `cursor` twice, so the
 * argument must be a side-effect-free lvalue, and a variable named `sm` must
 * be in scope at the point of use.
 * NOTE(review): the AND only acts as a modulo if wait_q_mask is
 * (power-of-two queue length - 1) -- confirm where the mask is initialised. */
92 #define GCS_SM_INCREMENT(cursor) (cursor = ((cursor + 1) & sm->wait_q_mask))
97 long woken = sm->entered;
100 assert (woken <= GCS_SM_CC);
102 while (woken < GCS_SM_CC && sm->users > 0) {
103 if (gu_likely(sm->wait_q[sm->wait_q_head].wait)) {
104 assert (NULL != sm->wait_q[sm->wait_q_head].cond);
106 gu_cond_signal (sm->wait_q[sm->wait_q_head].cond);
110 assert (NULL == sm->wait_q[sm->wait_q_head].cond);
111 gu_debug (
"Skipping interrupted: %lu", sm->wait_q_head);
113 GCS_SM_INCREMENT(sm->wait_q_head);
117 assert (woken <= GCS_SM_CC);
118 assert (sm->users >= 0);
/* Wake whichever waiter should run next on monitor `sm`.
 *
 * In GCS_SM_GRAB_RELEASE builds a non-zero sm->cond_wait causes sm->cond to
 * be signalled; the visible code also hands over to _gcs_sm_wake_up_next(sm).
 * NOTE(review): the lines between the two calls are not shown in this
 * listing, so whether the paths are mutually exclusive (else / #endif) and
 * whether cond_wait is adjusted here cannot be confirmed from this view. */
123 _gcs_sm_wake_up_waiters (
gcs_sm_t* sm)
125 #ifdef GCS_SM_GRAB_RELEASE
/* Expected to be the rare case (gu_unlikely). */
126 if (gu_unlikely(sm->cond_wait)) {
127 assert (sm->cond_wait > 0);
129 gu_cond_signal (&sm->cond);
133 _gcs_sm_wake_up_next(sm);
143 assert (sm->entered < GCS_SM_CC);
145 assert (sm->users > 0);
147 assert (
false == sm->wait_q[sm->wait_q_head].wait);
148 assert (NULL == sm->wait_q[sm->wait_q_head].cond);
149 GCS_SM_INCREMENT(sm->wait_q_head);
151 _gcs_sm_wake_up_waiters (sm);
/* Park the caller in the wait queue slot at sm->wait_q_tail and block on
 * `cond` until signalled.
 *
 * sm   - monitor whose wait_q slot and lock are used
 * cond - condition variable the caller blocks on; stored in the slot while
 *        waiting and cleared again afterwards
 *
 * After waking, the slot's `wait` flag is read into `ret` and then reset to
 * false. A slot whose flag was cleared by someone else while the caller slept
 * therefore yields false.
 * NOTE(review): the return statement is not visible in this listing;
 * presumably `ret` is returned -- confirm. */
155 _gcs_sm_enqueue_common (
gcs_sm_t* sm, gu_cond_t* cond)
/* Snapshot the slot index: the asserts below compare it with the queue head
 * after the wait. */
157 unsigned long tail = sm->wait_q_tail;
159 sm->wait_q[tail].cond = cond;
160 sm->wait_q[tail].wait =
true;
/* Blocks on `cond` using sm->lock. NOTE(review): presumably has
 * pthread_cond_wait semantics (lock released while blocked) -- confirm. */
161 gu_cond_wait (cond, &sm->lock);
/* Either this slot is now at the head of the queue, or its wait flag was
 * cleared while we slept. */
162 assert(tail == sm->wait_q_head ||
false == sm->wait_q[tail].wait);
163 assert(sm->wait_q[tail].cond == cond ||
false == sm->wait_q[tail].wait);
164 sm->wait_q[tail].cond = NULL;
/* Capture the flag before resetting it so the slot is left clean. */
165 bool ret = sm->wait_q[tail].wait;
166 sm->wait_q[tail].wait =
false;
/* GCS_SM_HAS_TO_WAIT: condition under which a caller must queue instead of
 * proceeding immediately. Both variants read fields of a variable named `sm`
 * that must be in scope at the point of use.
 *
 * GCS_SM_CONCURRENCY variant: true when sm->users exceeds sm->entered + 1,
 * when sm->entered has reached GCS_SM_CC, or when sm->pause is set. */
170 #ifdef GCS_SM_CONCURRENCY
171 #define GCS_SM_HAS_TO_WAIT \
172 (sm->users > (sm->entered + 1) || sm->entered >= GCS_SM_CC || sm->pause)
/* Second variant: true when sm->users exceeds 1 or sm->pause is set.
 * NOTE(review): the #else / #endif separating the two definitions is not
 * visible in this listing. */
174 #define GCS_SM_HAS_TO_WAIT (sm->users > 1 || sm->pause)
188 if (gu_unlikely(gu_mutex_lock (&sm->lock))) abort();
192 if (gu_likely((sm->users < (
long)sm->wait_q_len) && (0 == ret))) {
195 GCS_SM_INCREMENT(sm->wait_q_tail);
197 sm->stats.send_q_samples++;
199 if (GCS_SM_HAS_TO_WAIT) {
200 ret = sm->wait_q_tail + 1;
203 sm->stats.send_q_len += sm->users - 1;
209 assert (sm->users == (
long)sm->wait_q_len);
215 gu_mutex_unlock (&sm->lock);
/* Enter the monitor, queueing on `cond` if GCS_SM_HAS_TO_WAIT is true.
 *
 * sm        - the monitor
 * cond      - condition variable handed to _gcs_sm_enqueue_common() when the
 *             caller has to wait
 * scheduled - when true, gcs_sm_schedule() is not called here (the || below
 *             short-circuits)
 *
 * NOTE(review): several lines of this function (else branches, assignments
 * to `ret`, the return) are not visible in this listing; the comments below
 * describe only what is shown. */
232 gcs_sm_enter (
gcs_sm_t* sm, gu_cond_t* cond,
bool scheduled)
/* Proceed if already scheduled, or if gcs_sm_schedule() returns >= 0. */
236 if (gu_likely (scheduled || (ret = gcs_sm_schedule(sm)) >= 0)) {
238 if (GCS_SM_HAS_TO_WAIT) {
/* A true result from the enqueue helper is the expected (likely) case. */
239 if (gu_likely(_gcs_sm_enqueue_common (sm, cond))) {
249 if (gu_likely(0 == ret)) {
250 assert(sm->users > 0);
251 assert(sm->entered < GCS_SM_CC);
/* On -EINTR the visible code calls _gcs_sm_leave_common(). */
255 if (gu_likely(-EINTR == ret)) {
260 assert(sm->users > 0);
261 _gcs_sm_leave_common(sm);
/* No matching gu_mutex_lock() is visible in this function.
 * NOTE(review): presumably sm->lock is held on entry when scheduled, or is
 * left held by gcs_sm_schedule() -- confirm. */
265 gu_mutex_unlock (&sm->lock);
274 if (gu_unlikely(gu_mutex_lock (&sm->lock))) abort();
277 assert(sm->entered >= 0);
279 _gcs_sm_leave_common(sm);
281 gu_mutex_unlock (&sm->lock);
287 if (gu_unlikely(gu_mutex_lock (&sm->lock))) abort();
290 if (gu_likely(0 == sm->ret) && !sm->pause) {
291 sm->stats.pause_start = gu_time_monotonic();
295 gu_mutex_unlock (&sm->lock);
/* Shared part of resuming monitor `sm`: the visible body calls
 * _gcs_sm_wake_up_next(sm).
 * NOTE(review): lines between the signature and that call are not shown in
 * this listing, so any state change made first (e.g. to sm->pause) cannot be
 * confirmed here. */
299 _gcs_sm_continue_common (
gcs_sm_t* sm)
303 _gcs_sm_wake_up_next(sm);
309 if (gu_unlikely(gu_mutex_lock (&sm->lock))) abort();
311 if (gu_likely(sm->pause)) {
312 _gcs_sm_continue_common (sm);
314 sm->stats.paused_ns += gu_time_monotonic() - sm->stats.pause_start;
317 gu_debug (
"Trying to continue unpaused monitor");
320 gu_mutex_unlock (&sm->lock);
/* Interrupt the waiter occupying wait queue slot `handle`.
 *
 * sm     - the monitor
 * handle - index into sm->wait_q of the waiter to interrupt
 *
 * Takes sm->lock (aborting the process if locking fails). If the slot is
 * still marked as waiting, its wait flag is cleared, its condition variable
 * is signalled and its cond pointer is reset to NULL. If the monitor is not
 * paused and the slot is the current queue head, _gcs_sm_wake_up_next() is
 * called.
 * NOTE(review): lines are missing from this listing (any adjustment or range
 * check of `handle`, the else branch, the return value), so the full contract
 * cannot be stated from this view. */
332 gcs_sm_interrupt (
gcs_sm_t* sm,
long handle)
337 if (gu_unlikely(gu_mutex_lock (&sm->lock))) abort();
341 if (gu_likely(sm->wait_q[handle].wait)) {
342 assert (sm->wait_q[handle].cond != NULL);
/* Clear the flag before signalling; the woken waiter reads this flag after
 * its gu_cond_wait() returns (see _gcs_sm_enqueue_common). */
343 sm->wait_q[handle].wait =
false;
344 gu_cond_signal (sm->wait_q[handle].cond);
345 sm->wait_q[handle].cond = NULL;
/* Only when the interrupted slot is at the head of the queue, and the
 * monitor is not paused, is the next waiter woken. */
347 if (!sm->pause && handle == (
long)sm->wait_q_head) {
351 _gcs_sm_wake_up_next(sm);
358 gu_mutex_unlock (&sm->lock);
377 long long* paused_ns,
384 #ifdef GCS_SM_GRAB_RELEASE
392 if (gu_unlikely(gu_mutex_lock (&sm->lock))) abort();
394 while (!(ret = sm->ret) && sm->entered >= GCS_SM_CC) {
396 gu_cond_wait (&sm->cond, &sm->lock);
401 _gcs_sm_wake_up_waiters (sm);
404 assert (sm->entered < GCS_SM_CC);
408 gu_mutex_unlock (&sm->lock);
417 if (gu_unlikely(gu_mutex_lock (&sm->lock))) abort();
420 _gcs_sm_wake_up_waiters (sm);
422 gu_mutex_unlock (&sm->lock);