diff --git a/src/Makefile b/src/Makefile index 7f85353..b4a0c10 100644 --- a/src/Makefile +++ b/src/Makefile @@ -2,7 +2,7 @@ # This file is in the Public Domain. # -CFLAGS= -std=c11 -O2 -g -W -Wextra -Werror +CFLAGS= -std=c11 -g -W -Wextra -Werror CFLAGS+= -D_POSIX_C_SOURCE=200809L CFLAGS+= -D_GNU_SOURCE -D_DEFAULT_SOURCE @@ -23,7 +23,7 @@ endif ifeq ($(DEBUG),1) CFLAGS+= -O0 -DDEBUG -fno-omit-frame-pointer else -CFLAGS+= -DNDEBUG +CFLAGS+= -O2 -DNDEBUG endif LIB= libringbuf diff --git a/src/ringbuf.c b/src/ringbuf.c index e0f2c9e..a5d9893 100644 --- a/src/ringbuf.c +++ b/src/ringbuf.c @@ -75,11 +75,14 @@ #define WRAP_COUNTER (0x7fffffff00000000UL) #define WRAP_INCR(x) (((x) + 0x100000000UL) & WRAP_COUNTER) +#define WORKER_NULL (0x00000000ffffffffUL) /* offset value marking an empty stack or the end of a worker list */ + typedef uint64_t ringbuf_off_t; +typedef uint64_t worker_off_t; struct ringbuf_worker { volatile ringbuf_off_t seen_off; - int registered; + volatile worker_off_t next; /* offset of the next record on the same stack, WORKER_NULL if none */ }; struct ringbuf { @@ -94,9 +97,12 @@ struct ringbuf { volatile ringbuf_off_t next; ringbuf_off_t end; + /* Worker stacks: used = handed out by acquire and not yet reclaimed by consume; free = idle. */ + worker_off_t used_workers, free_workers; + /* The following are updated by the consumer. */ ringbuf_off_t written; - unsigned nworkers; + unsigned nworkers; ringbuf_worker_t workers[]; }; @@ -114,6 +120,14 @@ ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, size_t length) rbuf->space = length; rbuf->end = RBUF_OFF_MAX; rbuf->nworkers = nworkers; + + /* Put all workers into the free-stack. 
*/ + rbuf->used_workers = rbuf->free_workers = WORKER_NULL; + for (unsigned i = 0; i < rbuf->nworkers; i++) { + rbuf->workers[i].seen_off = RBUF_OFF_MAX; + rbuf->workers[i].next = rbuf->free_workers; + rbuf->free_workers = i; + } return 0; } @@ -137,19 +151,34 @@ ringbuf_get_sizes(unsigned nworkers, ringbuf_worker_t * ringbuf_register(ringbuf_t *rbuf, unsigned i) { - ringbuf_worker_t *w = &rbuf->workers[i]; - - w->seen_off = RBUF_OFF_MAX; - atomic_thread_fence(memory_order_release); - w->registered = true; - return w; + /* Deprecated: ringbuf_acquire() hands out the worker-record now; always returns NULL. */ + (void)rbuf; + (void)i; + return NULL; } void ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w) { - w->registered = false; + /* Deprecated: no-op; ringbuf_consume() returns worker-records to the free-stack. */ (void)rbuf; + (void)w; +} + +/* + * Simple per-thread 32-bit xorshift; random() causes huge lock contention on Linux/glibc, + * which would "hide" the possible race conditions. Used by SPINLOCK_BACKOFF(). NOTE(review): every thread starts from the same seed (5381). + */ +__thread uint32_t fast_random_seed_r = 5381; +static unsigned long +fast_random(void) +{ + uint32_t x = fast_random_seed_r; + x ^= x << 13; + x ^= x >> 17; + x ^= x << 5; + fast_random_seed_r = x; + return x; } /* @@ -169,6 +198,120 @@ stable_nextoff(ringbuf_t *rbuf) return next; } +/* + * push_worker: push this worker-record onto the given stack. + */ +static inline void +push_worker(ringbuf_t *rbuf, worker_off_t volatile *stack_head, + ringbuf_worker_t *w) +{ + worker_off_t w_offset, old_head, new_head; + unsigned count = SPINLOCK_BACKOFF_MIN; + + /* Get the offset of the worker-record being pushed. */ + w_offset = w - rbuf->workers; + + /* Make sure this worker-record isn't on any stack already. */ + ASSERT(w->next == WORKER_NULL); + + for (;;) { + /* Snapshot the current stack-head (offset and version-number). */ + old_head = *stack_head; + + /* + * Prepare to push this worker-record (w) onto the stack, + * i.e. increment the version-number of the stack-head index. + * + * Since this worker-record isn't on any stack at this point, + * nothing about its next-offset (including its version-number) + * has to be preserved. 
+ */ + w->next = (old_head & RBUF_OFF_MASK); + new_head = (w_offset | WRAP_INCR(old_head)); + if (atomic_compare_exchange_weak(stack_head, old_head, new_head)) + break; + SPINLOCK_BACKOFF(count); + } +} + +/* + * pop_worker: pop a worker-record from the given stack; returns NULL if the stack is empty. + */ +static inline ringbuf_worker_t * +pop_worker(ringbuf_t *rbuf, worker_off_t volatile *stack_head) +{ + worker_off_t old_head, new_head; + ringbuf_worker_t *w; + unsigned count = SPINLOCK_BACKOFF_MIN; + + for (;;) { + worker_off_t old_head_offset; + + /* Get the offset of the worker-record on top of the stack. */ + old_head = *stack_head; + old_head_offset = (old_head & RBUF_OFF_MASK); + if (old_head_offset == WORKER_NULL) + return NULL; + + /* Find that worker-record. */ + w = &rbuf->workers[old_head_offset]; + + /* + * Prepare to pop that worker-record off of the stack, + * i.e. increment the version-number of the stack-head index. + */ + new_head = (w->next & RBUF_OFF_MASK); + new_head |= WRAP_INCR(old_head); + if (atomic_compare_exchange_weak(stack_head, old_head, new_head)) + break; + SPINLOCK_BACKOFF(count); + } + + /* + * Since this worker-record isn't on any stack at this point, + * nothing about its next-offset (including its version-number) + * has to be preserved. + */ + w->next = WORKER_NULL; + + return w; +} + +/* + * try_unlink_worker: try once to unlink the worker-record that old_link refers to; returns true on success. + */ +static inline bool +try_unlink_worker(ringbuf_t *rbuf, worker_off_t volatile *stack_link, + worker_off_t old_link) +{ + ringbuf_worker_t *w; + worker_off_t old_link_offset, new_link; + bool success; + + /* Find the worker-record that old_link refers to. */ + old_link_offset = (old_link & RBUF_OFF_MASK); + ASSERT (old_link_offset != WORKER_NULL); + w = &rbuf->workers[old_link_offset]; + + /* + * Prepare to unlink that worker-record from the stack, + * i.e. increment the version-number of the stack-link index. 
+ */ + new_link = (w->next & RBUF_OFF_MASK); + new_link |= WRAP_INCR(old_link); + success = atomic_compare_exchange_weak(stack_link, old_link, new_link); + + /* + * On success this worker-record is no longer on any stack, so + * nothing about its next-offset (including its version-number) + * has to be preserved. + */ + if (success) + w->next = WORKER_NULL; + + return success; +} + /* * ringbuf_acquire: request a space of a given length in the ring buffer. * @@ -176,12 +319,21 @@ stable_nextoff(ringbuf_t *rbuf) * => On failure: returns -1. */ ssize_t -ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len) +ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len) { ringbuf_off_t seen, next, target; + ringbuf_worker_t *w; ASSERT(len > 0 && len <= rbuf->space); + + /* Get a worker-record, to track state between acquire & produce; fail with -1 if the free-stack is empty. */ + *pw = NULL; + w = pop_worker(rbuf, &rbuf->free_workers); + if (w == NULL) + return -1; ASSERT(w->seen_off == RBUF_OFF_MAX); + w->seen_off = RBUF_OFF_MAX | WRAP_LOCK_BIT; /* set before the record is pushed onto the used-stack */ + push_worker(rbuf, &rbuf->used_workers, w); do { ringbuf_off_t written; @@ -244,6 +396,9 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len) */ w->seen_off &= ~WRAP_LOCK_BIT; + /* Hand this worker-record back to our caller. 
*/ + *pw = w; + /* * If we set the WRAP_LOCK_BIT in the 'next' (because we exceed * the remaining space and need to wrap-around), then save the @@ -275,7 +430,6 @@ void ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *w) { (void)rbuf; - ASSERT(w->registered); ASSERT(w->seen_off != RBUF_OFF_MAX); atomic_thread_fence(memory_order_release); w->seen_off = RBUF_OFF_MAX; @@ -288,6 +442,8 @@ size_t ringbuf_consume(ringbuf_t *rbuf, size_t *offset) { ringbuf_off_t written = rbuf->written, next, ready; + worker_off_t volatile *pw_link; + worker_off_t w_link, w_off; size_t towrite; retry: /* @@ -311,16 +467,14 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset) */ ready = RBUF_OFF_MAX; - for (unsigned i = 0; i < rbuf->nworkers; i++) { - ringbuf_worker_t *w = &rbuf->workers[i]; + pw_link = &rbuf->used_workers; + w_link = *pw_link; + w_off = (w_link & RBUF_OFF_MASK); + while (w_off != WORKER_NULL) { + ringbuf_worker_t *w = &rbuf->workers[w_off]; unsigned count = SPINLOCK_BACKOFF_MIN; ringbuf_off_t seen_off; - /* Skip if the worker has not registered. */ - if (!w->registered) { - continue; - } - /* * Get a stable 'seen' value. This is necessary since we * want to discard the stale 'seen' values. @@ -329,6 +483,23 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset) SPINLOCK_BACKOFF(count); } + /* If this worker has produced, clean it up. */ + if (seen_off == RBUF_OFF_MAX) { + /* + * Try to unlink this worker-record from the used-worker + * stack. + * If it can't be done, try again later. + */ + if (try_unlink_worker(rbuf, pw_link, w_link)) { + /* Free this unused worker-record. */ + w->seen_off = RBUF_OFF_MAX; + push_worker(rbuf, &rbuf->free_workers, w); + w_link = *pw_link; + w_off = (w_link & RBUF_OFF_MASK); + continue; + } + } + /* * Ignore the offsets after the possible wrap-around. 
* We are interested in the smallest seen offset that is @@ -338,6 +509,11 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset) ready = MIN(seen_off, ready); } ASSERT(ready >= written); + + /* Move to the next incomplete acquire/produce operation. */ + pw_link = &w->next; + w_link = *pw_link; + w_off = (w_link & RBUF_OFF_MASK); } /* diff --git a/src/ringbuf.h b/src/ringbuf.h index e8fc767..60dbb8c 100644 --- a/src/ringbuf.h +++ b/src/ringbuf.h @@ -19,7 +19,7 @@ void ringbuf_get_sizes(unsigned, size_t *, size_t *); ringbuf_worker_t *ringbuf_register(ringbuf_t *, unsigned); void ringbuf_unregister(ringbuf_t *, ringbuf_worker_t *); -ssize_t ringbuf_acquire(ringbuf_t *, ringbuf_worker_t *, size_t); +ssize_t ringbuf_acquire(ringbuf_t *, ringbuf_worker_t **, size_t); void ringbuf_produce(ringbuf_t *, ringbuf_worker_t *); size_t ringbuf_consume(ringbuf_t *, size_t *); void ringbuf_release(ringbuf_t *, size_t); diff --git a/src/t_ringbuf.c b/src/t_ringbuf.c index 05aa4d5..4944429 100644 --- a/src/t_ringbuf.c +++ b/src/t_ringbuf.c @@ -11,7 +11,7 @@ #include "ringbuf.h" -#define MAX_WORKERS 2 +#define MAX_WORKERS 3 static size_t ringbuf_obj_size; @@ -26,14 +26,13 @@ test_wraparound(void) /* Size n, but only (n - 1) can be produced at a time. */ ringbuf_setup(r, MAX_WORKERS, n); - w = ringbuf_register(r, 0); /* Produce (n / 2 + 1) and then attempt another (n / 2 - 1). */ - off = ringbuf_acquire(r, w, n / 2 + 1); + off = ringbuf_acquire(r, &w, n / 2 + 1); assert(off == 0); ringbuf_produce(r, w); - off = ringbuf_acquire(r, w, n / 2 - 1); + off = ringbuf_acquire(r, &w, n / 2 - 1); assert(off == -1); /* Consume (n / 2 + 1) bytes. */ @@ -42,11 +41,11 @@ test_wraparound(void) ringbuf_release(r, len); /* All consumed, attempt (n / 2 + 1) now. */ - off = ringbuf_acquire(r, w, n / 2 + 1); + off = ringbuf_acquire(r, &w, n / 2 + 1); assert(off == -1); /* However, wraparound can be successful with (n / 2). 
*/ - off = ringbuf_acquire(r, w, n / 2); + off = ringbuf_acquire(r, &w, n / 2); assert(off == 0); ringbuf_produce(r, w); @@ -55,7 +54,6 @@ test_wraparound(void) assert(len == (n / 2) && woff == 0); ringbuf_release(r, len); - ringbuf_unregister(r, w); free(r); } @@ -68,21 +66,20 @@ test_multi(void) ssize_t off; ringbuf_setup(r, MAX_WORKERS, 3); - w = ringbuf_register(r, 0); /* * Produce 2 bytes. */ - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == 0); ringbuf_produce(r, w); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == 1); ringbuf_produce(r, w); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == -1); /* @@ -99,18 +96,18 @@ test_multi(void) * Produce another 2 with wrap-around. */ - off = ringbuf_acquire(r, w, 2); + off = ringbuf_acquire(r, &w, 2); assert(off == -1); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == 2); ringbuf_produce(r, w); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == 0); ringbuf_produce(r, w); - off = ringbuf_acquire(r, w, 1); + off = ringbuf_acquire(r, &w, 1); assert(off == -1); /* @@ -125,7 +122,6 @@ test_multi(void) assert(len == 1 && woff == 0); ringbuf_release(r, len); - ringbuf_unregister(r, w); free(r); } @@ -138,13 +134,11 @@ test_overlap(void) ssize_t off; ringbuf_setup(r, MAX_WORKERS, 10); - w1 = ringbuf_register(r, 0); - w2 = ringbuf_register(r, 1); /* * Producer 1: acquire 5 bytes. Consumer should fail. */ - off = ringbuf_acquire(r, w1, 5); + off = ringbuf_acquire(r, &w1, 5); assert(off == 0); len = ringbuf_consume(r, &woff); @@ -153,7 +147,7 @@ test_overlap(void) /* * Producer 2: acquire 3 bytes. Consumer should still fail. */ - off = ringbuf_acquire(r, w2, 3); + off = ringbuf_acquire(r, &w2, 3); assert(off == 5); len = ringbuf_consume(r, &woff); @@ -174,7 +168,7 @@ test_overlap(void) * Producer 1: acquire-produce 4 bytes, triggering wrap-around. 
* Consumer should still fail. */ - off = ringbuf_acquire(r, w1, 4); + off = ringbuf_acquire(r, &w1, 4); assert(off == 0); len = ringbuf_consume(r, &woff); @@ -197,8 +191,6 @@ test_overlap(void) assert(len == 4 && woff == 0); ringbuf_release(r, len); - ringbuf_unregister(r, w1); - ringbuf_unregister(r, w2); free(r); } @@ -212,8 +204,6 @@ test_random(void) unsigned char buf[500]; ringbuf_setup(r, MAX_WORKERS, sizeof(buf)); - w1 = ringbuf_register(r, 0); - w2 = ringbuf_register(r, 1); while (n--) { size_t len, woff; @@ -237,7 +227,7 @@ test_random(void) break; case 1: // producer 1 if (off1 == -1) { - if ((off1 = ringbuf_acquire(r, w1, len)) >= 0) { + if ((off1 = ringbuf_acquire(r, &w1, len)) >= 0) { assert((size_t)off1 < sizeof(buf)); buf[off1] = len - 1; } @@ -249,7 +239,7 @@ test_random(void) break; case 2: // producer 2 if (off2 == -1) { - if ((off2 = ringbuf_acquire(r, w2, len)) >= 0) { + if ((off2 = ringbuf_acquire(r, &w2, len)) >= 0) { assert((size_t)off2 < sizeof(buf)); buf[off2] = len - 1; } @@ -261,8 +251,6 @@ test_random(void) break; } } - ringbuf_unregister(r, w1); - ringbuf_unregister(r, w2); free(r); } diff --git a/src/t_stress.c b/src/t_stress.c index e32fe94..293cf46 100644 --- a/src/t_stress.c +++ b/src/t_stress.c @@ -102,9 +102,7 @@ ringbuf_stress(void *arg) { const unsigned id = (uintptr_t)arg; ringbuf_worker_t *w; - - w = ringbuf_register(ringbuf, id); - assert(w != NULL); + uint64_t total_recv = 0; /* * There are NCPU threads concurrently generating and producing @@ -123,6 +121,7 @@ ringbuf_stress(void *arg) if (id == 0) { if ((len = ringbuf_consume(ringbuf, &off)) != 0) { + total_recv += len; size_t rem = len; assert(off < RBUF_SIZE); while (rem) { @@ -136,7 +135,7 @@ ringbuf_stress(void *arg) continue; } len = generate_message(buf, sizeof(buf) - 1); - if ((ret = ringbuf_acquire(ringbuf, w, len)) != -1) { + if ((ret = ringbuf_acquire(ringbuf, &w, len)) != -1) { off = (size_t)ret; assert(off < RBUF_SIZE); memcpy(&rbuf[off], buf, len); @@ -144,6 
+143,8 @@ ringbuf_stress(void *arg) } } pthread_barrier_wait(&barrier); + if (id == 0) + printf ("Total received: %" PRIu64 "\n", total_recv); pthread_exit(NULL); return NULL; } diff --git a/src/utils.h b/src/utils.h index d948561..b385b92 100644 --- a/src/utils.h +++ b/src/utils.h @@ -93,7 +93,7 @@ #endif #define SPINLOCK_BACKOFF(count) \ do { \ - for (int __i = (count); __i != 0; __i--) { \ + for (int __i = ((count) + fast_random() % (count)); __i != 0; __i--) { \ SPINLOCK_BACKOFF_HOOK; \ } \ if ((count) < SPINLOCK_BACKOFF_MAX) \