Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# This file is in the Public Domain.
#

CFLAGS= -std=c11 -O2 -g -W -Wextra -Werror
CFLAGS= -std=c11 -g -W -Wextra -Werror
CFLAGS+= -D_POSIX_C_SOURCE=200809L
CFLAGS+= -D_GNU_SOURCE -D_DEFAULT_SOURCE

Expand All @@ -23,7 +23,7 @@ endif
ifeq ($(DEBUG),1)
CFLAGS+= -O0 -DDEBUG -fno-omit-frame-pointer
else
CFLAGS+= -DNDEBUG
CFLAGS+= -O2 -DNDEBUG
endif

LIB= libringbuf
Expand Down
212 changes: 194 additions & 18 deletions src/ringbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,14 @@
#define WRAP_COUNTER (0x7fffffff00000000UL)
#define WRAP_INCR(x) (((x) + 0x100000000UL) & WRAP_COUNTER)

#define WORKER_NULL (0x00000000ffffffffUL)

typedef uint64_t ringbuf_off_t;
typedef uint64_t worker_off_t;

struct ringbuf_worker {
volatile ringbuf_off_t seen_off;
int registered;
volatile worker_off_t next;
};

struct ringbuf {
Expand All @@ -94,9 +97,12 @@ struct ringbuf {
volatile ringbuf_off_t next;
ringbuf_off_t end;

/* Track acquires that haven't finished producing yet. */
worker_off_t used_workers, free_workers;

/* The following are updated by the consumer. */
ringbuf_off_t written;
unsigned nworkers;
unsigned nworkers;
ringbuf_worker_t workers[];
};

Expand All @@ -114,6 +120,14 @@ ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, size_t length)
rbuf->space = length;
rbuf->end = RBUF_OFF_MAX;
rbuf->nworkers = nworkers;

/* Put all workers into the free-stack. */
rbuf->used_workers = rbuf->free_workers = WORKER_NULL;
for (unsigned i = 0; i < rbuf->nworkers; i++) {
rbuf->workers[i].seen_off = RBUF_OFF_MAX;
rbuf->workers[i].next = rbuf->free_workers;
rbuf->free_workers = i;
}
return 0;
}

Expand All @@ -137,19 +151,34 @@ ringbuf_get_sizes(unsigned nworkers,
ringbuf_worker_t *
ringbuf_register(ringbuf_t *rbuf, unsigned i)
{
ringbuf_worker_t *w = &rbuf->workers[i];

w->seen_off = RBUF_OFF_MAX;
atomic_thread_fence(memory_order_release);
w->registered = true;
return w;
/* Deprecated. */
(void)rbuf;
(void)i;
return NULL;
}

void
ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w)
{
w->registered = false;
/* Deprecated. */
(void)rbuf;
(void)w;
}

/*
* Simple xorshift; random() causes huge lock contention on Linux/glibc,
* which would "hide" the possible race conditions.
*/
__thread uint32_t fast_random_seed_r = 5381;
static unsigned long
fast_random(void)
{
uint32_t x = fast_random_seed_r;
x ^= x << 13;
x ^= x >> 17;
x ^= x << 5;
fast_random_seed_r = x;
return x;
}

/*
Expand All @@ -169,19 +198,142 @@ stable_nextoff(ringbuf_t *rbuf)
return next;
}

/*
* push_worker: push this worker-record onto the given stack.
*/
static inline void
push_worker(ringbuf_t *rbuf, worker_off_t volatile *stack_head,
ringbuf_worker_t *w)
{
worker_off_t w_offset, old_head, new_head;
unsigned count = SPINLOCK_BACKOFF_MIN;

/* Get the offset of the worker-record being pushed. */
w_offset = w - rbuf->workers;

/* Make sure this worker-record isn't on any stack already. */
ASSERT(w->next == WORKER_NULL);

for (;;) {
/* Get the offset of the next worker-record on the stack. */
old_head = *stack_head;

/*
* Prepare to push that worker-record onto the stack,
* i.e. increment the version-number of the stack-head index.
*
* Since this worker-record isn't on any stack at this point,
* nothing about its next-offset (including its version-number)
* has to be preserved.
*/
w->next = (old_head & RBUF_OFF_MASK);
new_head = (w_offset | WRAP_INCR(old_head));
if (atomic_compare_exchange_weak(stack_head, old_head, new_head))
break;
SPINLOCK_BACKOFF(count);
}
}

/*
* pop_worker: pop a worker-record from the given stack.
*/
static inline ringbuf_worker_t *
pop_worker(ringbuf_t *rbuf, worker_off_t volatile *stack_head)
{
worker_off_t old_head, new_head;
ringbuf_worker_t *w;
unsigned count = SPINLOCK_BACKOFF_MIN;

for (;;) {
worker_off_t old_head_offset;

/* Get the offset of the worker-record on top of the stack. */
old_head = *stack_head;
old_head_offset = (old_head & RBUF_OFF_MASK);
if (old_head_offset == WORKER_NULL)
return NULL;

/* Find that worker-record. */
w = &rbuf->workers[old_head_offset];

/*
* Prepare to pop that worker-record off of the stack,
* i.e. increment the version-number of the stack-head index.
*/
new_head = (w->next & RBUF_OFF_MASK);
new_head |= WRAP_INCR(old_head);
if (atomic_compare_exchange_weak(stack_head, old_head, new_head))
break;
SPINLOCK_BACKOFF(count);
}

/*
* Since this worker-record isn't on any stack at this point,
* nothing about its next-offset (including its version-number)
* has to be preserved.
*/
w->next = WORKER_NULL;

return w;
}

