From 39adb57928ef3425dd641343b56e560329aca141 Mon Sep 17 00:00:00 2001
From: ulatekh
Date: Sun, 28 Jan 2018 10:49:22 -0700
Subject: [PATCH 1/5] Now workers are tracked in used/free stacks, and
 ringbuf_consume() only iterates through in-use workers.

Previously, in-use workers were tracked with a "registered" flag in
ringbuf_worker_t, and ringbuf_consume() iterated through all possible
workers. This could be inefficient when there were far more potential
workers than actual workers.

At present, ringbuf_worker_t.next is a pointer. If the ABA problem is
a concern, it can be changed to an offset into ringbuf_t.workers[]
combined with a version counter.
---
 src/ringbuf.c | 99 ++++++++++++++++++++++++++++++++++++---------------
 1 file changed, 71 insertions(+), 28 deletions(-)

diff --git a/src/ringbuf.c b/src/ringbuf.c
index e0f2c9e..081449d 100644
--- a/src/ringbuf.c
+++ b/src/ringbuf.c
@@ -79,7 +79,7 @@ typedef uint64_t ringbuf_off_t;
 
 struct ringbuf_worker {
 	volatile ringbuf_off_t	seen_off;
-	int			registered;
+	ringbuf_worker_t *	next;
 };
 
 struct ringbuf {
@@ -94,6 +94,9 @@ struct ringbuf {
 	volatile ringbuf_off_t	next;
 	ringbuf_off_t		end;
 
+	/* Track acquires that haven't finished producing yet. */
+	ringbuf_worker_t *used_workers, *free_workers;
+
 	/* The following are updated by the consumer. */
 	ringbuf_off_t		written;
 	unsigned		nworkers;
@@ -114,6 +117,12 @@ ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, size_t length)
 	rbuf->space = length;
 	rbuf->end = RBUF_OFF_MAX;
 	rbuf->nworkers = nworkers;
+
+	/* Put all workers into the free-stack. */
+	for (unsigned i = 0; i < rbuf->nworkers; i++) {
+		rbuf->workers[i].next = rbuf->free_workers;
+		rbuf->free_workers = &(rbuf->workers[i]);
+	}
 	return 0;
 }
 
@@ -137,18 +146,32 @@ ringbuf_get_sizes(unsigned nworkers,
 ringbuf_worker_t *
 ringbuf_register(ringbuf_t *rbuf, unsigned i)
 {
-	ringbuf_worker_t *w = &rbuf->workers[i];
+	/* Get a worker-record, to track state between acquire & produce. */
+	ringbuf_worker_t *w, *new_free, *old_used;
+	do {
+		w = rbuf->free_workers;
+		if (!w)
+			return NULL;
+		new_free = w->next;
+	} while (!atomic_compare_exchange_weak(&rbuf->free_workers, w, new_free));
 
+	/* No acquire/produce being tracked yet. */
 	w->seen_off = RBUF_OFF_MAX;
-	atomic_thread_fence(memory_order_release);
-	w->registered = true;
+
+	/* Push this worker on top of the used-worker stack. */
+	do {
+		old_used = rbuf->used_workers;
+		w->next = old_used;
+	} while (!atomic_compare_exchange_weak(&rbuf->used_workers, old_used, w));
+
 	return w;
 }
 
 void
 ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w)
 {
-	w->registered = false;
+	/* Mark this worker as unregistered. */
+	w->seen_off = (RBUF_OFF_MAX | WRAP_LOCK_BIT);
 	(void)rbuf;
 }
 
@@ -275,8 +298,7 @@ void
 ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *w)
 {
 	(void)rbuf;
-	ASSERT(w->registered);
-	ASSERT(w->seen_off != RBUF_OFF_MAX);
+	ASSERT((w->seen_off & ~WRAP_LOCK_BIT) != RBUF_OFF_MAX);
 	atomic_thread_fence(memory_order_release);
 	w->seen_off = RBUF_OFF_MAX;
 }
@@ -288,6 +310,8 @@ size_t
 ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
 {
 	ringbuf_off_t written = rbuf->written, next, ready;
+	ringbuf_worker_t *w;
+	ringbuf_worker_t * volatile *pw;
 	size_t towrite;
 retry:
 	/*
@@ -311,33 +335,52 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
 	 */
 	ready = RBUF_OFF_MAX;
 
-	for (unsigned i = 0; i < rbuf->nworkers; i++) {
-		ringbuf_worker_t *w = &rbuf->workers[i];
+	w = rbuf->used_workers;
+	pw = &rbuf->used_workers;
+	while (w) {
 		unsigned count = SPINLOCK_BACKOFF_MIN;
 		ringbuf_off_t seen_off;
 
-		/* Skip if the worker has not registered. */
-		if (!w->registered) {
-			continue;
-		}
+		/* If this worker has been unregistered, try to clean it up. */
+		if (w->seen_off == (RBUF_OFF_MAX | WRAP_LOCK_BIT)) {
+			ringbuf_worker_t *old_free;
 
-		/*
-		 * Get a stable 'seen' value. This is necessary since we
-		 * want to discard the stale 'seen' values.
-		 */
-		while ((seen_off = w->seen_off) & WRAP_LOCK_BIT) {
-			SPINLOCK_BACKOFF(count);
-		}
+			/*
+			 * Remove this worker from the used-worker stack.
+			 * If it can't be, try again later.
+			 */
+			if (atomic_compare_exchange_weak(&pw, w, w->next)) {
+				/* Push this unused worker on top of the free-worker stack. */
+				do {
+					old_free = rbuf->free_workers;
+					w->next = old_free;
+				} while (!atomic_compare_exchange_weak(&rbuf->free_workers, old_free, w));
+				w = w->next;
+				continue;
+			}
+		} else {
+			/*
+			 * Get a stable 'seen' value. This is necessary since we
+			 * want to discard the stale 'seen' values.
+			 */
+			while ((seen_off = w->seen_off) & WRAP_LOCK_BIT) {
+				SPINLOCK_BACKOFF(count);
+			}
 
-		/*
-		 * Ignore the offsets after the possible wrap-around.
-		 * We are interested in the smallest seen offset that is
-		 * not behind the 'written' offset.
-		 */
-		if (seen_off >= written) {
-			ready = MIN(seen_off, ready);
+			/*
+			 * Ignore the offsets after the possible wrap-around.
+			 * We are interested in the smallest seen offset that is
+			 * not behind the 'written' offset.
+			 */
+			if (seen_off >= written) {
+				ready = MIN(seen_off, ready);
+			}
+			ASSERT(ready >= written);
 		}
-		ASSERT(ready >= written);
+
+		/* Move to the next incomplete acquire/produce operation. */
+		pw = &w->next;
+		w = w->next;
 	}
 
 	/*
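
A note on the ABA hazard flagged in the commit message above: with a raw pointer
head, pop can read head A and A->next, get preempted while other threads pop A,
pop B, and push A back, and then still win its CAS, because the head once again
equals A even though the snapshot of its next pointer is stale. The sketch below
illustrates the offset-plus-counter remedy the message proposes (and that patch 3
below adopts via RBUF_OFF_MASK and WRAP_INCR()). It is a minimal sketch:
tagged_head_t, OFFSET_MASK, VERSION_INCR, and the array-of-nodes layout are
illustrative assumptions, not code from this series.

#include <stdatomic.h>
#include <stdint.h>

/* Head word: low 32 bits = array index (UINT32_MAX = empty), high bits = pop count. */
typedef _Atomic uint64_t tagged_head_t;

#define	OFFSET_MASK	0x00000000ffffffffUL
#define	VERSION_INCR	0x0000000100000000UL

struct node {
	uint32_t next;		/* index of the next node, not a pointer */
};

static uint32_t
tagged_pop(tagged_head_t *head, struct node *nodes)
{
	uint64_t old_head, new_head;
	uint32_t idx;

	do {
		old_head = atomic_load(head);
		idx = (uint32_t)(old_head & OFFSET_MASK);
		if (idx == UINT32_MAX)
			return UINT32_MAX;	/* stack is empty */
		/*
		 * Carry an incremented version into the new head word.
		 * If the same index was popped and re-pushed since our
		 * load (the A-B-A interleaving), the version half of the
		 * word differs and the CAS fails, as it should.
		 */
		new_head = (uint64_t)nodes[idx].next |
		    ((old_head + VERSION_INCR) & ~OFFSET_MASK);
	} while (!atomic_compare_exchange_weak(head, &old_head, new_head));
	return idx;
}

Because every successful pop advances the version half of the word, a CAS
computed from a stale snapshot of the head fails even when the same index is
back on top.
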
From 93ef3995f036e206ed426485e5ae0b1e5223a336 Mon Sep 17 00:00:00 2001
From: ulatekh
Date: Sun, 28 Jan 2018 16:24:51 -0700
Subject: [PATCH 2/5] Fixed clang-detected build errors.

---
 src/ringbuf.c | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/ringbuf.c b/src/ringbuf.c
index 081449d..cff19ae 100644
--- a/src/ringbuf.c
+++ b/src/ringbuf.c
@@ -164,6 +164,7 @@ ringbuf_register(ringbuf_t *rbuf, unsigned i)
 		w->next = old_used;
 	} while (!atomic_compare_exchange_weak(&rbuf->used_workers, old_used, w));
 
+	(void)i;
 	return w;
 }
 
@@ -349,7 +350,7 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
 			 * Remove this worker from the used-worker stack.
 			 * If it can't be, try again later.
 			 */
-			if (atomic_compare_exchange_weak(&pw, w, w->next)) {
+			if (atomic_compare_exchange_weak(pw, w, w->next)) {
 				/* Push this unused worker on top of the free-worker stack. */
 				do {
 					old_free = rbuf->free_workers;
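
The substantive fix in patch 2 is the one-character change from &pw to pw in
ringbuf_consume(). Since pw already holds the address of the link word being
updated, &pw would aim the CAS at the local variable itself, with operand types
that don't match its pointee, which is what clang rejected. Below is a
standalone sketch of the distinction, using the GCC/clang builtin
__sync_bool_compare_and_swap(ptr, oldval, newval) as a stand-in for the
project's by-value atomic_compare_exchange_weak() wrapper (an assumption about
utils.h, which this series does not show):

#include <assert.h>
#include <stddef.h>

struct node {
	struct node *next;
};

int
main(void)
{
	struct node b = { NULL }, a = { &b };
	struct node *head = &a;		/* shared list head: a -> b */
	struct node **pw = &head;	/* roving pointer to a link word */
	struct node *w = head;		/* node to unlink */

	/*
	 * "&pw" would name the local variable pw (a struct node ***),
	 * so a CAS there would rewrite pw rather than the list link,
	 * and its operands would be mis-typed.  "pw" names the shared
	 * link itself:
	 */
	assert(__sync_bool_compare_and_swap(pw, w, w->next));
	assert(head == &b);		/* 'a' has been unlinked */
	return 0;
}
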
From dbb7cfc771d152ed41e42b5aed59bb097421c2d5 Mon Sep 17 00:00:00 2001
From: ulatekh
Date: Tue, 6 Feb 2018 20:24:56 -0700
Subject: [PATCH 3/5] Rewrote the worker-stack to allocate a worker-record in
 ringbuf_acquire().

This not only decreases the time a worker-record spends on the
used-stack (which helps make the scheme more crash-resistant), but it
also removes the need to register workers.

It passes the tests, but not the stress-test.
---
 src/Makefile    |   4 +-
 src/ringbuf.c   | 246 +++++++++++++++++++++++++++++++++++-------------
 src/ringbuf.h   |   2 +-
 src/t_ringbuf.c |  44 ++++-----
 src/t_stress.c  |   5 +-
 5 files changed, 199 insertions(+), 102 deletions(-)

diff --git a/src/Makefile b/src/Makefile
index 7f85353..b4a0c10 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -2,7 +2,7 @@
 # This file is in the Public Domain.
 #
 
-CFLAGS=		-std=c11 -O2 -g -W -Wextra -Werror
+CFLAGS=		-std=c11 -g -W -Wextra -Werror
 CFLAGS+=	-D_POSIX_C_SOURCE=200809L
 CFLAGS+=	-D_GNU_SOURCE -D_DEFAULT_SOURCE
 
@@ -23,7 +23,7 @@ endif
 ifeq ($(DEBUG),1)
 CFLAGS+=	-O0 -DDEBUG -fno-omit-frame-pointer
 else
-CFLAGS+=	-DNDEBUG
+CFLAGS+=	-O2 -DNDEBUG
 endif
 
 LIB=		libringbuf
diff --git a/src/ringbuf.c b/src/ringbuf.c
index cff19ae..45d5315 100644
--- a/src/ringbuf.c
+++ b/src/ringbuf.c
@@ -75,11 +75,14 @@
 #define	WRAP_COUNTER	(0x7fffffff00000000UL)
 #define	WRAP_INCR(x)	(((x) + 0x100000000UL) & WRAP_COUNTER)
 
+#define	WORKER_NULL	(0x00000000ffffffffUL)
+
 typedef uint64_t	ringbuf_off_t;
+typedef uint64_t	worker_off_t;
 
 struct ringbuf_worker {
	volatile ringbuf_off_t	seen_off;
-	ringbuf_worker_t *	next;
+	volatile worker_off_t	next;
 };
 
 struct ringbuf {
@@ -95,11 +98,11 @@ struct ringbuf {
 	ringbuf_off_t		end;
 
 	/* Track acquires that haven't finished producing yet. */
-	ringbuf_worker_t *used_workers, *free_workers;
+	worker_off_t		used_workers, free_workers;
 
 	/* The following are updated by the consumer. */
 	ringbuf_off_t		written;
-	unsigned		nworkers;
+	unsigned		nworkers;
 	ringbuf_worker_t	workers[];
 };
 
@@ -119,9 +122,11 @@ ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, size_t length)
 	rbuf->nworkers = nworkers;
 
 	/* Put all workers into the free-stack. */
+	rbuf->used_workers = rbuf->free_workers = WORKER_NULL;
 	for (unsigned i = 0; i < rbuf->nworkers; i++) {
+		rbuf->workers[i].seen_off = RBUF_OFF_MAX;
 		rbuf->workers[i].next = rbuf->free_workers;
-		rbuf->free_workers = &(rbuf->workers[i]);
+		rbuf->free_workers = i;
 	}
 	return 0;
 }
 
@@ -146,34 +151,18 @@ ringbuf_get_sizes(unsigned nworkers,
 ringbuf_worker_t *
 ringbuf_register(ringbuf_t *rbuf, unsigned i)
 {
-	/* Get a worker-record, to track state between acquire & produce. */
-	ringbuf_worker_t *w, *new_free, *old_used;
-	do {
-		w = rbuf->free_workers;
-		if (!w)
-			return NULL;
-		new_free = w->next;
-	} while (!atomic_compare_exchange_weak(&rbuf->free_workers, w, new_free));
-
-	/* No acquire/produce being tracked yet. */
-	w->seen_off = RBUF_OFF_MAX;
-
-	/* Push this worker on top of the used-worker stack. */
-	do {
-		old_used = rbuf->used_workers;
-		w->next = old_used;
-	} while (!atomic_compare_exchange_weak(&rbuf->used_workers, old_used, w));
-
+	/* Deprecated. */
+	(void)rbuf;
 	(void)i;
-	return w;
+	return NULL;
 }
 
 void
 ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w)
 {
-	/* Mark this worker as unregistered. */
-	w->seen_off = (RBUF_OFF_MAX | WRAP_LOCK_BIT);
+	/* Deprecated. */
 	(void)rbuf;
+	(void)w;
 }
+ */ +static inline void +push_worker(ringbuf_t *rbuf, worker_off_t volatile *stack_head, + ringbuf_worker_t *w) +{ + worker_off_t w_offset, old_head, new_head; + + /* Get the offset of the worker-record being pushed. */ + w_offset = w - rbuf->workers; + + /* Make sure this worker-record isn't on any stack already. */ + ASSERT(w->next == WORKER_NULL); + + do { + /* Get the offset of the next worker-record on the stack. */ + old_head = *stack_head; + + /* + * Prepare to push that worker-record onto the stack, + * i.e. increment the version-number of the stack-head index. + * + * Since this worker-record isn't on any stack at this point, + * nothing about its next-offset (including its version-number) + * has to be preserved. + */ + w->next = (old_head & RBUF_OFF_MASK); + new_head = (w_offset | WRAP_INCR(old_head)); + } while (!atomic_compare_exchange_weak(stack_head, old_head, new_head)); +} + +/* + * pop_worker: pop a worker-record from the given stack. + */ +static inline ringbuf_worker_t * +pop_worker(ringbuf_t *rbuf, worker_off_t volatile *stack_head) +{ + worker_off_t old_head, new_head; + ringbuf_worker_t *w; + + do { + worker_off_t old_head_offset; + + /* Get the offset of the worker-record on top of the stack. */ + old_head = *stack_head; + old_head_offset = (old_head & RBUF_OFF_MASK); + if (old_head_offset == WORKER_NULL) + return NULL; + + /* Find that worker-record. */ + w = &rbuf->workers[old_head_offset]; + + /* + * Prepare to pop that worker-record off of the stack, + * i.e. increment the version-number of the stack-head index. + */ + new_head = (w->next & RBUF_OFF_MASK); + new_head |= WRAP_INCR(old_head); + } while (!atomic_compare_exchange_weak(stack_head, old_head, new_head)); + + /* + * Since this worker-record isn't on any stack at this point, + * nothing about its next-offset (including its version-number) + * has to be preserved. + */ + w->next = WORKER_NULL; + + return w; +} + +/* + * try_unlink_worker: try to unlink a worker-record from a stack. + */ +static inline bool +try_unlink_worker(ringbuf_t *rbuf, worker_off_t volatile *stack_link, + worker_off_t old_link) +{ + ringbuf_worker_t *w; + worker_off_t old_link_offset, new_link; + bool success; + + /* Find that worker-record. */ + old_link_offset = (old_link & RBUF_OFF_MASK); + ASSERT (old_link_offset != WORKER_NULL); + w = &rbuf->workers[old_link_offset]; + + /* + * Prepare to unlink that worker-record from the stack, + * i.e. increment the version-number of the stack-link index. + */ + new_link = (w->next & RBUF_OFF_MASK); + new_link |= WRAP_INCR(old_link); + success = atomic_compare_exchange_weak(stack_link, old_link, new_link); + + /* + * Since this worker-record isn't on any stack at this point, + * nothing about its next-offset (including its version-number) + * has to be preserved. + */ + if (success) + w->next = WORKER_NULL; + + return success; +} + /* * ringbuf_acquire: request a space of a given length in the ring buffer. * @@ -200,11 +295,19 @@ stable_nextoff(ringbuf_t *rbuf) * => On failure: returns -1. */ ssize_t -ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len) +ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len) { ringbuf_off_t seen, next, target; + ringbuf_worker_t *w; + ringbuf_off_t seen_off; ASSERT(len > 0 && len <= rbuf->space); + + /* Get a worker-record, to track state between acquire & produce. 
@@ -200,11 +295,19 @@ stable_nextoff(ringbuf_t *rbuf)
  * => On failure: returns -1.
  */
 ssize_t
-ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
+ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len)
 {
 	ringbuf_off_t seen, next, target;
+	ringbuf_worker_t *w;
+	ringbuf_off_t seen_off;
 
 	ASSERT(len > 0 && len <= rbuf->space);
+
+	/* Get a worker-record, to track state between acquire & produce. */
+	*pw = NULL;
+	w = pop_worker(rbuf, &rbuf->free_workers);
+	if (w == NULL)
+		return -1;
 	ASSERT(w->seen_off == RBUF_OFF_MAX);
 
 	do {
@@ -222,7 +325,7 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
 		seen = stable_nextoff(rbuf);
 		next = seen & RBUF_OFF_MASK;
 		ASSERT(next < rbuf->space);
-		w->seen_off = next | WRAP_LOCK_BIT;
+		seen_off = next | WRAP_LOCK_BIT;
 
 		/*
 		 * Compute the target offset. Key invariant: we cannot
@@ -231,8 +334,10 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
 		target = next + len;
 		written = rbuf->written;
 		if (__predict_false(next < written && target >= written)) {
+			/* Free this unused worker-record. */
+			push_worker(rbuf, &rbuf->free_workers, w);
+
 			/* The producer must wait. */
-			w->seen_off = RBUF_OFF_MAX;
 			return -1;
 		}
 
@@ -251,7 +356,9 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
 			 */
 			target = exceed ? (WRAP_LOCK_BIT | len) : 0;
 			if ((target & RBUF_OFF_MASK) >= written) {
-				w->seen_off = RBUF_OFF_MAX;
+				/* Free this unused worker-record. */
+				push_worker(rbuf, &rbuf->free_workers, w);
+
 				return -1;
 			}
 			/* Increment the wrap-around counter. */
@@ -266,7 +373,11 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
 	 * Acquired the range. Clear WRAP_LOCK_BIT in the 'seen' value
 	 * thus indicating that it is stable now.
 	 */
-	w->seen_off &= ~WRAP_LOCK_BIT;
+	w->seen_off = (seen_off & ~WRAP_LOCK_BIT);
+	push_worker(rbuf, &rbuf->used_workers, w);
+
+	/* Hand this worker-record back to our caller. */
+	*pw = w;
 
 	/*
 	 * If we set the WRAP_LOCK_BIT in the 'next' (because we exceed
@@ -299,7 +410,7 @@ void
 ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *w)
 {
 	(void)rbuf;
-	ASSERT((w->seen_off & ~WRAP_LOCK_BIT) != RBUF_OFF_MAX);
+	ASSERT(w->seen_off != RBUF_OFF_MAX);
 	atomic_thread_fence(memory_order_release);
 	w->seen_off = RBUF_OFF_MAX;
 }
@@ -311,8 +422,8 @@ size_t
 ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
 {
 	ringbuf_off_t written = rbuf->written, next, ready;
-	ringbuf_worker_t *w;
-	ringbuf_worker_t * volatile *pw;
+	worker_off_t volatile *pw_link;
+	worker_off_t w_link, w_off;
 	size_t towrite;
 retry:
 	/*
@@ -336,52 +447,53 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
 	 */
 	ready = RBUF_OFF_MAX;
 
-	w = rbuf->used_workers;
-	pw = &rbuf->used_workers;
-	while (w) {
+	pw_link = &rbuf->used_workers;
+	w_link = *pw_link;
+	w_off = (w_link & RBUF_OFF_MASK);
+	while (w_off != WORKER_NULL) {
+		ringbuf_worker_t *w = &rbuf->workers[w_off];
 		unsigned count = SPINLOCK_BACKOFF_MIN;
 		ringbuf_off_t seen_off;
 
-		/* If this worker has been unregistered, try to clean it up. */
-		if (w->seen_off == (RBUF_OFF_MAX | WRAP_LOCK_BIT)) {
-			ringbuf_worker_t *old_free;
+		/*
+		 * Get a stable 'seen' value. This is necessary since we
+		 * want to discard the stale 'seen' values.
+		 */
+		while ((seen_off = w->seen_off) & WRAP_LOCK_BIT) {
+			SPINLOCK_BACKOFF(count);
+		}
 
+		/* If this worker has produced, clean it up. */
+		if (seen_off == RBUF_OFF_MAX) {
 			/*
-			 * Remove this worker from the used-worker stack.
-			 * If it can't be, try again later.
+			 * Try to unlink this worker-record from the
+			 * used-worker stack.
+			 * If it can't be done, try again later.
 			 */
-			if (atomic_compare_exchange_weak(pw, w, w->next)) {
-				/* Push this unused worker on top of the free-worker stack. */
-				do {
-					old_free = rbuf->free_workers;
-					w->next = old_free;
-				} while (!atomic_compare_exchange_weak(&rbuf->free_workers, old_free, w));
-				w = w->next;
+			if (try_unlink_worker(rbuf, pw_link, w_link)) {
+				/* Free this unused worker-record. */
+				w->seen_off = RBUF_OFF_MAX;
+				push_worker(rbuf, &rbuf->free_workers, w);
+				w_link = *pw_link;
+				w_off = (w_link & RBUF_OFF_MASK);
 				continue;
 			}
-		} else {
-			/*
-			 * Get a stable 'seen' value. This is necessary since we
-			 * want to discard the stale 'seen' values.
-			 */
-			while ((seen_off = w->seen_off) & WRAP_LOCK_BIT) {
-				SPINLOCK_BACKOFF(count);
-			}
+		}
 
-			/*
-			 * Ignore the offsets after the possible wrap-around.
-			 * We are interested in the smallest seen offset that is
-			 * not behind the 'written' offset.
-			 */
-			if (seen_off >= written) {
-				ready = MIN(seen_off, ready);
+		/*
+		 * Ignore the offsets after the possible wrap-around.
+		 * We are interested in the smallest seen offset that is
+		 * not behind the 'written' offset.
+		 */
+		if (seen_off >= written) {
+			ready = MIN(seen_off, ready);
 		}
-			ASSERT(ready >= written);
+		ASSERT(ready >= written);
 
 		/* Move to the next incomplete acquire/produce operation. */
-		pw = &w->next;
-		w = w->next;
+		pw_link = &w->next;
+		w_link = *pw_link;
+		w_off = (w_link & RBUF_OFF_MASK);
 	}
 
 	/*
diff --git a/src/ringbuf.h b/src/ringbuf.h
index e8fc767..60dbb8c 100644
--- a/src/ringbuf.h
+++ b/src/ringbuf.h
@@ -19,7 +19,7 @@ void		ringbuf_get_sizes(unsigned, size_t *, size_t *);
 ringbuf_worker_t *ringbuf_register(ringbuf_t *, unsigned);
 void		ringbuf_unregister(ringbuf_t *, ringbuf_worker_t *);
 
-ssize_t		ringbuf_acquire(ringbuf_t *, ringbuf_worker_t *, size_t);
+ssize_t		ringbuf_acquire(ringbuf_t *, ringbuf_worker_t **, size_t);
 void		ringbuf_produce(ringbuf_t *, ringbuf_worker_t *);
 size_t		ringbuf_consume(ringbuf_t *, size_t *);
 void		ringbuf_release(ringbuf_t *, size_t);
diff --git a/src/t_ringbuf.c b/src/t_ringbuf.c
index 05aa4d5..0e46ab2 100644
--- a/src/t_ringbuf.c
+++ b/src/t_ringbuf.c
@@ -26,14 +26,13 @@ test_wraparound(void)
 
 	/* Size n, but only (n - 1) can be produced at a time. */
 	ringbuf_setup(r, MAX_WORKERS, n);
-	w = ringbuf_register(r, 0);
 
 	/* Produce (n / 2 + 1) and then attempt another (n / 2 - 1). */
-	off = ringbuf_acquire(r, w, n / 2 + 1);
+	off = ringbuf_acquire(r, &w, n / 2 + 1);
 	assert(off == 0);
 	ringbuf_produce(r, w);
 
-	off = ringbuf_acquire(r, w, n / 2 - 1);
+	off = ringbuf_acquire(r, &w, n / 2 - 1);
 	assert(off == -1);
 
 	/* Consume (n / 2 + 1) bytes. */
@@ -42,11 +41,11 @@ test_wraparound(void)
 	ringbuf_release(r, len);
 
 	/* All consumed, attempt (n / 2 + 1) now. */
-	off = ringbuf_acquire(r, w, n / 2 + 1);
+	off = ringbuf_acquire(r, &w, n / 2 + 1);
 	assert(off == -1);
 
 	/* However, wraparound can be successful with (n / 2). */
-	off = ringbuf_acquire(r, w, n / 2);
+	off = ringbuf_acquire(r, &w, n / 2);
 	assert(off == 0);
 	ringbuf_produce(r, w);
 
@@ -55,7 +54,6 @@ test_wraparound(void)
 	assert(len == (n / 2) && woff == 0);
 	ringbuf_release(r, len);
 
-	ringbuf_unregister(r, w);
 	free(r);
 }
 
@@ -68,21 +66,20 @@ test_multi(void)
 	ssize_t off;
 
 	ringbuf_setup(r, MAX_WORKERS, 3);
-	w = ringbuf_register(r, 0);
 
 	/*
 	 * Produce 2 bytes.
 	 */
 
-	off = ringbuf_acquire(r, w, 1);
+	off = ringbuf_acquire(r, &w, 1);
 	assert(off == 0);
 	ringbuf_produce(r, w);
 
-	off = ringbuf_acquire(r, w, 1);
+	off = ringbuf_acquire(r, &w, 1);
 	assert(off == 1);
 	ringbuf_produce(r, w);
 
-	off = ringbuf_acquire(r, w, 1);
+	off = ringbuf_acquire(r, &w, 1);
 	assert(off == -1);
 
 	/*
@@ -99,18 +96,18 @@ test_multi(void)
 	 * Produce another 2 with wrap-around.
 	 */
 
-	off = ringbuf_acquire(r, w, 2);
+	off = ringbuf_acquire(r, &w, 2);
 	assert(off == -1);
 
-	off = ringbuf_acquire(r, w, 1);
+	off = ringbuf_acquire(r, &w, 1);
 	assert(off == 2);
 	ringbuf_produce(r, w);
 
-	off = ringbuf_acquire(r, w, 1);
+	off = ringbuf_acquire(r, &w, 1);
 	assert(off == 0);
 	ringbuf_produce(r, w);
 
-	off = ringbuf_acquire(r, w, 1);
+	off = ringbuf_acquire(r, &w, 1);
 	assert(off == -1);
 
 	/*
@@ -125,7 +122,6 @@ test_multi(void)
 	assert(len == 1 && woff == 0);
 	ringbuf_release(r, len);
 
-	ringbuf_unregister(r, w);
 	free(r);
 }
 
@@ -138,13 +134,11 @@ test_overlap(void)
 	ssize_t off;
 
 	ringbuf_setup(r, MAX_WORKERS, 10);
-	w1 = ringbuf_register(r, 0);
-	w2 = ringbuf_register(r, 1);
 
 	/*
 	 * Producer 1: acquire 5 bytes. Consumer should fail.
 	 */
-	off = ringbuf_acquire(r, w1, 5);
+	off = ringbuf_acquire(r, &w1, 5);
 	assert(off == 0);
 
 	len = ringbuf_consume(r, &woff);
@@ -153,7 +147,7 @@ test_overlap(void)
 	/*
 	 * Producer 2: acquire 3 bytes. Consumer should still fail.
 	 */
-	off = ringbuf_acquire(r, w2, 3);
+	off = ringbuf_acquire(r, &w2, 3);
 	assert(off == 5);
 
 	len = ringbuf_consume(r, &woff);
@@ -174,7 +168,7 @@ test_overlap(void)
 	 * Producer 1: acquire-produce 4 bytes, triggering wrap-around.
 	 * Consumer should still fail.
 	 */
-	off = ringbuf_acquire(r, w1, 4);
+	off = ringbuf_acquire(r, &w1, 4);
 	assert(off == 0);
 
 	len = ringbuf_consume(r, &woff);
@@ -197,8 +191,6 @@ test_overlap(void)
 	assert(len == 4 && woff == 0);
 	ringbuf_release(r, len);
 
-	ringbuf_unregister(r, w1);
-	ringbuf_unregister(r, w2);
 	free(r);
 }
 
@@ -212,8 +204,6 @@ test_random(void)
 	unsigned char buf[500];
 
 	ringbuf_setup(r, MAX_WORKERS, sizeof(buf));
-	w1 = ringbuf_register(r, 0);
-	w2 = ringbuf_register(r, 1);
 
 	while (n--) {
 		size_t len, woff;
@@ -237,7 +227,7 @@ test_random(void)
 			break;
 		case 1:	// producer 1
 			if (off1 == -1) {
-				if ((off1 = ringbuf_acquire(r, w1, len)) >= 0) {
+				if ((off1 = ringbuf_acquire(r, &w1, len)) >= 0) {
 					assert((size_t)off1 < sizeof(buf));
 					buf[off1] = len - 1;
 				}
@@ -249,7 +239,7 @@ test_random(void)
 			break;
 		case 2:	// producer 2
 			if (off2 == -1) {
-				if ((off2 = ringbuf_acquire(r, w2, len)) >= 0) {
+				if ((off2 = ringbuf_acquire(r, &w2, len)) >= 0) {
 					assert((size_t)off2 < sizeof(buf));
 					buf[off2] = len - 1;
 				}
@@ -261,8 +251,6 @@ test_random(void)
 			break;
 		}
 	}
-	ringbuf_unregister(r, w1);
-	ringbuf_unregister(r, w2);
 	free(r);
 }
 
diff --git a/src/t_stress.c b/src/t_stress.c
index e32fe94..1cc75a7 100644
--- a/src/t_stress.c
+++ b/src/t_stress.c
@@ -103,9 +103,6 @@ ringbuf_stress(void *arg)
 	const unsigned id = (uintptr_t)arg;
 	ringbuf_worker_t *w;
 
-	w = ringbuf_register(ringbuf, id);
-	assert(w != NULL);
-
 	/*
 	 * There are NCPU threads concurrently generating and producing
 	 * random messages and a single consumer thread (ID 0) verifying
@@ -136,7 +133,7 @@ ringbuf_stress(void *arg)
 			continue;
 		}
 		len = generate_message(buf, sizeof(buf) - 1);
-		if ((ret = ringbuf_acquire(ringbuf, w, len)) != -1) {
+		if ((ret = ringbuf_acquire(ringbuf, &w, len)) != -1) {
 			off = (size_t)ret;
 			assert(off < RBUF_SIZE);
 			memcpy(&rbuf[off], buf, len);
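
Patch 3's stack helpers defend against ABA by packing two fields into each
64-bit stack head: a worker index in the low half (WORKER_NULL meaning empty)
and a version counter in the WRAP_COUNTER bits, advanced by WRAP_INCR() on
every push and pop. Below is a small self-checking walkthrough of that
encoding; the constants are copied from the patch, while main() itself is only
a demonstration:

#include <assert.h>
#include <stdint.h>

#define	RBUF_OFF_MASK	(0x00000000ffffffffUL)
#define	WRAP_COUNTER	(0x7fffffff00000000UL)
#define	WRAP_INCR(x)	(((x) + 0x100000000UL) & WRAP_COUNTER)
#define	WORKER_NULL	(0x00000000ffffffffUL)

int
main(void)
{
	uint64_t head = WORKER_NULL;	/* empty stack, version 0 */

	/* Push worker 3: its index in the low half, version bumped. */
	head = 3 | WRAP_INCR(head);
	assert((head & RBUF_OFF_MASK) == 3);
	assert((head & WRAP_COUNTER) == 0x100000000UL);

	/* Pop it: successor offset (empty) plus another version bump. */
	head = WORKER_NULL | WRAP_INCR(head);
	assert((head & RBUF_OFF_MASK) == WORKER_NULL);
	assert((head & WRAP_COUNTER) == 0x200000000UL);
	return 0;
}

A CAS built from a stale head therefore fails even if the same worker index
has returned to the top of the stack, because the version bits have moved on
in the meantime.
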
From 43f46765857433d9d83467a0efa92ebc0900bee7 Mon Sep 17 00:00:00 2001
From: ulatekh
Date: Wed, 7 Feb 2018 04:12:33 -0700
Subject: [PATCH 4/5] Now it passes the tests and the stress-test.

The downside is that failed calls to ringbuf_acquire() consume a
worker-record, and ringbuf_consume() must be called to free up
worker-records on the used-workers stack that are no longer in use.

I'm pretty sure it's not safe to remove used-workers from the stack
anywhere except in ringbuf_consume(). This probably isn't a problem
in practice, but I'm still not happy about it.
---
 src/ringbuf.c   | 16 ++++++----------
 src/t_ringbuf.c |  2 +-
 src/t_stress.c  |  4 ++++
 3 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/src/ringbuf.c b/src/ringbuf.c
index 45d5315..c52f4bb 100644
--- a/src/ringbuf.c
+++ b/src/ringbuf.c
@@ -299,7 +299,6 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len)
 {
 	ringbuf_off_t seen, next, target;
 	ringbuf_worker_t *w;
-	ringbuf_off_t seen_off;
 
 	ASSERT(len > 0 && len <= rbuf->space);
 
@@ -309,6 +308,8 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len)
 	if (w == NULL)
 		return -1;
 	ASSERT(w->seen_off == RBUF_OFF_MAX);
+	w->seen_off = RBUF_OFF_MAX | WRAP_LOCK_BIT;
+	push_worker(rbuf, &rbuf->used_workers, w);
 
 	do {
 		ringbuf_off_t written;
@@ -325,7 +326,7 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len)
 		seen = stable_nextoff(rbuf);
 		next = seen & RBUF_OFF_MASK;
 		ASSERT(next < rbuf->space);
-		seen_off = next | WRAP_LOCK_BIT;
+		w->seen_off = next | WRAP_LOCK_BIT;
 
 		/*
 		 * Compute the target offset. Key invariant: we cannot
@@ -334,10 +335,8 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len)
 		target = next + len;
 		written = rbuf->written;
 		if (__predict_false(next < written && target >= written)) {
-			/* Free this unused worker-record. */
-			push_worker(rbuf, &rbuf->free_workers, w);
-
 			/* The producer must wait. */
+			w->seen_off = RBUF_OFF_MAX;
 			return -1;
 		}
 
@@ -356,9 +355,7 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len)
 			 */
 			target = exceed ? (WRAP_LOCK_BIT | len) : 0;
 			if ((target & RBUF_OFF_MASK) >= written) {
-				/* Free this unused worker-record. */
-				push_worker(rbuf, &rbuf->free_workers, w);
-
+				w->seen_off = RBUF_OFF_MAX;
 				return -1;
 			}
 			/* Increment the wrap-around counter. */
@@ -373,8 +370,7 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len)
 	 * Acquired the range. Clear WRAP_LOCK_BIT in the 'seen' value
 	 * thus indicating that it is stable now.
 	 */
-	w->seen_off = (seen_off & ~WRAP_LOCK_BIT);
-	push_worker(rbuf, &rbuf->used_workers, w);
+	w->seen_off &= ~WRAP_LOCK_BIT;
 
 	/* Hand this worker-record back to our caller. */
 	*pw = w;
diff --git a/src/t_ringbuf.c b/src/t_ringbuf.c
index 0e46ab2..4944429 100644
--- a/src/t_ringbuf.c
+++ b/src/t_ringbuf.c
@@ -11,7 +11,7 @@
 
 #include "ringbuf.h"
 
-#define	MAX_WORKERS	2
+#define	MAX_WORKERS	3
 
 static size_t	ringbuf_obj_size;
 
diff --git a/src/t_stress.c b/src/t_stress.c
index 1cc75a7..293cf46 100644
--- a/src/t_stress.c
+++ b/src/t_stress.c
@@ -102,6 +102,7 @@ ringbuf_stress(void *arg)
 {
 	const unsigned id = (uintptr_t)arg;
 	ringbuf_worker_t *w;
+	uint64_t total_recv = 0;
 
 	/*
 	 * There are NCPU threads concurrently generating and producing
@@ -120,6 +121,7 @@ ringbuf_stress(void *arg)
 
 		if (id == 0) {
 			if ((len = ringbuf_consume(ringbuf, &off)) != 0) {
+				total_recv += len;
 				size_t rem = len;
 				assert(off < RBUF_SIZE);
 				while (rem) {
@@ -141,6 +143,8 @@ ringbuf_stress(void *arg)
 		}
 	}
 	pthread_barrier_wait(&barrier);
+	if (id == 0)
+		printf("Total received: %" PRIu64 "\n", total_recv);
 	pthread_exit(NULL);
 	return NULL;
 }
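
One practical consequence of the caveat above: a failed ringbuf_acquire()
still consumes a worker-record and parks it on the used-workers stack, and
only ringbuf_consume() recycles it, so a producer should back off and let the
consumer run instead of spinning on acquire. Below is a hedged usage sketch
under these semantics; send_message() and the data-buffer layout are
hypothetical, not part of this series:

#include <sched.h>
#include <string.h>
#include <sys/types.h>

#include "ringbuf.h"

/* Hypothetical producer helper; 'data' is the buffer backing 'rbuf'. */
static void
send_message(ringbuf_t *rbuf, unsigned char *data,
    const void *payload, size_t len)
{
	ringbuf_worker_t *w;
	ssize_t off;

	while ((off = ringbuf_acquire(rbuf, &w, len)) == -1) {
		/*
		 * Either the buffer is full, or the free-stack has been
		 * drained by records that failed acquires left behind.
		 * Both conditions clear only once the consumer runs
		 * ringbuf_consume()/ringbuf_release(), so yield rather
		 * than spinning hot.
		 */
		sched_yield();
	}
	memcpy(&data[off], payload, len);
	ringbuf_produce(rbuf, w);
}
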
From 3bcf4cc6bd10d7b8eaf13b6f2692fe0d982d07d4 Mon Sep 17 00:00:00 2001
From: ulatekh
Date: Wed, 7 Feb 2018 06:10:05 -0700
Subject: [PATCH 5/5] Added spinlock-backoff to the worker-record stack
 routines.

It helps, but stress-test performance is still far below that of the
old code, and it remains inconsistent from run to run.
---
 src/ringbuf.c | 32 ++++++++++++++++++++++++++++----
 src/utils.h   |  2 +-
 2 files changed, 29 insertions(+), 5 deletions(-)

diff --git a/src/ringbuf.c b/src/ringbuf.c
index c52f4bb..a5d9893 100644
--- a/src/ringbuf.c
+++ b/src/ringbuf.c
@@ -165,6 +165,22 @@ ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w)
 	(void)w;
 }
 
+/*
+ * Simple xorshift; random() causes huge lock contention on Linux/glibc,
+ * which would "hide" the possible race conditions.
+ */
+__thread uint32_t fast_random_seed_r = 5381;
+static unsigned long
+fast_random(void)
+{
+	uint32_t x = fast_random_seed_r;
+	x ^= x << 13;
+	x ^= x >> 17;
+	x ^= x << 5;
+	fast_random_seed_r = x;
+	return x;
+}
+
 /*
  * stable_nextoff: capture and return a stable value of the 'next' offset.
  */
@@ -190,6 +206,7 @@ push_worker(ringbuf_t *rbuf, worker_off_t volatile *stack_head,
     ringbuf_worker_t *w)
 {
 	worker_off_t w_offset, old_head, new_head;
+	unsigned count = SPINLOCK_BACKOFF_MIN;
 
 	/* Get the offset of the worker-record being pushed. */
 	w_offset = w - rbuf->workers;
@@ -197,7 +214,7 @@ push_worker(ringbuf_t *rbuf, worker_off_t volatile *stack_head,
 	/* Make sure this worker-record isn't on any stack already. */
 	ASSERT(w->next == WORKER_NULL);
 
-	do {
+	for (;;) {
 		/* Get the offset of the next worker-record on the stack. */
 		old_head = *stack_head;
 
@@ -211,7 +228,10 @@ push_worker(ringbuf_t *rbuf, worker_off_t volatile *stack_head,
 		 */
 		w->next = (old_head & RBUF_OFF_MASK);
 		new_head = (w_offset | WRAP_INCR(old_head));
-	} while (!atomic_compare_exchange_weak(stack_head, old_head, new_head));
+		if (atomic_compare_exchange_weak(stack_head, old_head, new_head))
+			break;
+		SPINLOCK_BACKOFF(count);
+	}
 }
 
 /*
@@ -222,8 +242,9 @@ pop_worker(ringbuf_t *rbuf, worker_off_t volatile *stack_head)
 {
 	worker_off_t old_head, new_head;
 	ringbuf_worker_t *w;
+	unsigned count = SPINLOCK_BACKOFF_MIN;
 
-	do {
+	for (;;) {
 		worker_off_t old_head_offset;
 
 		/* Get the offset of the worker-record on top of the stack. */
@@ -241,7 +262,10 @@ pop_worker(ringbuf_t *rbuf, worker_off_t volatile *stack_head)
 		 */
 		new_head = (w->next & RBUF_OFF_MASK);
 		new_head |= WRAP_INCR(old_head);
-	} while (!atomic_compare_exchange_weak(stack_head, old_head, new_head));
+		if (atomic_compare_exchange_weak(stack_head, old_head, new_head))
+			break;
+		SPINLOCK_BACKOFF(count);
+	}
 
 	/*
 	 * Since this worker-record isn't on any stack at this point,
diff --git a/src/utils.h b/src/utils.h
index d948561..b385b92 100644
--- a/src/utils.h
+++ b/src/utils.h
@@ -93,7 +93,7 @@
 #endif
 #define	SPINLOCK_BACKOFF(count)					\
 do {								\
-	for (int __i = (count); __i != 0; __i--) {		\
+	for (int __i = ((count) + fast_random() % (count)); __i != 0; __i--) { \
 		SPINLOCK_BACKOFF_HOOK;				\
 	}							\
 	if ((count) < SPINLOCK_BACKOFF_MAX)			\
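
The jitter added above matters because the two stack heads are single
contention points: threads that lose a CAS together and then back off for
identical spin counts tend to retry in lockstep and collide again.
Randomizing the spin length spreads the retries out, and the per-thread
xorshift avoids the glibc random() lock noted in the patch comment. Below is
a standalone sketch of the combined scheme, the same 13/17/5 xorshift as
fast_random() plus randomized exponential backoff; BACKOFF_MAX, the empty-asm
pause, and the function names are stand-ins, not the project's:

#include <stdint.h>

static __thread uint32_t seed = 5381;

static uint32_t
xorshift32(void)
{
	uint32_t x = seed;

	x ^= x << 13;
	x ^= x >> 17;
	x ^= x << 5;
	return seed = x;
}

#define	BACKOFF_MAX	128	/* stand-in for SPINLOCK_BACKOFF_MAX */

static void
backoff(unsigned *count)
{
	/* Spin between *count and 2 * *count iterations... */
	for (unsigned i = *count + xorshift32() % *count; i != 0; i--)
		__asm__ __volatile__("" ::: "memory");	/* compiler-only pause */

	/* ...then double the base, up to a cap (exponential backoff). */
	if (*count < BACKOFF_MAX)
		*count += *count;
}

A failed CAS loop would call backoff(&count) after each miss, with count
reset to the minimum at the start of each operation, mirroring how the patch
initializes count to SPINLOCK_BACKOFF_MIN in push_worker() and pop_worker().
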