/*
* try_unlink_worker: try to unlink a worker-record from a stack.
*/
static inline bool
try_unlink_worker(ringbuf_t *rbuf, worker_off_t volatile *stack_link,
worker_off_t old_link)
{
ringbuf_worker_t *w;
worker_off_t old_link_offset, new_link;
bool success;

/* Find that worker-record. */
old_link_offset = (old_link & RBUF_OFF_MASK);
ASSERT (old_link_offset != WORKER_NULL);
w = &rbuf->workers[old_link_offset];

/*
* Prepare to unlink that worker-record from the stack,
* i.e. increment the version-number of the stack-link index.
*/
new_link = (w->next & RBUF_OFF_MASK);
new_link |= WRAP_INCR(old_link);
success = atomic_compare_exchange_weak(stack_link, old_link, new_link);

/*
* Since this worker-record isn't on any stack at this point,
* nothing about its next-offset (including its version-number)
* has to be preserved.
*/
if (success)
w->next = WORKER_NULL;

return success;
}

/*
* ringbuf_acquire: request a space of a given length in the ring buffer.
*
* => On success: returns the offset at which the space is available.
* => On failure: returns -1.
*/
ssize_t
ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len)
{
ringbuf_off_t seen, next, target;
ringbuf_worker_t *w;

ASSERT(len > 0 && len <= rbuf->space);

/* Get a worker-record, to track state between acquire & produce. */
*pw = NULL;
w = pop_worker(rbuf, &rbuf->free_workers);
if (w == NULL)
return -1;
ASSERT(w->seen_off == RBUF_OFF_MAX);
w->seen_off = RBUF_OFF_MAX | WRAP_LOCK_BIT;
push_worker(rbuf, &rbuf->used_workers, w);

do {
ringbuf_off_t written;
Expand Down Expand Up @@ -244,6 +396,9 @@ ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
*/
w->seen_off &= ~WRAP_LOCK_BIT;

/* Hand this worker-record back to our caller. */
*pw = w;

/*
* If we set the WRAP_LOCK_BIT in the 'next' (because we exceed
* the remaining space and need to wrap-around), then save the
Expand Down Expand Up @@ -275,7 +430,6 @@ void
ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *w)
{
(void)rbuf;
ASSERT(w->registered);
ASSERT(w->seen_off != RBUF_OFF_MAX);
atomic_thread_fence(memory_order_release);
w->seen_off = RBUF_OFF_MAX;
Expand All @@ -288,6 +442,8 @@ size_t
ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
{
ringbuf_off_t written = rbuf->written, next, ready;
worker_off_t volatile *pw_link;
worker_off_t w_link, w_off;
size_t towrite;
retry:
/*
Expand All @@ -311,16 +467,14 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
*/
ready = RBUF_OFF_MAX;

for (unsigned i = 0; i < rbuf->nworkers; i++) {
ringbuf_worker_t *w = &rbuf->workers[i];
pw_link = &rbuf->used_workers;
w_link = *pw_link;
w_off = (w_link & RBUF_OFF_MASK);
while (w_off != WORKER_NULL) {
ringbuf_worker_t *w = &rbuf->workers[w_off];
unsigned count = SPINLOCK_BACKOFF_MIN;
ringbuf_off_t seen_off;

/* Skip if the worker has not registered. */
if (!w->registered) {
continue;
}

/*
* Get a stable 'seen' value. This is necessary since we
* want to discard the stale 'seen' values.
Expand All @@ -329,6 +483,23 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
SPINLOCK_BACKOFF(count);
}

/* If this worker has produced, clean it up. */
if (seen_off == RBUF_OFF_MAX) {
/*
* Try to unlink this worker-record from the used-worker
* stack.
* If it can't be done, try again later.
*/
if (try_unlink_worker(rbuf, pw_link, w_link)) {
/* Free this unused worker-record. */
w->seen_off = RBUF_OFF_MAX;
push_worker(rbuf, &rbuf->free_workers, w);
w_link = *pw_link;
w_off = (w_link & RBUF_OFF_MASK);
continue;
}
}

/*
* Ignore the offsets after the possible wrap-around.
* We are interested in the smallest seen offset that is
Expand All @@ -338,6 +509,11 @@ ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
ready = MIN(seen_off, ready);
}
ASSERT(ready >= written);

/* Move to the next incomplete acquire/produce operation. */
pw_link = &w->next;
w_link = *pw_link;
w_off = (w_link & RBUF_OFF_MASK);
}

/*
Expand Down
2 changes: 1 addition & 1 deletion src/ringbuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ void ringbuf_get_sizes(unsigned, size_t *, size_t *);
ringbuf_worker_t *ringbuf_register(ringbuf_t *, unsigned);
void ringbuf_unregister(ringbuf_t *, ringbuf_worker_t *);

ssize_t ringbuf_acquire(ringbuf_t *, ringbuf_worker_t *, size_t);
ssize_t ringbuf_acquire(ringbuf_t *, ringbuf_worker_t **, size_t);
void ringbuf_produce(ringbuf_t *, ringbuf_worker_t *);
size_t ringbuf_consume(ringbuf_t *, size_t *);
void ringbuf_release(ringbuf_t *, size_t);
Expand Down
Loading