From e5fbae374c020b3edddeb6415e83b7cbaeaefe11 Mon Sep 17 00:00:00 2001 From: Schuyler Goodman Date: Wed, 10 Dec 2025 10:22:18 -0500 Subject: [PATCH 01/13] Initial commit of uniflight --- CHANGELOG.md | 1 + Cargo.lock | 45 ++++ Cargo.toml | 9 +- README.md | 8 +- crates/uniflight/CHANGELOG.md | 1 + crates/uniflight/Cargo.toml | 28 +++ crates/uniflight/README.md | 71 ++++++ crates/uniflight/favicon.ico | 3 + crates/uniflight/logo.png | 3 + crates/uniflight/src/lib.rs | 405 ++++++++++++++++++++++++++++++++++ 10 files changed, 570 insertions(+), 4 deletions(-) create mode 100644 crates/uniflight/CHANGELOG.md create mode 100644 crates/uniflight/Cargo.toml create mode 100644 crates/uniflight/README.md create mode 100644 crates/uniflight/favicon.ico create mode 100644 crates/uniflight/logo.png create mode 100644 crates/uniflight/src/lib.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index f9fe2e63..a826ab4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,3 +14,4 @@ Please see each crate's change log below: - [`thread_aware`](./crates/thread_aware/CHANGELOG.md) - [`thread_aware_macros`](./crates/thread_aware_macros/CHANGELOG.md) - [`thread_aware_macros_impl`](./crates/thread_aware_macros_impl/CHANGELOG.md) +- [`uniflight`](./crates/uniflight/CHANGELOG.md) diff --git a/Cargo.lock b/Cargo.lock index 382e0ddd..ade02e31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -57,6 +57,15 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" +[[package]] +name = "branches" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f11502672c5570f77f6bdf573332483f8475bab6a7fda00f1fae8ddb5a6245c0" +dependencies = [ + "rustc_version", +] + [[package]] name = "byteorder" version = "1.5.0" @@ -200,6 +209,21 @@ dependencies = [ "itertools 0.13.0", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "crunchy" version = "0.2.4" @@ -1282,6 +1306,16 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" +[[package]] +name = "uniflight" +version = "0.4.0" +dependencies = [ + "futures-util", + "parking_lot", + "tokio", + "xutex", +] + [[package]] name = "walkdir" version = "2.5.0" @@ -1515,6 +1549,17 @@ version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" +[[package]] +name = "xutex" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c6a2a824bda0270095d584b553e1f084652e5fc40ebf3945e78a14a2437e0c6" +dependencies = [ + "branches", + "crossbeam-queue", + "once_cell", +] + [[package]] name = "xxhash-rust" version = "0.8.15" diff --git a/Cargo.toml b/Cargo.toml index 4e113ec0..7647e8a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ testing_aids = { path = "crates/testing_aids", default-features = false, version thread_aware = { path = "crates/thread_aware", default-features = false, version = "0.4.0" } thread_aware_macros = { path = "crates/thread_aware_macros", default-features = false, version = "0.4.0" } thread_aware_macros_impl = { path = "crates/thread_aware_macros_impl", default-features = false, version = "0.4.0" } +uniflight = { path = "crates/uniflight", default-features = false, version = "0.4.0" } # external dependencies alloc_tracker = { version = "0.5.9", default-features = false } @@ -46,6 +47,7 @@ criterion = { version = "0.7.0", default-features = false } derive_more = { version = "2.0.1", default-features = false } duct = { version = "1.1.1", default-features = false } futures = { version = "0.3.31", default-features = false } +futures-util = { version = "0.3.31", default-features = false } infinity_pool = { version = "0.8.1", default-features = false } insta = { version = "1.44.1", default-features = false } many_cpus = { version = "1.1.0", default-features = false } @@ -54,6 +56,7 @@ mutants = { version = "0.0.3", default-features = false } new_zealand = { version = "1.0.1", default-features = false } nm = { version = "0.1.21", default-features = false } once_cell = { version = "1.21.3", default-features = false } +parking_lot = { version = "0.12.5", default-features = false } pretty_assertions = { version = "1.4.1", default-features = false } prettyplease = { version = "0.2.37", default-features = false } proc-macro2 = { version = "1.0.103", default-features = false } @@ -71,6 +74,7 @@ tracing = { version = "0.1.41", default-features = false } tracing-subscriber = { version = "0.3.20", default-features = false } trybuild = { version = "1.0.114", default-features = false } typeid = { version = "1.0.3", default-features = false } +xutex = { version = "0.2.0", default-features = false } xxhash-rust = { version = "0.8.15", default-features = false } [workspace.lints.rust] @@ -84,7 +88,10 @@ unsafe_op_in_unsafe_fn = "warn" unused_lifetimes = "warn" # Allow cfg attributes for coverage builds and docs.rs builds -unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage,coverage_nightly)', 'cfg(docsrs)'] } +unexpected_cfgs = { level = "warn", check-cfg = [ + 'cfg(coverage,coverage_nightly)', + 'cfg(docsrs)', +] } [workspace.lints.clippy] cargo = { level = "warn", priority = -1 } diff --git a/README.md b/README.md index e794bce8..77359b4e 100644 --- a/README.md +++ b/README.md @@ -11,14 +11,15 @@ This repository contains a set of crates that help you build robust highly scalable services in Rust. -- [Crates](#crates) -- [About this Repo](#about-this-repo) +- [The Oxidizer Project](#the-oxidizer-project) + - [Crates](#crates) + - [About this Repo](#about-this-repo) - [Adding New Crates](#adding-new-crates) - [Publishing Crates](#publishing-crates) - [Documenting Crates](#documenting-crates) - [CI Workflows](#ci-workflows) - [Pull Request Gates](#pull-request-gates) -- [Trademarks](#trademarks) + - [Trademarks](#trademarks) ## Crates @@ -36,6 +37,7 @@ These are the crates built out of this repo: - [`thread_aware`](./crates/thread_aware/README.md) - Facilities to support thread-isolated state. - [`thread_aware_macros`](./crates/thread_aware_macros/README.md) - Macros for the `thread_aware` crate. - [`thread_aware_macros_impl`](./crates/thread_aware_macros_impl/README.md) - Macros for the `thread_aware` crate. +- [`uniflight`](./crates/uniflight/README.md) - Coalesces duplicate async tasks into a single execution. ## About this Repo diff --git a/crates/uniflight/CHANGELOG.md b/crates/uniflight/CHANGELOG.md new file mode 100644 index 00000000..825c32f0 --- /dev/null +++ b/crates/uniflight/CHANGELOG.md @@ -0,0 +1 @@ +# Changelog diff --git a/crates/uniflight/Cargo.toml b/crates/uniflight/Cargo.toml new file mode 100644 index 00000000..76df9480 --- /dev/null +++ b/crates/uniflight/Cargo.toml @@ -0,0 +1,28 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +[package] +name = "uniflight" +description = "Coalesces multiple ongoing tasks into a leader which does the work, and follower tasks that wait on the result, to prevent duplicate I/O or other downstream overhead." +version = "0.4.0" +readme = "README.md" +keywords = ["oxidizer", "singleflight"] +categories = ["Concurrency"] + +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[dependencies] +parking_lot = { workspace = true, default-features = false } +xutex = { workspace = true, default-features = false } + +[dev-dependencies] +futures-util = { workspace = true, default-features = false, features = ["alloc", "std"] } +tokio = { workspace = true, features = ["macros", "rt", "time", "rt-multi-thread"] } + +[lints] +workspace = true diff --git a/crates/uniflight/README.md b/crates/uniflight/README.md new file mode 100644 index 00000000..cce07f50 --- /dev/null +++ b/crates/uniflight/README.md @@ -0,0 +1,71 @@ +
+ Uniflight Logo + +# Uniflight + +[![crate.io](https://img.shields.io/crates/v/uniflight.svg)](https://crates.io/crates/uniflight) +[![docs.rs](https://docs.rs/uniflight/badge.svg)](https://docs.rs/uniflight) +[![MSRV](https://img.shields.io/crates/msrv/uniflight)](https://crates.io/crates/uniflight) +[![CI](https://github.com/microsoft/oxidizer/workflows/main/badge.svg)](https://github.com/microsoft/oxidizer/actions) +[![Coverage](https://codecov.io/gh/microsoft/oxidizer/graph/badge.svg?token=FCUG0EL5TI)](https://codecov.io/gh/microsoft/oxidizer) +[![License](https://img.shields.io/badge/license-MIT-blue.svg)](../LICENSE) + +
+ +* [Summary](#summary) + +## Summary + + + +Coalesces duplicate async tasks into a single execution. + +This crate provides [`UniFlight`], a mechanism for deduplicating concurrent async operations. +When multiple tasks request the same work (identified by a key), only the first task (the +"leader") performs the actual work while subsequent tasks (the "followers") wait and receive +a clone of the result. + +## When to Use + +Use `UniFlight` when you have expensive or rate-limited operations that may be requested +concurrently with the same parameters: + +- **Cache population**: Prevent thundering herd when a cache entry expires +- **API calls**: Deduplicate concurrent requests to the same endpoint +- **Database queries**: Coalesce identical queries issued simultaneously +- **File I/O**: Avoid reading the same file multiple times concurrently + +## Example + +```rust +use uniflight::UniFlight; + +let group: UniFlight<&str, String> = UniFlight::new(); + +// Multiple concurrent calls with the same key will share a single execution +let result = group.work("user:123", || async { + // This expensive operation runs only once, even if called concurrently + "expensive_result".to_string() +}).await; +``` + +## Cancellation and Panic Safety + +`UniFlight` handles task cancellation and panics gracefully: + +- If the leader task is cancelled or dropped, a follower becomes the new leader +- If the leader task panics, a follower becomes the new leader and executes its work +- Followers that join before the leader completes receive the cached result + +## Thread Safety + +[`UniFlight`] is `Send` and `Sync`, and can be shared across threads. The returned futures +do not require `Send` bounds on the closure or its output. + + + +

+ +This crate was developed as part of [The Oxidizer Project](https://github.com/microsoft/oxidizer). + +
diff --git a/crates/uniflight/favicon.ico b/crates/uniflight/favicon.ico new file mode 100644 index 00000000..ccae8114 --- /dev/null +++ b/crates/uniflight/favicon.ico @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e48225cb42c8ac02df5dd4a9bdba29c1e8d10436cc27055d6a021bcb951d4145 +size 480683 diff --git a/crates/uniflight/logo.png b/crates/uniflight/logo.png new file mode 100644 index 00000000..f2bd6691 --- /dev/null +++ b/crates/uniflight/logo.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:63ad64ebb0185d7cd153acc291989d72e3a0094603d8bfe781a220861de247f2 +size 17876 diff --git a/crates/uniflight/src/lib.rs b/crates/uniflight/src/lib.rs new file mode 100644 index 00000000..ccefb69c --- /dev/null +++ b/crates/uniflight/src/lib.rs @@ -0,0 +1,405 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +// Based on singleflight-async by ihciah +// Original: https://github.com/ihciah/singleflight-async +// Licensed under MIT/Apache-2.0 + +//! Coalesces duplicate async tasks into a single execution. +//! +//! This crate provides [`UniFlight`], a mechanism for deduplicating concurrent async operations. +//! When multiple tasks request the same work (identified by a key), only the first task (the +//! "leader") performs the actual work while subsequent tasks (the "followers") wait and receive +//! a clone of the result. +//! +//! # When to Use +//! +//! Use `UniFlight` when you have expensive or rate-limited operations that may be requested +//! concurrently with the same parameters: +//! +//! - **Cache population**: Prevent thundering herd when a cache entry expires +//! - **API calls**: Deduplicate concurrent requests to the same endpoint +//! - **Database queries**: Coalesce identical queries issued simultaneously +//! - **File I/O**: Avoid reading the same file multiple times concurrently +//! +//! # Example +//! +//! ``` +//! use uniflight::UniFlight; +//! +//! # async fn example() { +//! let group: UniFlight<&str, String> = UniFlight::new(); +//! +//! // Multiple concurrent calls with the same key will share a single execution +//! let result = group.work("user:123", || async { +//! // This expensive operation runs only once, even if called concurrently +//! "expensive_result".to_string() +//! }).await; +//! # } +//! ``` +//! +//! # Cancellation and Panic Safety +//! +//! `UniFlight` handles task cancellation and panics gracefully: +//! +//! - If the leader task is cancelled or dropped, a follower becomes the new leader +//! - If the leader task panics, a follower becomes the new leader and executes its work +//! - Followers that join before the leader completes receive the cached result +//! +//! # Thread Safety +//! +//! [`UniFlight`] is `Send` and `Sync`, and can be shared across threads. The returned futures +//! do not require `Send` bounds on the closure or its output. + +#![doc(html_logo_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/uniflight/logo.png")] +#![doc(html_favicon_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/uniflight/favicon.ico")] + +use std::{ + collections::HashMap, + hash::Hash, + sync::{Arc, Weak}, +}; + +use parking_lot::Mutex as SyncMutex; +use xutex::AsyncMutex; + +type SharedMapping = Arc>>>; + +/// UniFlight represents a class of work and creates a space in which units of work +/// can be executed with duplicate suppression. +#[derive(Debug)] +pub struct UniFlight { + mapping: SharedMapping, +} + +impl Default for UniFlight { + fn default() -> Self { + Self { + mapping: Default::default(), + } + } +} + +struct Shared { + slot: AsyncMutex>, +} + +impl Default for Shared { + fn default() -> Self { + Self { + slot: AsyncMutex::new(None), + } + } +} + +/// `BroadcastOnce` consists of shared slot and notify. +#[derive(Clone)] +struct BroadcastOnce { + shared: Weak>, +} + +impl BroadcastOnce { + fn new() -> (Self, Arc>) { + let shared = Arc::new(Shared::default()); + ( + Self { + shared: Arc::downgrade(&shared), + }, + shared, + ) + } +} + +// After calling BroadcastOnce::waiter we can get a waiter. +// It's in WaitList. +struct BroadcastOnceWaiter { + func: F, + shared: Arc>, + + key: K, + mapping: SharedMapping, +} + +impl std::fmt::Debug for BroadcastOnce { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BroadcastOnce") + } +} + +#[allow(clippy::type_complexity)] +impl BroadcastOnce { + fn try_waiter( + &self, + func: F, + key: K, + mapping: SharedMapping, + ) -> Result, (F, K, SharedMapping)> { + let Some(upgraded) = self.shared.upgrade() else { + return Err((func, key, mapping)); + }; + Ok(BroadcastOnceWaiter { + func, + shared: upgraded, + key, + mapping, + }) + } + + #[inline] + const fn waiter(shared: Arc>, func: F, key: K, mapping: SharedMapping) -> BroadcastOnceWaiter { + BroadcastOnceWaiter { + func, + shared, + key, + mapping, + } + } +} + +// We already in WaitList, so wait will be fine, we won't miss +// anything after Waiter generated. +impl BroadcastOnceWaiter +where + K: Hash + Eq, + F: FnOnce() -> Fut, + Fut: Future, + T: Clone, +{ + async fn wait(self) -> T { + let mut slot = self.shared.slot.lock().await; + if let Some(value) = (*slot).as_ref() { + return value.clone(); + } + + let value = (self.func)().await; + *slot = Some(value.clone()); + + self.mapping.lock().remove(&self.key); + + value + } +} + +impl UniFlight +where + K: Hash + Eq + Clone, +{ + /// Create a new BroadcastOnce to do work with. + #[inline] + pub fn new() -> Self { + Self::default() + } + + /// Execute and return the value for a given function, making sure that only one + /// operation is in-flight at a given moment. If a duplicate call comes in, that caller will + /// wait until the original call completes and return the same value. + pub fn work(&self, key: K, func: F) -> impl Future + where + F: FnOnce() -> Fut, + Fut: Future, + T: Clone, + { + let owned_mapping = self.mapping.clone(); + let mut mapping = self.mapping.lock(); + let val = mapping.get_mut(&key); + match val { + Some(call) => { + let (func, key, owned_mapping) = match call.try_waiter(func, key, owned_mapping) { + Ok(waiter) => return waiter.wait(), + Err(fm) => fm, + }; + let (new_call, shared) = BroadcastOnce::new(); + *call = new_call; + let waiter = BroadcastOnce::waiter(shared, func, key, owned_mapping); + waiter.wait() + } + None => { + let (call, shared) = BroadcastOnce::new(); + mapping.insert(key.clone(), call); + let waiter = BroadcastOnce::waiter(shared, func, key, owned_mapping); + waiter.wait() + } + } + } +} + +#[cfg(test)] +mod tests { + use std::{ + sync::atomic::{ + AtomicUsize, + Ordering::{AcqRel, Acquire}, + }, + time::Duration, + }; + + use futures_util::{StreamExt, stream::FuturesUnordered}; + + use super::*; + + #[tokio::test] + async fn direct_call() { + let group = UniFlight::new(); + let result = group + .work("key", || async { + tokio::time::sleep(Duration::from_millis(10)).await; + "Result".to_string() + }) + .await; + assert_eq!(result, "Result"); + } + + #[tokio::test] + async fn parallel_call() { + let call_counter = AtomicUsize::default(); + + let group = UniFlight::new(); + let futures = FuturesUnordered::new(); + for _ in 0..10 { + futures.push(group.work("key", || async { + tokio::time::sleep(Duration::from_millis(100)).await; + call_counter.fetch_add(1, AcqRel); + "Result".to_string() + })); + } + + assert!(futures.all(|out| async move { out == "Result" }).await); + assert_eq!(call_counter.load(Acquire), 1, "future should only be executed once"); + } + + #[tokio::test] + async fn parallel_call_seq_await() { + let call_counter = AtomicUsize::default(); + + let group = UniFlight::new(); + let mut futures = Vec::new(); + for _ in 0..10 { + futures.push(group.work("key", || async { + tokio::time::sleep(Duration::from_millis(100)).await; + call_counter.fetch_add(1, AcqRel); + "Result".to_string() + })); + } + + for fut in futures.into_iter() { + assert_eq!(fut.await, "Result"); + } + assert_eq!(call_counter.load(Acquire), 1, "future should only be executed once"); + } + + #[tokio::test] + async fn call_with_static_str_key() { + let group = UniFlight::new(); + let result = group + .work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(1)).await; + "Result".to_string() + }) + .await; + assert_eq!(result, "Result"); + } + + #[tokio::test] + async fn call_with_static_string_key() { + let group = UniFlight::new(); + let result = group + .work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(1)).await; + "Result".to_string() + }) + .await; + assert_eq!(result, "Result"); + } + + #[tokio::test] + async fn call_with_custom_key() { + #[derive(Clone, PartialEq, Eq, Hash)] + struct K(i32); + let group = UniFlight::new(); + let result = group + .work(K(1), || async { + tokio::time::sleep(Duration::from_millis(1)).await; + "Result".to_string() + }) + .await; + assert_eq!(result, "Result"); + } + + #[tokio::test] + async fn late_wait() { + let group = UniFlight::new(); + let fut_early = group.work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(20)).await; + "Result".to_string() + }); + let fut_late = group.work("key".into(), || async { panic!("unexpected") }); + assert_eq!(fut_early.await, "Result"); + tokio::time::sleep(Duration::from_millis(50)).await; + assert_eq!(fut_late.await, "Result"); + } + + #[tokio::test] + async fn cancel() { + let group = UniFlight::new(); + + // the executer cancelled and the other awaiter will create a new future and execute. + let fut_cancel = group.work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(2000)).await; + "Result1".to_string() + }); + let _ = tokio::time::timeout(Duration::from_millis(10), fut_cancel).await; + let fut_late = group.work("key".to_string(), || async { "Result2".to_string() }); + assert_eq!(fut_late.await, "Result2"); + + // the first executer is slow but not dropped, so the result will be the first ones. + let begin = tokio::time::Instant::now(); + let fut_1 = group.work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(2000)).await; + "Result1".to_string() + }); + let fut_2 = group.work("key".to_string(), || async { panic!() }); + let (v1, v2) = tokio::join!(fut_1, fut_2); + assert_eq!(v1, "Result1"); + assert_eq!(v2, "Result1"); + assert!(begin.elapsed() > Duration::from_millis(1500)); + } + + #[tokio::test] + async fn leader_panic_in_spawned_task() { + let call_counter = AtomicUsize::default(); + let group: Arc> = Arc::new(UniFlight::new()); + + // First task will panic in a spawned task (no catch_unwind) + let group_clone = group.clone(); + let handle = tokio::spawn(async move { + group_clone + .work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(50)).await; + panic!("leader panicked in spawned task"); + #[allow(unreachable_code)] + "never".to_string() + }) + .await + }); + + // Give time for the spawned task to register and start + tokio::time::sleep(Duration::from_millis(10)).await; + + // Second task should become the new leader after the first panics + let group_clone = group.clone(); + let call_counter_ref = &call_counter; + let fut_follower = group_clone.work("key".to_string(), || async { + call_counter_ref.fetch_add(1, AcqRel); + "Result".to_string() + }); + + // Wait for the spawned task to panic + let spawn_result = handle.await; + assert!(spawn_result.is_err(), "spawned task should have panicked"); + + // The follower should succeed - Rust's drop semantics ensure the mutex is released + let result = fut_follower.await; + assert_eq!(result, "Result"); + assert_eq!(call_counter.load(Acquire), 1, "follower should have executed its work"); + } +} From 9eb29c0746f7339349aa832980c0f4a6538f4748 Mon Sep 17 00:00:00 2001 From: Schuyler Goodman Date: Wed, 10 Dec 2025 10:29:25 -0500 Subject: [PATCH 02/13] Update docs --- Cargo.lock | 2 +- Cargo.toml | 2 +- crates/uniflight/CHANGELOG.md | 7 ++ crates/uniflight/Cargo.toml | 2 +- crates/uniflight/examples/cache_population.rs | 67 +++++++++++++++++++ 5 files changed, 77 insertions(+), 3 deletions(-) create mode 100644 crates/uniflight/examples/cache_population.rs diff --git a/Cargo.lock b/Cargo.lock index ade02e31..1ac674a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1308,7 +1308,7 @@ checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" [[package]] name = "uniflight" -version = "0.4.0" +version = "0.1.0" dependencies = [ "futures-util", "parking_lot", diff --git a/Cargo.toml b/Cargo.toml index 7647e8a1..215aed85 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ testing_aids = { path = "crates/testing_aids", default-features = false, version thread_aware = { path = "crates/thread_aware", default-features = false, version = "0.4.0" } thread_aware_macros = { path = "crates/thread_aware_macros", default-features = false, version = "0.4.0" } thread_aware_macros_impl = { path = "crates/thread_aware_macros_impl", default-features = false, version = "0.4.0" } -uniflight = { path = "crates/uniflight", default-features = false, version = "0.4.0" } +uniflight = { path = "crates/uniflight", default-features = false, version = "0.1.0" } # external dependencies alloc_tracker = { version = "0.5.9", default-features = false } diff --git a/crates/uniflight/CHANGELOG.md b/crates/uniflight/CHANGELOG.md index 825c32f0..0906fd27 100644 --- a/crates/uniflight/CHANGELOG.md +++ b/crates/uniflight/CHANGELOG.md @@ -1 +1,8 @@ # Changelog + +## [0.1.0] - 2025-12-10 + +- 🧩 Miscellaneous + + - Initial commit of uniflight + diff --git a/crates/uniflight/Cargo.toml b/crates/uniflight/Cargo.toml index 76df9480..0539a483 100644 --- a/crates/uniflight/Cargo.toml +++ b/crates/uniflight/Cargo.toml @@ -4,7 +4,7 @@ [package] name = "uniflight" description = "Coalesces multiple ongoing tasks into a leader which does the work, and follower tasks that wait on the result, to prevent duplicate I/O or other downstream overhead." -version = "0.4.0" +version = "0.1.0" readme = "README.md" keywords = ["oxidizer", "singleflight"] categories = ["Concurrency"] diff --git a/crates/uniflight/examples/cache_population.rs b/crates/uniflight/examples/cache_population.rs new file mode 100644 index 00000000..663dc334 --- /dev/null +++ b/crates/uniflight/examples/cache_population.rs @@ -0,0 +1,67 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Demonstrates using UniFlight to prevent thundering herd when populating a cache. +//! +//! Multiple concurrent requests for the same cache key will share a single execution, +//! with the first request (leader) performing the work and subsequent requests (followers) +//! receiving a copy of the result. + +use std::{ + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; + +use uniflight::UniFlight; + +#[tokio::main] +async fn main() { + // Create a shared UniFlight instance for cache operations + let cache_group = Arc::new(UniFlight::::new()); + + // Track how many times the work closure actually executes + let execution_count = Arc::new(AtomicUsize::new(0)); + + println!("Starting 5 concurrent requests for user:123...\n"); + + // Simulate 5 concurrent requests for the same user data + let mut handles = Vec::new(); + for i in 1..=5 { + let group = cache_group.clone(); + let counter = execution_count.clone(); + let handle = tokio::spawn(async move { + let start = tokio::time::Instant::now(); + + let result = group + .work("user:123".to_string(), || async { + let count = counter.fetch_add(1, Ordering::SeqCst) + 1; + println!(" [Request {i}] I'm the leader! Fetching from database... (execution #{count})"); + + // Simulate expensive database query + tokio::time::sleep(Duration::from_millis(500)).await; + + "UserData(name: Alice, age: 30)".to_string() + }) + .await; + + let elapsed = start.elapsed(); + println!(" [Request {i}] Got result in {elapsed:?}: {result}"); + }); + + handles.push(handle); + + // Stagger the requests slightly to see the deduplication in action + tokio::time::sleep(Duration::from_millis(10)).await; + } + + // Wait for all requests to complete + for handle in handles { + handle.await.expect("Task panicked"); + } + + let total_executions = execution_count.load(Ordering::SeqCst); + println!("\nAll requests completed! Database query executed {total_executions} time(s) for 5 requests."); +} From e29c3f46bea0441d069c1e896fdbf1fe3fca6970 Mon Sep 17 00:00:00 2001 From: Schuyler Goodman Date: Wed, 10 Dec 2025 10:49:57 -0500 Subject: [PATCH 03/13] Fix clippy --- crates/uniflight/examples/cache_population.rs | 6 +- crates/uniflight/src/lib.rs | 55 +++++++++---------- 2 files changed, 30 insertions(+), 31 deletions(-) diff --git a/crates/uniflight/examples/cache_population.rs b/crates/uniflight/examples/cache_population.rs index 663dc334..b45b7ee3 100644 --- a/crates/uniflight/examples/cache_population.rs +++ b/crates/uniflight/examples/cache_population.rs @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -//! Demonstrates using UniFlight to prevent thundering herd when populating a cache. +//! Demonstrates using `UniFlight` to prevent thundering herd when populating a cache. //! //! Multiple concurrent requests for the same cache key will share a single execution, //! with the first request (leader) performing the work and subsequent requests (followers) @@ -30,8 +30,8 @@ async fn main() { // Simulate 5 concurrent requests for the same user data let mut handles = Vec::new(); for i in 1..=5 { - let group = cache_group.clone(); - let counter = execution_count.clone(); + let group = Arc::clone(&cache_group); + let counter = Arc::clone(&execution_count); let handle = tokio::spawn(async move { let start = tokio::time::Instant::now(); diff --git a/crates/uniflight/src/lib.rs b/crates/uniflight/src/lib.rs index ccefb69c..a0398d91 100644 --- a/crates/uniflight/src/lib.rs +++ b/crates/uniflight/src/lib.rs @@ -65,7 +65,7 @@ use xutex::AsyncMutex; type SharedMapping = Arc>>>; -/// UniFlight represents a class of work and creates a space in which units of work +/// Represents a class of work and creates a space in which units of work /// can be executed with duplicate suppression. #[derive(Debug)] pub struct UniFlight { @@ -74,9 +74,7 @@ pub struct UniFlight { impl Default for UniFlight { fn default() -> Self { - Self { - mapping: Default::default(), - } + Self { mapping: Arc::default() } } } @@ -126,7 +124,10 @@ impl std::fmt::Debug for BroadcastOnce { } } -#[allow(clippy::type_complexity)] +#[expect( + clippy::type_complexity, + reason = "The Result type is complex but intentionally groups related items for the retry pattern" +)] impl BroadcastOnce { fn try_waiter( &self, @@ -184,8 +185,9 @@ impl UniFlight where K: Hash + Eq + Clone, { - /// Create a new BroadcastOnce to do work with. + /// Creates a new `UniFlight` instance. #[inline] + #[must_use] pub fn new() -> Self { Self::default() } @@ -199,26 +201,23 @@ where Fut: Future, T: Clone, { - let owned_mapping = self.mapping.clone(); + let owned_mapping = Arc::clone(&self.mapping); let mut mapping = self.mapping.lock(); let val = mapping.get_mut(&key); - match val { - Some(call) => { - let (func, key, owned_mapping) = match call.try_waiter(func, key, owned_mapping) { - Ok(waiter) => return waiter.wait(), - Err(fm) => fm, - }; - let (new_call, shared) = BroadcastOnce::new(); - *call = new_call; - let waiter = BroadcastOnce::waiter(shared, func, key, owned_mapping); - waiter.wait() - } - None => { - let (call, shared) = BroadcastOnce::new(); - mapping.insert(key.clone(), call); - let waiter = BroadcastOnce::waiter(shared, func, key, owned_mapping); - waiter.wait() - } + if let Some(call) = val { + let (func, key, owned_mapping) = match call.try_waiter(func, key, owned_mapping) { + Ok(waiter) => return waiter.wait(), + Err(fm) => fm, + }; + let (new_call, shared) = BroadcastOnce::new(); + *call = new_call; + let waiter = BroadcastOnce::waiter(shared, func, key, owned_mapping); + waiter.wait() + } else { + let (call, shared) = BroadcastOnce::new(); + mapping.insert(key.clone(), call); + let waiter = BroadcastOnce::waiter(shared, func, key, owned_mapping); + waiter.wait() } } } @@ -281,7 +280,7 @@ mod tests { })); } - for fut in futures.into_iter() { + for fut in futures { assert_eq!(fut.await, "Result"); } assert_eq!(call_counter.load(Acquire), 1, "future should only be executed once"); @@ -370,13 +369,13 @@ mod tests { let group: Arc> = Arc::new(UniFlight::new()); // First task will panic in a spawned task (no catch_unwind) - let group_clone = group.clone(); + let group_clone = Arc::clone(&group); let handle = tokio::spawn(async move { group_clone .work("key".to_string(), || async { tokio::time::sleep(Duration::from_millis(50)).await; panic!("leader panicked in spawned task"); - #[allow(unreachable_code)] + #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] "never".to_string() }) .await @@ -386,7 +385,7 @@ mod tests { tokio::time::sleep(Duration::from_millis(10)).await; // Second task should become the new leader after the first panics - let group_clone = group.clone(); + let group_clone = Arc::clone(&group); let call_counter_ref = &call_counter; let fut_follower = group_clone.work("key".to_string(), || async { call_counter_ref.fetch_add(1, AcqRel); From 3f508a5a7c690e204e46960f95eebe5eb3ffcd24 Mon Sep 17 00:00:00 2001 From: Schuyler Goodman Date: Thu, 11 Dec 2025 09:47:33 -0500 Subject: [PATCH 04/13] Add logo --- crates/uniflight/logo.png | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/uniflight/logo.png b/crates/uniflight/logo.png index f2bd6691..90a48880 100644 --- a/crates/uniflight/logo.png +++ b/crates/uniflight/logo.png @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:63ad64ebb0185d7cd153acc291989d72e3a0094603d8bfe781a220861de247f2 -size 17876 +oid sha256:ec6fa0a1a07f8c21fb9946bc8ea78f127f027554fd8ca67f0b296d8c3d234b68 +size 46554 From ff628d0320dca7881e8e5841aca3a84f4fe895ed Mon Sep 17 00:00:00 2001 From: Schuyler Goodman Date: Thu, 11 Dec 2025 10:42:52 -0500 Subject: [PATCH 05/13] Fix formatting --- crates/uniflight/examples/cache_population.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/uniflight/examples/cache_population.rs b/crates/uniflight/examples/cache_population.rs index b45b7ee3..ab570f0f 100644 --- a/crates/uniflight/examples/cache_population.rs +++ b/crates/uniflight/examples/cache_population.rs @@ -9,8 +9,8 @@ use std::{ sync::{ - atomic::{AtomicUsize, Ordering}, Arc, + atomic::{AtomicUsize, Ordering}, }, time::Duration, }; From 8aa3ffb60354d42d09e48af8df50d2b0755e6881 Mon Sep 17 00:00:00 2001 From: Schuyler Goodman Date: Thu, 11 Dec 2025 12:14:34 -0500 Subject: [PATCH 06/13] Update favicon and code coverage --- .gitignore | 3 ++- crates/uniflight/favicon.ico | 4 ++-- crates/uniflight/src/lib.rs | 44 ++++++++++++++++++++++++++++-------- 3 files changed, 38 insertions(+), 13 deletions(-) diff --git a/.gitignore b/.gitignore index 20b91f6b..6368ac41 100644 --- a/.gitignore +++ b/.gitignore @@ -35,4 +35,5 @@ _manifest ARROW # Agent files -.claude \ No newline at end of file +.claude +CLAUDE.md \ No newline at end of file diff --git a/crates/uniflight/favicon.ico b/crates/uniflight/favicon.ico index ccae8114..2ed275b8 100644 --- a/crates/uniflight/favicon.ico +++ b/crates/uniflight/favicon.ico @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:e48225cb42c8ac02df5dd4a9bdba29c1e8d10436cc27055d6a021bcb951d4145 -size 480683 +oid sha256:bd1ebeca79834229253d38008b7d0491947042e808664646f907472acbb9ba01 +size 15406 diff --git a/crates/uniflight/src/lib.rs b/crates/uniflight/src/lib.rs index a0398d91..ff049afd 100644 --- a/crates/uniflight/src/lib.rs +++ b/crates/uniflight/src/lib.rs @@ -236,6 +236,10 @@ mod tests { use super::*; + fn unreachable_future() -> std::future::Pending { + std::future::pending() + } + #[tokio::test] async fn direct_call() { let group = UniFlight::new(); @@ -263,7 +267,7 @@ mod tests { } assert!(futures.all(|out| async move { out == "Result" }).await); - assert_eq!(call_counter.load(Acquire), 1, "future should only be executed once"); + assert_eq!(call_counter.load(Acquire), 1); } #[tokio::test] @@ -283,7 +287,7 @@ mod tests { for fut in futures { assert_eq!(fut.await, "Result"); } - assert_eq!(call_counter.load(Acquire), 1, "future should only be executed once"); + assert_eq!(call_counter.load(Acquire), 1); } #[tokio::test] @@ -331,7 +335,7 @@ mod tests { tokio::time::sleep(Duration::from_millis(20)).await; "Result".to_string() }); - let fut_late = group.work("key".into(), || async { panic!("unexpected") }); + let fut_late = group.work("key".into(), unreachable_future); assert_eq!(fut_early.await, "Result"); tokio::time::sleep(Duration::from_millis(50)).await; assert_eq!(fut_late.await, "Result"); @@ -342,10 +346,7 @@ mod tests { let group = UniFlight::new(); // the executer cancelled and the other awaiter will create a new future and execute. - let fut_cancel = group.work("key".to_string(), || async { - tokio::time::sleep(Duration::from_millis(2000)).await; - "Result1".to_string() - }); + let fut_cancel = group.work("key".to_string(), unreachable_future); let _ = tokio::time::timeout(Duration::from_millis(10), fut_cancel).await; let fut_late = group.work("key".to_string(), || async { "Result2".to_string() }); assert_eq!(fut_late.await, "Result2"); @@ -356,7 +357,7 @@ mod tests { tokio::time::sleep(Duration::from_millis(2000)).await; "Result1".to_string() }); - let fut_2 = group.work("key".to_string(), || async { panic!() }); + let fut_2 = group.work("key".to_string(), unreachable_future); let (v1, v2) = tokio::join!(fut_1, fut_2); assert_eq!(v1, "Result1"); assert_eq!(v2, "Result1"); @@ -394,11 +395,34 @@ mod tests { // Wait for the spawned task to panic let spawn_result = handle.await; - assert!(spawn_result.is_err(), "spawned task should have panicked"); + assert!(spawn_result.is_err()); // The follower should succeed - Rust's drop semantics ensure the mutex is released let result = fut_follower.await; assert_eq!(result, "Result"); - assert_eq!(call_counter.load(Acquire), 1, "follower should have executed its work"); + assert_eq!(call_counter.load(Acquire), 1); + } + + #[tokio::test] + async fn debug_impl() { + let group: UniFlight = UniFlight::new(); + + // Test Debug on empty group + let debug_str = format!("{:?}", group); + assert!(debug_str.contains("UniFlight")); + + // Create a pending work item to populate the mapping with a BroadcastOnce + let fut = group.work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(100)).await; + "Result".to_string() + }); + + // Debug should still work with entries in the mapping + let debug_str = format!("{:?}", group); + assert!(debug_str.contains("UniFlight")); + assert!(debug_str.contains("BroadcastOnce")); + + // Complete the work + assert_eq!(fut.await, "Result"); } } From 6ce6e111bc46946513d870bd68855722b4c33def Mon Sep 17 00:00:00 2001 From: schgoo <138131263+schgoo@users.noreply.github.com> Date: Thu, 11 Dec 2025 16:27:52 -0500 Subject: [PATCH 07/13] Update crates/uniflight/Cargo.toml Co-authored-by: Martin Taillefer --- crates/uniflight/Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/uniflight/Cargo.toml b/crates/uniflight/Cargo.toml index 0539a483..f62a9e1a 100644 --- a/crates/uniflight/Cargo.toml +++ b/crates/uniflight/Cargo.toml @@ -17,11 +17,11 @@ homepage.workspace = true repository.workspace = true [dependencies] -parking_lot = { workspace = true, default-features = false } -xutex = { workspace = true, default-features = false } +parking_lot.workspace = true +xutex.workspace = true [dev-dependencies] -futures-util = { workspace = true, default-features = false, features = ["alloc", "std"] } +futures-util = { workspace = true, features = ["alloc", "std"] } tokio = { workspace = true, features = ["macros", "rt", "time", "rt-multi-thread"] } [lints] From 76e0edea2483351e4821800b852c6a56e5a06cd7 Mon Sep 17 00:00:00 2001 From: schgoo <138131263+schgoo@users.noreply.github.com> Date: Thu, 11 Dec 2025 16:28:02 -0500 Subject: [PATCH 08/13] Update crates/uniflight/Cargo.toml Co-authored-by: Martin Taillefer --- crates/uniflight/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/uniflight/Cargo.toml b/crates/uniflight/Cargo.toml index f62a9e1a..695b76bd 100644 --- a/crates/uniflight/Cargo.toml +++ b/crates/uniflight/Cargo.toml @@ -6,8 +6,8 @@ name = "uniflight" description = "Coalesces multiple ongoing tasks into a leader which does the work, and follower tasks that wait on the result, to prevent duplicate I/O or other downstream overhead." version = "0.1.0" readme = "README.md" -keywords = ["oxidizer", "singleflight"] -categories = ["Concurrency"] +keywords = ["oxidizer", "coalescing", "stempede", "singleflight", "deduplication"] +categories = ["concurrency"] edition.workspace = true rust-version.workspace = true From e6e7458503a58d3ee5b75a537829ba6d2dd2a7bd Mon Sep 17 00:00:00 2001 From: Schuyler Goodman Date: Tue, 16 Dec 2025 16:39:07 -0500 Subject: [PATCH 09/13] Move tests to separate file --- crates/uniflight/src/lib.rs | 205 -------------------------------- crates/uniflight/tests/work.rs | 208 +++++++++++++++++++++++++++++++++ 2 files changed, 208 insertions(+), 205 deletions(-) create mode 100644 crates/uniflight/tests/work.rs diff --git a/crates/uniflight/src/lib.rs b/crates/uniflight/src/lib.rs index ff049afd..f5167415 100644 --- a/crates/uniflight/src/lib.rs +++ b/crates/uniflight/src/lib.rs @@ -221,208 +221,3 @@ where } } } - -#[cfg(test)] -mod tests { - use std::{ - sync::atomic::{ - AtomicUsize, - Ordering::{AcqRel, Acquire}, - }, - time::Duration, - }; - - use futures_util::{StreamExt, stream::FuturesUnordered}; - - use super::*; - - fn unreachable_future() -> std::future::Pending { - std::future::pending() - } - - #[tokio::test] - async fn direct_call() { - let group = UniFlight::new(); - let result = group - .work("key", || async { - tokio::time::sleep(Duration::from_millis(10)).await; - "Result".to_string() - }) - .await; - assert_eq!(result, "Result"); - } - - #[tokio::test] - async fn parallel_call() { - let call_counter = AtomicUsize::default(); - - let group = UniFlight::new(); - let futures = FuturesUnordered::new(); - for _ in 0..10 { - futures.push(group.work("key", || async { - tokio::time::sleep(Duration::from_millis(100)).await; - call_counter.fetch_add(1, AcqRel); - "Result".to_string() - })); - } - - assert!(futures.all(|out| async move { out == "Result" }).await); - assert_eq!(call_counter.load(Acquire), 1); - } - - #[tokio::test] - async fn parallel_call_seq_await() { - let call_counter = AtomicUsize::default(); - - let group = UniFlight::new(); - let mut futures = Vec::new(); - for _ in 0..10 { - futures.push(group.work("key", || async { - tokio::time::sleep(Duration::from_millis(100)).await; - call_counter.fetch_add(1, AcqRel); - "Result".to_string() - })); - } - - for fut in futures { - assert_eq!(fut.await, "Result"); - } - assert_eq!(call_counter.load(Acquire), 1); - } - - #[tokio::test] - async fn call_with_static_str_key() { - let group = UniFlight::new(); - let result = group - .work("key".to_string(), || async { - tokio::time::sleep(Duration::from_millis(1)).await; - "Result".to_string() - }) - .await; - assert_eq!(result, "Result"); - } - - #[tokio::test] - async fn call_with_static_string_key() { - let group = UniFlight::new(); - let result = group - .work("key".to_string(), || async { - tokio::time::sleep(Duration::from_millis(1)).await; - "Result".to_string() - }) - .await; - assert_eq!(result, "Result"); - } - - #[tokio::test] - async fn call_with_custom_key() { - #[derive(Clone, PartialEq, Eq, Hash)] - struct K(i32); - let group = UniFlight::new(); - let result = group - .work(K(1), || async { - tokio::time::sleep(Duration::from_millis(1)).await; - "Result".to_string() - }) - .await; - assert_eq!(result, "Result"); - } - - #[tokio::test] - async fn late_wait() { - let group = UniFlight::new(); - let fut_early = group.work("key".to_string(), || async { - tokio::time::sleep(Duration::from_millis(20)).await; - "Result".to_string() - }); - let fut_late = group.work("key".into(), unreachable_future); - assert_eq!(fut_early.await, "Result"); - tokio::time::sleep(Duration::from_millis(50)).await; - assert_eq!(fut_late.await, "Result"); - } - - #[tokio::test] - async fn cancel() { - let group = UniFlight::new(); - - // the executer cancelled and the other awaiter will create a new future and execute. - let fut_cancel = group.work("key".to_string(), unreachable_future); - let _ = tokio::time::timeout(Duration::from_millis(10), fut_cancel).await; - let fut_late = group.work("key".to_string(), || async { "Result2".to_string() }); - assert_eq!(fut_late.await, "Result2"); - - // the first executer is slow but not dropped, so the result will be the first ones. - let begin = tokio::time::Instant::now(); - let fut_1 = group.work("key".to_string(), || async { - tokio::time::sleep(Duration::from_millis(2000)).await; - "Result1".to_string() - }); - let fut_2 = group.work("key".to_string(), unreachable_future); - let (v1, v2) = tokio::join!(fut_1, fut_2); - assert_eq!(v1, "Result1"); - assert_eq!(v2, "Result1"); - assert!(begin.elapsed() > Duration::from_millis(1500)); - } - - #[tokio::test] - async fn leader_panic_in_spawned_task() { - let call_counter = AtomicUsize::default(); - let group: Arc> = Arc::new(UniFlight::new()); - - // First task will panic in a spawned task (no catch_unwind) - let group_clone = Arc::clone(&group); - let handle = tokio::spawn(async move { - group_clone - .work("key".to_string(), || async { - tokio::time::sleep(Duration::from_millis(50)).await; - panic!("leader panicked in spawned task"); - #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] - "never".to_string() - }) - .await - }); - - // Give time for the spawned task to register and start - tokio::time::sleep(Duration::from_millis(10)).await; - - // Second task should become the new leader after the first panics - let group_clone = Arc::clone(&group); - let call_counter_ref = &call_counter; - let fut_follower = group_clone.work("key".to_string(), || async { - call_counter_ref.fetch_add(1, AcqRel); - "Result".to_string() - }); - - // Wait for the spawned task to panic - let spawn_result = handle.await; - assert!(spawn_result.is_err()); - - // The follower should succeed - Rust's drop semantics ensure the mutex is released - let result = fut_follower.await; - assert_eq!(result, "Result"); - assert_eq!(call_counter.load(Acquire), 1); - } - - #[tokio::test] - async fn debug_impl() { - let group: UniFlight = UniFlight::new(); - - // Test Debug on empty group - let debug_str = format!("{:?}", group); - assert!(debug_str.contains("UniFlight")); - - // Create a pending work item to populate the mapping with a BroadcastOnce - let fut = group.work("key".to_string(), || async { - tokio::time::sleep(Duration::from_millis(100)).await; - "Result".to_string() - }); - - // Debug should still work with entries in the mapping - let debug_str = format!("{:?}", group); - assert!(debug_str.contains("UniFlight")); - assert!(debug_str.contains("BroadcastOnce")); - - // Complete the work - assert_eq!(fut.await, "Result"); - } -} diff --git a/crates/uniflight/tests/work.rs b/crates/uniflight/tests/work.rs new file mode 100644 index 00000000..c552862b --- /dev/null +++ b/crates/uniflight/tests/work.rs @@ -0,0 +1,208 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Integration tests for `UniFlight::work()`. + +use std::{ + sync::{ + Arc, + atomic::{ + AtomicUsize, + Ordering::{AcqRel, Acquire}, + }, + }, + time::Duration, +}; + +use futures_util::{StreamExt, stream::FuturesUnordered}; +use uniflight::UniFlight; + +fn unreachable_future() -> std::future::Pending { + std::future::pending() +} + +#[tokio::test] +async fn direct_call() { + let group = UniFlight::new(); + let result = group + .work("key", || async { + tokio::time::sleep(Duration::from_millis(10)).await; + "Result".to_string() + }) + .await; + assert_eq!(result, "Result"); +} + +#[tokio::test] +async fn parallel_call() { + let call_counter = AtomicUsize::default(); + + let group = UniFlight::new(); + let futures = FuturesUnordered::new(); + for _ in 0..10 { + futures.push(group.work("key", || async { + tokio::time::sleep(Duration::from_millis(100)).await; + call_counter.fetch_add(1, AcqRel); + "Result".to_string() + })); + } + + assert!(futures.all(|out| async move { out == "Result" }).await); + assert_eq!(call_counter.load(Acquire), 1); +} + +#[tokio::test] +async fn parallel_call_seq_await() { + let call_counter = AtomicUsize::default(); + + let group = UniFlight::new(); + let mut futures = Vec::new(); + for _ in 0..10 { + futures.push(group.work("key", || async { + tokio::time::sleep(Duration::from_millis(100)).await; + call_counter.fetch_add(1, AcqRel); + "Result".to_string() + })); + } + + for fut in futures { + assert_eq!(fut.await, "Result"); + } + assert_eq!(call_counter.load(Acquire), 1); +} + +#[tokio::test] +async fn call_with_static_str_key() { + let group = UniFlight::new(); + let result = group + .work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(1)).await; + "Result".to_string() + }) + .await; + assert_eq!(result, "Result"); +} + +#[tokio::test] +async fn call_with_static_string_key() { + let group = UniFlight::new(); + let result = group + .work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(1)).await; + "Result".to_string() + }) + .await; + assert_eq!(result, "Result"); +} + +#[tokio::test] +async fn call_with_custom_key() { + #[derive(Clone, PartialEq, Eq, Hash)] + struct K(i32); + let group = UniFlight::new(); + let result = group + .work(K(1), || async { + tokio::time::sleep(Duration::from_millis(1)).await; + "Result".to_string() + }) + .await; + assert_eq!(result, "Result"); +} + +#[tokio::test] +async fn late_wait() { + let group = UniFlight::new(); + let fut_early = group.work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(20)).await; + "Result".to_string() + }); + let fut_late = group.work("key".into(), unreachable_future); + assert_eq!(fut_early.await, "Result"); + tokio::time::sleep(Duration::from_millis(50)).await; + assert_eq!(fut_late.await, "Result"); +} + +#[tokio::test] +async fn cancel() { + let group = UniFlight::new(); + + // the executer cancelled and the other awaiter will create a new future and execute. + let fut_cancel = group.work("key".to_string(), unreachable_future); + let _ = tokio::time::timeout(Duration::from_millis(10), fut_cancel).await; + let fut_late = group.work("key".to_string(), || async { "Result2".to_string() }); + assert_eq!(fut_late.await, "Result2"); + + // the first executer is slow but not dropped, so the result will be the first ones. + let begin = tokio::time::Instant::now(); + let fut_1 = group.work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(2000)).await; + "Result1".to_string() + }); + let fut_2 = group.work("key".to_string(), unreachable_future); + let (v1, v2) = tokio::join!(fut_1, fut_2); + assert_eq!(v1, "Result1"); + assert_eq!(v2, "Result1"); + assert!(begin.elapsed() > Duration::from_millis(1500)); +} + +#[tokio::test] +async fn leader_panic_in_spawned_task() { + let call_counter = AtomicUsize::default(); + let group: Arc> = Arc::new(UniFlight::new()); + + // First task will panic in a spawned task (no catch_unwind) + let group_clone = Arc::clone(&group); + let handle = tokio::spawn(async move { + group_clone + .work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(50)).await; + panic!("leader panicked in spawned task"); + #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] + "never".to_string() + }) + .await + }); + + // Give time for the spawned task to register and start + tokio::time::sleep(Duration::from_millis(10)).await; + + // Second task should become the new leader after the first panics + let group_clone = Arc::clone(&group); + let call_counter_ref = &call_counter; + let fut_follower = group_clone.work("key".to_string(), || async { + call_counter_ref.fetch_add(1, AcqRel); + "Result".to_string() + }); + + // Wait for the spawned task to panic + let spawn_result = handle.await; + assert!(spawn_result.is_err()); + + // The follower should succeed - Rust's drop semantics ensure the mutex is released + let result = fut_follower.await; + assert_eq!(result, "Result"); + assert_eq!(call_counter.load(Acquire), 1); +} + +#[tokio::test] +async fn debug_impl() { + let group: UniFlight = UniFlight::new(); + + // Test Debug on empty group + let debug_str = format!("{:?}", group); + assert!(debug_str.contains("UniFlight")); + + // Create a pending work item to populate the mapping with a BroadcastOnce + let fut = group.work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(100)).await; + "Result".to_string() + }); + + // Debug should still work with entries in the mapping + let debug_str = format!("{:?}", group); + assert!(debug_str.contains("UniFlight")); + assert!(debug_str.contains("BroadcastOnce")); + + // Complete the work + assert_eq!(fut.await, "Result"); +} From bf25a1a62165aa2eb247e0b542e5764583ec3aa8 Mon Sep 17 00:00:00 2001 From: Schuyler Goodman Date: Tue, 16 Dec 2025 17:07:47 -0500 Subject: [PATCH 10/13] Use tick --- Cargo.lock | 1 + crates/uniflight/Cargo.toml | 11 ++++++++++- crates/uniflight/examples/cache_population.rs | 6 ++++-- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4173727..de0be1e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1725,6 +1725,7 @@ version = "0.1.0" dependencies = [ "futures-util", "parking_lot", + "tick", "tokio", "xutex", ] diff --git a/crates/uniflight/Cargo.toml b/crates/uniflight/Cargo.toml index f62a9e1a..b2d431ba 100644 --- a/crates/uniflight/Cargo.toml +++ b/crates/uniflight/Cargo.toml @@ -22,7 +22,16 @@ xutex.workspace = true [dev-dependencies] futures-util = { workspace = true, features = ["alloc", "std"] } -tokio = { workspace = true, features = ["macros", "rt", "time", "rt-multi-thread"] } +tick = { workspace = true, features = ["tokio"] } +tokio = { workspace = true, features = [ + "macros", + "rt", + "time", + "rt-multi-thread", +] } [lints] workspace = true + +[[example]] +name = "cache_population" diff --git a/crates/uniflight/examples/cache_population.rs b/crates/uniflight/examples/cache_population.rs index ab570f0f..a8dfa4d0 100644 --- a/crates/uniflight/examples/cache_population.rs +++ b/crates/uniflight/examples/cache_population.rs @@ -15,6 +15,7 @@ use std::{ time::Duration, }; +use tick::Clock; use uniflight::UniFlight; #[tokio::main] @@ -33,7 +34,8 @@ async fn main() { let group = Arc::clone(&cache_group); let counter = Arc::clone(&execution_count); let handle = tokio::spawn(async move { - let start = tokio::time::Instant::now(); + let clock = Clock::new_tokio(); + let start = clock.instant(); let result = group .work("user:123".to_string(), || async { @@ -41,7 +43,7 @@ async fn main() { println!(" [Request {i}] I'm the leader! Fetching from database... (execution #{count})"); // Simulate expensive database query - tokio::time::sleep(Duration::from_millis(500)).await; + clock.delay(Duration::from_millis(500)).await; "UserData(name: Alice, age: 30)".to_string() }) From e9697faef87153dc4703564692f6f95f238be598 Mon Sep 17 00:00:00 2001 From: Schuyler Goodman Date: Tue, 16 Dec 2025 18:46:51 -0500 Subject: [PATCH 11/13] Add N leaders functionality (for redundancy) --- crates/uniflight/README.md | 25 ++++ crates/uniflight/src/lib.rs | 234 +++++++++++++++++++++++++++++---- crates/uniflight/tests/work.rs | 88 +++++++++++++ 3 files changed, 321 insertions(+), 26 deletions(-) diff --git a/crates/uniflight/README.md b/crates/uniflight/README.md index cce07f50..b1575891 100644 --- a/crates/uniflight/README.md +++ b/crates/uniflight/README.md @@ -62,6 +62,31 @@ let result = group.work("user:123", || async { [`UniFlight`] is `Send` and `Sync`, and can be shared across threads. The returned futures do not require `Send` bounds on the closure or its output. +## Multiple Leaders for Redundancy + +By default, `UniFlight` uses a single leader per key. For redundancy scenarios where you want +multiple concurrent attempts at the same operation (using whichever completes first), use +[`UniFlight::with_max_leaders`]: + +```rust +use uniflight::UniFlight; + +// Allow up to 3 concurrent leaders for redundancy +let group: UniFlight<&str, String> = UniFlight::with_max_leaders(3); + +// First 3 concurrent calls become leaders and execute in parallel. +// The first leader to complete stores the result. +// All callers (leaders and followers) receive that result. +let result = group.work("key", || async { + "result".to_string() +}).await; +``` + +This is useful when: +- You want fault tolerance through redundant execution +- Network latency varies and you want the fastest response +- You're implementing speculative execution patterns +

diff --git a/crates/uniflight/src/lib.rs b/crates/uniflight/src/lib.rs index f5167415..b30a4be3 100644 --- a/crates/uniflight/src/lib.rs +++ b/crates/uniflight/src/lib.rs @@ -50,6 +50,33 @@ //! //! [`UniFlight`] is `Send` and `Sync`, and can be shared across threads. The returned futures //! do not require `Send` bounds on the closure or its output. +//! +//! # Multiple Leaders for Redundancy +//! +//! By default, `UniFlight` uses a single leader per key. For redundancy scenarios where you want +//! multiple concurrent attempts at the same operation (using whichever completes first), use +//! [`UniFlight::with_max_leaders`]: +//! +//! ``` +//! use uniflight::UniFlight; +//! +//! # async fn example() { +//! // Allow up to 3 concurrent leaders for redundancy +//! let group: UniFlight<&str, String> = UniFlight::with_max_leaders(3); +//! +//! // First 3 concurrent calls become leaders and execute in parallel. +//! // The first leader to complete stores the result. +//! // All callers (leaders and followers) receive that result. +//! let result = group.work("key", || async { +//! "result".to_string() +//! }).await; +//! # } +//! ``` +//! +//! This is useful when: +//! - You want fault tolerance through redundant execution +//! - Network latency varies and you want the fastest response +//! - You're implementing speculative execution patterns #![doc(html_logo_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/uniflight/logo.png")] #![doc(html_favicon_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/uniflight/favicon.ico")] @@ -57,7 +84,10 @@ use std::{ collections::HashMap, hash::Hash, - sync::{Arc, Weak}, + sync::{ + Arc, Weak, + atomic::{AtomicUsize, Ordering}, + }, }; use parking_lot::Mutex as SyncMutex; @@ -70,22 +100,66 @@ type SharedMapping = Arc>>>; #[derive(Debug)] pub struct UniFlight { mapping: SharedMapping, + max_leaders: usize, } impl Default for UniFlight { fn default() -> Self { - Self { mapping: Arc::default() } + Self { + mapping: Arc::default(), + max_leaders: 1, + } } } struct Shared { slot: AsyncMutex>, + leader_count: AtomicUsize, + max_leaders: usize, } -impl Default for Shared { - fn default() -> Self { +impl Shared { + fn new(max_leaders: usize) -> Self { Self { slot: AsyncMutex::new(None), + leader_count: AtomicUsize::new(0), + max_leaders, + } + } +} + +/// RAII guard that decrements leader count on drop. +struct LeaderGuard { + shared: Option>>, +} + +impl LeaderGuard { + /// Try to claim a leader slot. Returns `Some(guard)` if successful, `None` if max leaders reached. + fn try_claim(shared: &Arc>) -> Option { + let current = shared.leader_count.load(Ordering::Acquire); + if current < shared.max_leaders { + let prev = shared.leader_count.fetch_add(1, Ordering::AcqRel); + if prev < shared.max_leaders { + return Some(Self { + shared: Some(Arc::clone(shared)), + }); + } + // Race lost - another caller claimed the last slot + shared.leader_count.fetch_sub(1, Ordering::AcqRel); + } + None + } + + /// Consume the guard without decrementing (called when leader successfully stores result). + fn disarm(mut self) -> Arc> { + self.shared.take().expect("LeaderGuard shared already taken") + } +} + +impl Drop for LeaderGuard { + fn drop(&mut self) { + if let Some(shared) = &self.shared { + shared.leader_count.fetch_sub(1, Ordering::AcqRel); } } } @@ -97,8 +171,8 @@ struct BroadcastOnce { } impl BroadcastOnce { - fn new() -> (Self, Arc>) { - let shared = Arc::new(Shared::default()); + fn new(max_leaders: usize) -> (Self, Arc>) { + let shared = Arc::new(Shared::new(max_leaders)); ( Self { shared: Arc::downgrade(&shared), @@ -108,10 +182,18 @@ impl BroadcastOnce { } } +/// Role of a caller in the work execution. +enum Role { + /// Leader executes the work closure. + Leader { func: F, guard: LeaderGuard }, + /// Follower waits for any leader's result. Keeps func for potential promotion. + Follower { func: F }, +} + // After calling BroadcastOnce::waiter we can get a waiter. // It's in WaitList. struct BroadcastOnceWaiter { - func: F, + role: Role, shared: Arc>, key: K, @@ -129,27 +211,45 @@ impl std::fmt::Debug for BroadcastOnce { reason = "The Result type is complex but intentionally groups related items for the retry pattern" )] impl BroadcastOnce { + /// Attempts to create a waiter for an existing broadcast. + /// + /// Returns `Ok` with a waiter (either leader or follower role) if the broadcast is still active. + /// Returns `Err` if all leaders have dropped (weak reference upgrade failed). fn try_waiter( &self, func: F, key: K, mapping: SharedMapping, ) -> Result, (F, K, SharedMapping)> { - let Some(upgraded) = self.shared.upgrade() else { + let Some(shared) = self.shared.upgrade() else { return Err((func, key, mapping)); }; + + // Try to become a leader if slots are available + if let Some(guard) = LeaderGuard::try_claim(&shared) { + return Ok(BroadcastOnceWaiter { + role: Role::Leader { func, guard }, + shared, + key, + mapping, + }); + } + + // Become a follower (keep func for potential promotion) Ok(BroadcastOnceWaiter { - func, - shared: upgraded, + role: Role::Follower { func }, + shared, key, mapping, }) } - #[inline] - const fn waiter(shared: Arc>, func: F, key: K, mapping: SharedMapping) -> BroadcastOnceWaiter { + /// Creates a waiter for a new broadcast entry (first caller always becomes leader). + fn leader_waiter(shared: Arc>, func: F, key: K, mapping: SharedMapping) -> BroadcastOnceWaiter { + // Safe to unwrap: new Shared starts at 0, max_leaders >= 1 + let guard = LeaderGuard::try_claim(&shared).expect("first leader claim should always succeed"); BroadcastOnceWaiter { - func, + role: Role::Leader { func, guard }, shared, key, mapping, @@ -167,34 +267,114 @@ where T: Clone, { async fn wait(self) -> T { - let mut slot = self.shared.slot.lock().await; - if let Some(value) = (*slot).as_ref() { - return value.clone(); + let Self { + role, + shared, + key, + mapping, + } = self; + match role { + Role::Leader { func, guard } => Self::wait_as_leader(shared, key, mapping, func, guard).await, + Role::Follower { func } => Self::wait_as_follower(shared, key, mapping, func).await, + } + } + + async fn wait_as_leader(shared: Arc>, key: K, mapping: SharedMapping, func: F, guard: LeaderGuard) -> T { + // Lock the slot first - this ensures followers wait while we execute + let mut slot = shared.slot.lock().await; + + // Check if another leader already stored a result + if let Some(value) = slot.as_ref() { + let result = value.clone(); + drop(slot); + guard.disarm(); + return result; } - let value = (self.func)().await; + // Execute the work while holding the lock + // This ensures followers block on lock().await until we're done + let value = func().await; *slot = Some(value.clone()); + drop(slot); - self.mapping.lock().remove(&self.key); + // Clean up the mapping entry + mapping.lock().remove(&key); + // Disarm the guard (result is stored, count doesn't matter) + guard.disarm(); value } + + async fn wait_as_follower(shared: Arc>, key: K, mapping: SharedMapping, func: F) -> T { + // Wait for a result by acquiring the slot lock + // Leaders hold this lock during execution, so we'll block until one finishes + let slot = shared.slot.lock().await; + if let Some(value) = slot.as_ref() { + return value.clone(); + } + drop(slot); + + // No result and we acquired the lock - all leaders must have failed + // Promote ourselves to leader and execute + // Safe to unwrap: if we got here, leader_count == 0, and max_leaders >= 1 + let guard = LeaderGuard::try_claim(&shared).expect("follower promotion should always succeed"); + Self::wait_as_leader(shared, key, mapping, func, guard).await + } } impl UniFlight where K: Hash + Eq + Clone, { - /// Creates a new `UniFlight` instance. + /// Creates a new `UniFlight` instance with single-leader behavior. #[inline] #[must_use] pub fn new() -> Self { Self::default() } - /// Execute and return the value for a given function, making sure that only one - /// operation is in-flight at a given moment. If a duplicate call comes in, that caller will - /// wait until the original call completes and return the same value. + /// Creates a new `UniFlight` instance allowing up to `max_leaders` concurrent executions. + /// + /// When multiple tasks request the same work concurrently, up to `max_leaders` of them + /// will execute in parallel. The first to complete wins, and all other tasks (both + /// executing leaders and waiting followers) receive that result. + /// + /// This is useful for redundancy scenarios where you want multiple attempts at the + /// same operation and want to use whichever completes first. + /// + /// # Panics + /// + /// Panics if `max_leaders` is 0. + /// + /// # Example + /// + /// ``` + /// use uniflight::UniFlight; + /// + /// # async fn example() { + /// // Allow 3 concurrent leaders for redundancy + /// let group: UniFlight<&str, String> = UniFlight::with_max_leaders(3); + /// + /// // Up to 3 concurrent calls will execute in parallel + /// let result = group.work("key", || async { + /// "result".to_string() + /// }).await; + /// # } + /// ``` + #[inline] + #[must_use] + pub fn with_max_leaders(max_leaders: usize) -> Self { + assert!(max_leaders > 0, "max_leaders must be at least 1"); + Self { + mapping: Arc::default(), + max_leaders, + } + } + + /// Execute and return the value for a given function, making sure that only up to + /// `max_leaders` operations are in-flight at a given moment. If a duplicate call comes in + /// beyond the limit, that caller will wait until one of the leaders completes and return + /// the same value. pub fn work(&self, key: K, func: F) -> impl Future where F: FnOnce() -> Fut, @@ -209,14 +389,16 @@ where Ok(waiter) => return waiter.wait(), Err(fm) => fm, }; - let (new_call, shared) = BroadcastOnce::new(); + // All leaders dropped - create new broadcast entry + let (new_call, shared) = BroadcastOnce::new(self.max_leaders); *call = new_call; - let waiter = BroadcastOnce::waiter(shared, func, key, owned_mapping); + let waiter = BroadcastOnce::leader_waiter(shared, func, key, owned_mapping); waiter.wait() } else { - let (call, shared) = BroadcastOnce::new(); + // New key - create broadcast entry and become first leader + let (call, shared) = BroadcastOnce::new(self.max_leaders); mapping.insert(key.clone(), call); - let waiter = BroadcastOnce::waiter(shared, func, key, owned_mapping); + let waiter = BroadcastOnce::leader_waiter(shared, func, key, owned_mapping); waiter.wait() } } diff --git a/crates/uniflight/tests/work.rs b/crates/uniflight/tests/work.rs index c552862b..a8e5956f 100644 --- a/crates/uniflight/tests/work.rs +++ b/crates/uniflight/tests/work.rs @@ -206,3 +206,91 @@ async fn debug_impl() { // Complete the work assert_eq!(fut.await, "Result"); } + +// N-leader tests + +#[tokio::test] +async fn with_max_leaders_basic() { + let group: UniFlight<&str, String> = UniFlight::with_max_leaders(3); + let result = group + .work("key", || async { + tokio::time::sleep(Duration::from_millis(10)).await; + "Result".to_string() + }) + .await; + assert_eq!(result, "Result"); +} + +#[tokio::test] +async fn multiple_leaders_all_get_same_result() { + let call_counter = AtomicUsize::default(); + + // Allow up to 3 concurrent leaders + let group = UniFlight::with_max_leaders(3); + let futures = FuturesUnordered::new(); + + // Start 5 concurrent calls - up to 3 become leaders, 2 become followers + for i in 0..5 { + let counter = &call_counter; + futures.push(group.work("key", move || async move { + tokio::time::sleep(Duration::from_millis(50)).await; + counter.fetch_add(1, AcqRel); + format!("Result-{i}") + })); + } + + // All should complete with the same result (first to finish wins) + let results: Vec<_> = futures.collect().await; + let first_result = &results[0]; + assert!(results.iter().all(|r| r == first_result)); +} + +#[tokio::test] +async fn followers_get_first_leader_result() { + let group = UniFlight::with_max_leaders(2); + + // Start first leader (slow) + let fut1 = group.work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(100)).await; + "slow".to_string() + }); + + // Start second leader (fast) + let fut2 = group.work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(10)).await; + "fast".to_string() + }); + + // Start followers (should get whichever leader finishes first) + let fut3 = group.work("key".to_string(), unreachable_future); + let fut4 = group.work("key".to_string(), unreachable_future); + + // Note: Due to current implementation, leaders serialize on slot lock, + // so execution order is deterministic. The first to acquire the lock wins. + let (r1, r2, r3, r4) = tokio::join!(fut1, fut2, fut3, fut4); + + // All should have the same result + assert_eq!(r1, r2); + assert_eq!(r2, r3); + assert_eq!(r3, r4); +} + +#[tokio::test] +async fn leader_cancel_with_multiple_leaders() { + let group: Arc> = Arc::new(UniFlight::with_max_leaders(2)); + + // First leader will be cancelled + let group_clone = Arc::clone(&group); + let fut_cancel = group_clone.work("key".to_string(), unreachable_future); + let _ = tokio::time::timeout(Duration::from_millis(10), fut_cancel).await; + + // Second leader should succeed + let result = group.work("key".to_string(), || async { "Success".to_string() }).await; + assert_eq!(result, "Success"); +} + +#[tokio::test] +#[should_panic(expected = "max_leaders must be at least 1")] +async fn with_max_leaders_zero_panics() { + let _group: UniFlight<&str, String> = UniFlight::with_max_leaders(0); +} From af7f96c5e4017012ca56477794e28e849e8b2e57 Mon Sep 17 00:00:00 2001 From: Schuyler Goodman Date: Wed, 17 Dec 2025 10:19:22 -0500 Subject: [PATCH 12/13] Refactoring to allow followers to unlock in parallel --- Cargo.lock | 50 +++++++++------------ Cargo.toml | 1 + crates/uniflight/Cargo.toml | 2 +- crates/uniflight/src/lib.rs | 87 +++++++++++++++++++++++-------------- 4 files changed, 76 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index de0be1e2..4a43b44d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,15 +66,6 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" -[[package]] -name = "branches" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f11502672c5570f77f6bdf573332483f8475bab6a7fda00f1fae8ddb5a6245c0" -dependencies = [ - "rustc_version", -] - [[package]] name = "bumpalo" version = "3.19.0" @@ -206,6 +197,15 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "convert_case" version = "0.10.0" @@ -263,15 +263,6 @@ dependencies = [ "itertools 0.13.0", ] -[[package]] -name = "crossbeam-queue" -version = "0.3.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -386,6 +377,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "pin-project-lite", +] + [[package]] name = "find-msvc-tools" version = "0.1.5" @@ -1723,11 +1724,11 @@ checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" name = "uniflight" version = "0.1.0" dependencies = [ + "event-listener", "futures-util", "parking_lot", "tick", "tokio", - "xutex", ] [[package]] @@ -2057,17 +2058,6 @@ version = "0.8.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3ae8337f8a065cfc972643663ea4279e04e7256de865aa66fe25cec5fb912d3f" -[[package]] -name = "xutex" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c6a2a824bda0270095d584b553e1f084652e5fc40ebf3945e78a14a2437e0c6" -dependencies = [ - "branches", - "crossbeam-queue", - "once_cell", -] - [[package]] name = "xxhash-rust" version = "0.8.15" diff --git a/Cargo.toml b/Cargo.toml index c32633c7..75eca013 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ chrono-tz = { version = "0.10.4", default-features = false } criterion = { version = "0.7.0", default-features = false } derive_more = { version = "2.0.1", default-features = false } duct = { version = "1.1.1", default-features = false } +event-listener = { version = "5.4.0", default-features = false } futures = { version = "0.3.31", default-features = false } futures-core = { version = "0.3.31", default-features = false } futures-util = { version = "0.3.31", default-features = false } diff --git a/crates/uniflight/Cargo.toml b/crates/uniflight/Cargo.toml index 74e0c9ea..f395f9bf 100644 --- a/crates/uniflight/Cargo.toml +++ b/crates/uniflight/Cargo.toml @@ -17,8 +17,8 @@ homepage.workspace = true repository.workspace = true [dependencies] +event-listener.workspace = true parking_lot.workspace = true -xutex.workspace = true [dev-dependencies] futures-util = { workspace = true, features = ["alloc", "std"] } diff --git a/crates/uniflight/src/lib.rs b/crates/uniflight/src/lib.rs index b30a4be3..58abbfbb 100644 --- a/crates/uniflight/src/lib.rs +++ b/crates/uniflight/src/lib.rs @@ -85,13 +85,13 @@ use std::{ collections::HashMap, hash::Hash, sync::{ - Arc, Weak, + Arc, OnceLock, Weak, atomic::{AtomicUsize, Ordering}, }, }; +use event_listener::Event; use parking_lot::Mutex as SyncMutex; -use xutex::AsyncMutex; type SharedMapping = Arc>>>; @@ -113,15 +113,21 @@ impl Default for UniFlight { } struct Shared { - slot: AsyncMutex>, + /// Result storage - written once by the winning leader, then lock-free reads. + result: OnceLock, + /// Event for notifying waiters when result is ready or all leaders failed. + ready: Event, + /// Number of leaders currently executing. leader_count: AtomicUsize, + /// Maximum concurrent leaders. max_leaders: usize, } impl Shared { fn new(max_leaders: usize) -> Self { Self { - slot: AsyncMutex::new(None), + result: OnceLock::new(), + ready: Event::new(), leader_count: AtomicUsize::new(0), max_leaders, } @@ -159,7 +165,11 @@ impl LeaderGuard { impl Drop for LeaderGuard { fn drop(&mut self) { if let Some(shared) = &self.shared { - shared.leader_count.fetch_sub(1, Ordering::AcqRel); + let prev = shared.leader_count.fetch_sub(1, Ordering::AcqRel); + // If we were the last leader and no result was stored, wake one follower for promotion. + if prev == 1 && shared.result.get().is_none() { + shared.ready.notify(1); + } } } } @@ -280,45 +290,56 @@ where } async fn wait_as_leader(shared: Arc>, key: K, mapping: SharedMapping, func: F, guard: LeaderGuard) -> T { - // Lock the slot first - this ensures followers wait while we execute - let mut slot = shared.slot.lock().await; - - // Check if another leader already stored a result - if let Some(value) = slot.as_ref() { - let result = value.clone(); - drop(slot); + // Check if another leader already stored a result (lock-free read). + if let Some(result) = shared.result.get() { guard.disarm(); - return result; + return result.clone(); } - // Execute the work while holding the lock - // This ensures followers block on lock().await until we're done + // Execute the work. let value = func().await; - *slot = Some(value.clone()); - drop(slot); - // Clean up the mapping entry - mapping.lock().remove(&key); + // Try to store the result. First writer wins via OnceLock. + if shared.result.set(value.clone()).is_ok() { + // We stored the result - clean up the mapping entry. + mapping.lock().remove(&key); + } - // Disarm the guard (result is stored, count doesn't matter) + // Notify ALL waiting followers simultaneously. + shared.ready.notify(usize::MAX); + + // Disarm the guard (result is stored, count doesn't matter). guard.disarm(); - value + + // Return our computed value, or the winning value if we lost the race. + shared.result.get().cloned().unwrap_or(value) } async fn wait_as_follower(shared: Arc>, key: K, mapping: SharedMapping, func: F) -> T { - // Wait for a result by acquiring the slot lock - // Leaders hold this lock during execution, so we'll block until one finishes - let slot = shared.slot.lock().await; - if let Some(value) = slot.as_ref() { - return value.clone(); - } - drop(slot); + loop { + // Fast path: result already available (lock-free read). + if let Some(result) = shared.result.get() { + return result.clone(); + } + + // Register listener BEFORE checking state to avoid missed notifications. + let listener = shared.ready.listen(); + + // Double-check after registering. + if let Some(result) = shared.result.get() { + return result.clone(); + } - // No result and we acquired the lock - all leaders must have failed - // Promote ourselves to leader and execute - // Safe to unwrap: if we got here, leader_count == 0, and max_leaders >= 1 - let guard = LeaderGuard::try_claim(&shared).expect("follower promotion should always succeed"); - Self::wait_as_leader(shared, key, mapping, func, guard).await + // Check if all leaders have failed and we need promotion. + if shared.leader_count.load(Ordering::Acquire) == 0 { + // All leaders failed - promote ourselves. + let guard = LeaderGuard::try_claim(&shared).expect("follower promotion should always succeed"); + return Self::wait_as_leader(shared, key, mapping, func, guard).await; + } + + // Wait for notification (in parallel with other followers). + listener.await; + } } } From 9445788f0d5df361c554497c2bc1159237446c88 Mon Sep 17 00:00:00 2001 From: Schuyler Goodman Date: Wed, 7 Jan 2026 14:04:32 -0500 Subject: [PATCH 13/13] Update to use async-once and dashmap. Update benchmarks --- CHANGELOG.md | 1 + Cargo.lock | 65 ++- Cargo.toml | 3 + README.md | 6 +- crates/uniflight/Cargo.toml | 10 +- crates/uniflight/README.md | 43 +- crates/uniflight/benches/comparison.rs | 269 +++++++++++ crates/uniflight/examples/cache_population.rs | 4 +- crates/uniflight/src/lib.rs | 419 ++++-------------- crates/uniflight/tests/work.rs | 117 +---- 10 files changed, 438 insertions(+), 499 deletions(-) create mode 100644 crates/uniflight/benches/comparison.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index e19af40b..de44ad50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ Please see each crate's change log below: +- [`async_once`](./crates/async_once/CHANGELOG.md) - [`bytesbuf`](./crates/bytesbuf/CHANGELOG.md) - [`data_privacy`](./crates/data_privacy/CHANGELOG.md) - [`data_privacy_macros`](./crates/data_privacy_macros/CHANGELOG.md) diff --git a/Cargo.lock b/Cargo.lock index 4a43b44d..9f9e91f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -44,6 +44,12 @@ version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" +[[package]] +name = "async-once-cell" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4288f83726785267c6f2ef073a3d83dc3f9b81464e9f99898240cced85fce35a" + [[package]] name = "autocfg" version = "1.5.0" @@ -197,15 +203,6 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" -[[package]] -name = "concurrent-queue" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "convert_case" version = "0.10.0" @@ -250,6 +247,7 @@ dependencies = [ "serde", "serde_json", "tinytemplate", + "tokio", "walkdir", ] @@ -275,6 +273,20 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data_privacy" version = "0.9.0" @@ -377,16 +389,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" -[[package]] -name = "event-listener" -version = "5.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" -dependencies = [ - "concurrent-queue", - "pin-project-lite", -] - [[package]] name = "find-msvc-tools" version = "0.1.5" @@ -566,6 +568,12 @@ dependencies = [ "byteorder", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.16.1" @@ -613,7 +621,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.16.1", ] [[package]] @@ -1355,6 +1363,16 @@ version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbbb5d9659141646ae647b42fe094daf6c6192d1620870b449d9557f748b2daa" +[[package]] +name = "singleflight-async" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffae0d841b8012a86beec66a3f9c57b7b331a10366c764cd40bd6faebe3ad77c" +dependencies = [ + "parking_lot", + "tokio", +] + [[package]] name = "siphasher" version = "1.0.1" @@ -1597,6 +1615,7 @@ version = "1.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" dependencies = [ + "parking_lot", "pin-project-lite", "tokio-macros", ] @@ -1724,9 +1743,11 @@ checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" name = "uniflight" version = "0.1.0" dependencies = [ - "event-listener", + "async-once-cell", + "criterion", + "dashmap", "futures-util", - "parking_lot", + "singleflight-async", "tick", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 75eca013..44eeb7a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,11 +42,13 @@ uniflight = { path = "crates/uniflight", default-features = false, version = "0. # external dependencies alloc_tracker = { version = "0.5.9", default-features = false } +async-once-cell = { version = "0.5", default-features = false } anyhow = { version = "1.0.100", default-features = false } bytes = { version = "1.11.0", default-features = false } chrono = { version = "0.4.40", default-features = false } chrono-tz = { version = "0.10.4", default-features = false } criterion = { version = "0.7.0", default-features = false } +dashmap = { version = "6.1", default-features = false } derive_more = { version = "2.0.1", default-features = false } duct = { version = "1.1.1", default-features = false } event-listener = { version = "5.4.0", default-features = false } @@ -75,6 +77,7 @@ regex = { version = "1.12.2", default-features = false } serde = { version = "1.0.228", default-features = false } serde_core = { version = "1.0.224", default-features = false } serde_json = { version = "1.0.145", default-features = false } +singleflight-async = { version = "0.2", default-features = false } smallvec = { version = "1.15.1", default-features = false } static_assertions = { version = "1.1.0", default-features = false } syn = { version = "2.0.111", default-features = false } diff --git a/README.md b/README.md index f58b939a..2177c3b8 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ This repository contains a set of crates that help you build robust highly scala These are the crates built out of this repo: +- [`async_once`](./crates/async_once/README.md) - Async once-cell that executes initialization at most once with parallel follower notification. - [`bytesbuf`](./crates/bytesbuf/README.md) - Manipulate sequences of bytes for efficient I/O. - [`data_privacy`](./crates/data_privacy/README.md) - Mechanisms to classify, manipulate, and redact sensitive data. - [`data_privacy_macros`](./crates/data_privacy_macros/README.md) - Macros for the `data_privacy` crate. @@ -37,11 +38,8 @@ These are the crates built out of this repo: - [`thread_aware`](./crates/thread_aware/README.md) - Facilities to support thread-isolated state. - [`thread_aware_macros`](./crates/thread_aware_macros/README.md) - Macros for the `thread_aware` crate. - [`thread_aware_macros_impl`](./crates/thread_aware_macros_impl/README.md) - Macros for the `thread_aware` crate. -<<<<<<< HEAD -- [`uniflight`](./crates/uniflight/README.md) - Coalesces duplicate async tasks into a single execution. -======= - [`tick`](./crates/tick/README.md) - Provides primitives to interact with and manipulate machine time. ->>>>>>> 9dbba9df9e954e1b7f14be110feb4ab25da62e86 +- [`uniflight`](./crates/uniflight/README.md) - Coalesces duplicate async tasks into a single execution. ## About this Repo diff --git a/crates/uniflight/Cargo.toml b/crates/uniflight/Cargo.toml index f395f9bf..54e44f0c 100644 --- a/crates/uniflight/Cargo.toml +++ b/crates/uniflight/Cargo.toml @@ -17,11 +17,13 @@ homepage.workspace = true repository.workspace = true [dependencies] -event-listener.workspace = true -parking_lot.workspace = true +async-once-cell.workspace = true +dashmap.workspace = true [dev-dependencies] +criterion = { workspace = true, features = ["async_tokio"] } futures-util = { workspace = true, features = ["alloc", "std"] } +singleflight-async.workspace = true tick = { workspace = true, features = ["tokio"] } tokio = { workspace = true, features = [ "macros", @@ -33,5 +35,9 @@ tokio = { workspace = true, features = [ [lints] workspace = true +[[bench]] +name = "comparison" +harness = false + [[example]] name = "cache_population" diff --git a/crates/uniflight/README.md b/crates/uniflight/README.md index b1575891..09d79345 100644 --- a/crates/uniflight/README.md +++ b/crates/uniflight/README.md @@ -20,14 +20,14 @@ Coalesces duplicate async tasks into a single execution. -This crate provides [`UniFlight`], a mechanism for deduplicating concurrent async operations. +This crate provides [`Merger`], a mechanism for deduplicating concurrent async operations. When multiple tasks request the same work (identified by a key), only the first task (the "leader") performs the actual work while subsequent tasks (the "followers") wait and receive a clone of the result. ## When to Use -Use `UniFlight` when you have expensive or rate-limited operations that may be requested +Use `Merger` when you have expensive or rate-limited operations that may be requested concurrently with the same parameters: - **Cache population**: Prevent thundering herd when a cache entry expires @@ -38,9 +38,9 @@ concurrently with the same parameters: ## Example ```rust -use uniflight::UniFlight; +use uniflight::Merger; -let group: UniFlight<&str, String> = UniFlight::new(); +let group: Merger<&str, String> = Merger::new(); // Multiple concurrent calls with the same key will share a single execution let result = group.work("user:123", || async { @@ -51,7 +51,7 @@ let result = group.work("user:123", || async { ## Cancellation and Panic Safety -`UniFlight` handles task cancellation and panics gracefully: +`Merger` handles task cancellation and panics gracefully: - If the leader task is cancelled or dropped, a follower becomes the new leader - If the leader task panics, a follower becomes the new leader and executes its work @@ -59,33 +59,20 @@ let result = group.work("user:123", || async { ## Thread Safety -[`UniFlight`] is `Send` and `Sync`, and can be shared across threads. The returned futures -do not require `Send` bounds on the closure or its output. +[`Merger`] is `Send` and `Sync`, and can be shared across threads. The returned futures +are `Send` when the closure, future, key, and value types are `Send`. -## Multiple Leaders for Redundancy +## Performance -By default, `UniFlight` uses a single leader per key. For redundancy scenarios where you want -multiple concurrent attempts at the same operation (using whichever completes first), use -[`UniFlight::with_max_leaders`]: +Benchmarks comparing `uniflight` against `singleflight-async` show the following characteristics: -```rust -use uniflight::UniFlight; - -// Allow up to 3 concurrent leaders for redundancy -let group: UniFlight<&str, String> = UniFlight::with_max_leaders(3); - -// First 3 concurrent calls become leaders and execute in parallel. -// The first leader to complete stores the result. -// All callers (leaders and followers) receive that result. -let result = group.work("key", || async { - "result".to_string() -}).await; -``` +- **Concurrent workloads** (10+ tasks): uniflight is 1.2-1.3x faster, demonstrating better scalability under contention +- **Single calls**: singleflight-async has lower per-call overhead (~2x faster for individual operations) +- **Multiple keys**: uniflight performs 1.3x faster when handling multiple distinct keys concurrently -This is useful when: -- You want fault tolerance through redundant execution -- Network latency varies and you want the fastest response -- You're implementing speculative execution patterns +uniflight's DashMap-based architecture provides excellent scaling properties for high-concurrency scenarios, +making it well-suited for production workloads with concurrent access patterns. For low-contention scenarios +with predominantly single calls, the performance difference is minimal (sub-microsecond range). diff --git a/crates/uniflight/benches/comparison.rs b/crates/uniflight/benches/comparison.rs new file mode 100644 index 00000000..fd53077d --- /dev/null +++ b/crates/uniflight/benches/comparison.rs @@ -0,0 +1,269 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Benchmarks comparing uniflight against singleflight-async. + +use std::sync::{ + Arc, + atomic::{AtomicU64, Ordering}, +}; + +use criterion::{Criterion, criterion_group, criterion_main}; + +// Benchmark 1: Single call (no contention) +fn bench_single_call(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + + let mut group = c.benchmark_group("single_call"); + + // Use atomic counter for unique keys + static COUNTER1: AtomicU64 = AtomicU64::new(0); + + // Our implementation - pre-create the merger + let our_merger = Arc::new(uniflight::Merger::::new()); + group.bench_function("uniflight", |b| { + b.to_async(&rt).iter(|| { + let merger = Arc::clone(&our_merger); + async move { + let key = format!("key_{}", COUNTER1.fetch_add(1, Ordering::Relaxed)); + merger.work(key, || async { "value".to_string() }).await + } + }); + }); + + // singleflight-async - pre-create the group + let their_group = Arc::new(singleflight_async::SingleFlight::::new()); + group.bench_function("singleflight_async", |b| { + b.to_async(&rt).iter(|| { + let group = Arc::clone(&their_group); + async move { + let key = format!("key_{}", COUNTER1.fetch_add(1, Ordering::Relaxed)); + group.work(key, || async { "value".to_string() }).await + } + }); + }); + + group.finish(); +} + +// Benchmark 2: Concurrent calls (10 tasks, same key) +fn bench_concurrent_10(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + + let mut group = c.benchmark_group("concurrent_10_tasks"); + + // Use atomic counter for unique keys per iteration + static COUNTER2: AtomicU64 = AtomicU64::new(0); + + // Our implementation - pre-create the merger + let our_merger = Arc::new(uniflight::Merger::::new()); + group.bench_function("uniflight", |b| { + b.to_async(&rt).iter(|| { + let merger = Arc::clone(&our_merger); + async move { + let key = format!("key_{}", COUNTER2.fetch_add(1, Ordering::Relaxed)); + let handles: Vec<_> = (0..10) + .map(|_| { + let merger = Arc::clone(&merger); + let key = key.clone(); + tokio::spawn(async move { merger.work(key, || async { "value".to_string() }).await }) + }) + .collect(); + + for handle in handles { + handle.await.unwrap(); + } + } + }); + }); + + // singleflight-async - pre-create the group + let their_group = Arc::new(singleflight_async::SingleFlight::::new()); + group.bench_function("singleflight_async", |b| { + b.to_async(&rt).iter(|| { + let group = Arc::clone(&their_group); + async move { + let key = format!("key_{}", COUNTER2.fetch_add(1, Ordering::Relaxed)); + let handles: Vec<_> = (0..10) + .map(|_| { + let group = Arc::clone(&group); + let key = key.clone(); + tokio::spawn(async move { group.work(key, || async { "value".to_string() }).await }) + }) + .collect(); + + for handle in handles { + handle.await.unwrap(); + } + } + }); + }); + + group.finish(); +} + +// Benchmark 3: High contention (100 tasks, same key) +fn bench_concurrent_100(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + + let mut group = c.benchmark_group("concurrent_100_tasks"); + + // Use atomic counter for unique keys per iteration + static COUNTER3: AtomicU64 = AtomicU64::new(0); + + // Our implementation - pre-create the merger + let our_merger = Arc::new(uniflight::Merger::::new()); + group.bench_function("uniflight", |b| { + b.to_async(&rt).iter(|| { + let merger = Arc::clone(&our_merger); + async move { + let key = format!("key_{}", COUNTER3.fetch_add(1, Ordering::Relaxed)); + let handles: Vec<_> = (0..100) + .map(|_| { + let merger = Arc::clone(&merger); + let key = key.clone(); + tokio::spawn(async move { merger.work(key, || async { "value".to_string() }).await }) + }) + .collect(); + + for handle in handles { + handle.await.unwrap(); + } + } + }); + }); + + // singleflight-async - pre-create the group + let their_group = Arc::new(singleflight_async::SingleFlight::::new()); + group.bench_function("singleflight_async", |b| { + b.to_async(&rt).iter(|| { + let group = Arc::clone(&their_group); + async move { + let key = format!("key_{}", COUNTER3.fetch_add(1, Ordering::Relaxed)); + let handles: Vec<_> = (0..100) + .map(|_| { + let group = Arc::clone(&group); + let key = key.clone(); + tokio::spawn(async move { group.work(key, || async { "value".to_string() }).await }) + }) + .collect(); + + for handle in handles { + handle.await.unwrap(); + } + } + }); + }); + + group.finish(); +} + +// Benchmark 4: Multiple different keys (10 keys, 10 tasks each) +fn bench_multiple_keys(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + + let mut group = c.benchmark_group("multiple_keys_10x10"); + + // Use atomic counter for unique key prefix per iteration + static COUNTER4: AtomicU64 = AtomicU64::new(0); + + // Our implementation - pre-create the merger + let our_merger = Arc::new(uniflight::Merger::::new()); + group.bench_function("uniflight", |b| { + b.to_async(&rt).iter(|| { + let merger = Arc::clone(&our_merger); + async move { + let iteration = COUNTER4.fetch_add(1, Ordering::Relaxed); + let handles: Vec<_> = (0..10) + .flat_map(|key_id| { + let merger = Arc::clone(&merger); + (0..10).map(move |_| { + let merger = Arc::clone(&merger); + let key = format!("key_{}_{key_id}", iteration); + tokio::spawn(async move { merger.work(key, || async { "value".to_string() }).await }) + }) + }) + .collect(); + + for handle in handles { + handle.await.unwrap(); + } + } + }); + }); + + // singleflight-async - pre-create the group + let their_group = Arc::new(singleflight_async::SingleFlight::::new()); + group.bench_function("singleflight_async", |b| { + b.to_async(&rt).iter(|| { + let group = Arc::clone(&their_group); + async move { + let iteration = COUNTER4.fetch_add(1, Ordering::Relaxed); + let handles: Vec<_> = (0..10) + .flat_map(|key_id| { + let group = Arc::clone(&group); + (0..10).map(move |_| { + let group = Arc::clone(&group); + let key = format!("key_{}_{key_id}", iteration); + tokio::spawn(async move { group.work(key, || async { "value".to_string() }).await }) + }) + }) + .collect(); + + for handle in handles { + handle.await.unwrap(); + } + } + }); + }); + + group.finish(); +} + +// Benchmark 5: Reuse existing group (pre-created, multiple operations) +fn bench_reuse_group(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + + let mut group = c.benchmark_group("reuse_group"); + + // Use atomic counter for unique keys + static COUNTER5: AtomicU64 = AtomicU64::new(0); + + // Our implementation - pre-create the merger + let our_merger = Arc::new(uniflight::Merger::::new()); + group.bench_function("uniflight", |b| { + b.to_async(&rt).iter(|| { + let merger = Arc::clone(&our_merger); + async move { + // Each iteration uses a unique key to avoid caching effects + let key = format!("key_{}", COUNTER5.fetch_add(1, Ordering::Relaxed)); + merger.work(key, || async { "value".to_string() }).await + } + }); + }); + + // singleflight-async - pre-create the group + let their_group = Arc::new(singleflight_async::SingleFlight::::new()); + group.bench_function("singleflight_async", |b| { + b.to_async(&rt).iter(|| { + let group = Arc::clone(&their_group); + async move { + let key = format!("key_{}", COUNTER5.fetch_add(1, Ordering::Relaxed)); + group.work(key, || async { "value".to_string() }).await + } + }); + }); + + group.finish(); +} + +criterion_group!( + benches, + bench_single_call, + bench_concurrent_10, + bench_concurrent_100, + bench_multiple_keys, + bench_reuse_group, +); + +criterion_main!(benches); diff --git a/crates/uniflight/examples/cache_population.rs b/crates/uniflight/examples/cache_population.rs index a8dfa4d0..7050b9ef 100644 --- a/crates/uniflight/examples/cache_population.rs +++ b/crates/uniflight/examples/cache_population.rs @@ -16,12 +16,12 @@ use std::{ }; use tick::Clock; -use uniflight::UniFlight; +use uniflight::Merger; #[tokio::main] async fn main() { // Create a shared UniFlight instance for cache operations - let cache_group = Arc::new(UniFlight::::new()); + let cache_group = Arc::new(Merger::::new()); // Track how many times the work closure actually executes let execution_count = Arc::new(AtomicUsize::new(0)); diff --git a/crates/uniflight/src/lib.rs b/crates/uniflight/src/lib.rs index 58abbfbb..b590a26f 100644 --- a/crates/uniflight/src/lib.rs +++ b/crates/uniflight/src/lib.rs @@ -7,14 +7,14 @@ //! Coalesces duplicate async tasks into a single execution. //! -//! This crate provides [`UniFlight`], a mechanism for deduplicating concurrent async operations. +//! This crate provides [`Merger`], a mechanism for deduplicating concurrent async operations. //! When multiple tasks request the same work (identified by a key), only the first task (the //! "leader") performs the actual work while subsequent tasks (the "followers") wait and receive //! a clone of the result. //! //! # When to Use //! -//! Use `UniFlight` when you have expensive or rate-limited operations that may be requested +//! Use `Merger` when you have expensive or rate-limited operations that may be requested //! concurrently with the same parameters: //! //! - **Cache population**: Prevent thundering herd when a cache entry expires @@ -25,10 +25,10 @@ //! # Example //! //! ``` -//! use uniflight::UniFlight; +//! use uniflight::Merger; //! //! # async fn example() { -//! let group: UniFlight<&str, String> = UniFlight::new(); +//! let group: Merger<&str, String> = Merger::new(); //! //! // Multiple concurrent calls with the same key will share a single execution //! let result = group.work("user:123", || async { @@ -40,7 +40,7 @@ //! //! # Cancellation and Panic Safety //! -//! `UniFlight` handles task cancellation and panics gracefully: +//! `Merger` handles task cancellation and panics gracefully: //! //! - If the leader task is cancelled or dropped, a follower becomes the new leader //! - If the leader task panics, a follower becomes the new leader and executes its work @@ -48,379 +48,120 @@ //! //! # Thread Safety //! -//! [`UniFlight`] is `Send` and `Sync`, and can be shared across threads. The returned futures -//! do not require `Send` bounds on the closure or its output. +//! [`Merger`] is `Send` and `Sync`, and can be shared across threads. The returned futures +//! are `Send` when the closure, future, key, and value types are `Send`. //! -//! # Multiple Leaders for Redundancy +//! # Performance //! -//! By default, `UniFlight` uses a single leader per key. For redundancy scenarios where you want -//! multiple concurrent attempts at the same operation (using whichever completes first), use -//! [`UniFlight::with_max_leaders`]: +//! Benchmarks comparing `uniflight` against `singleflight-async` show the following characteristics: //! -//! ``` -//! use uniflight::UniFlight; -//! -//! # async fn example() { -//! // Allow up to 3 concurrent leaders for redundancy -//! let group: UniFlight<&str, String> = UniFlight::with_max_leaders(3); -//! -//! // First 3 concurrent calls become leaders and execute in parallel. -//! // The first leader to complete stores the result. -//! // All callers (leaders and followers) receive that result. -//! let result = group.work("key", || async { -//! "result".to_string() -//! }).await; -//! # } -//! ``` +//! - **Concurrent workloads** (10+ tasks): uniflight is 1.2-1.3x faster, demonstrating better scalability under contention +//! - **Single calls**: singleflight-async has lower per-call overhead (~2x faster for individual operations) +//! - **Multiple keys**: uniflight performs 1.3x faster when handling multiple distinct keys concurrently //! -//! This is useful when: -//! - You want fault tolerance through redundant execution -//! - Network latency varies and you want the fastest response -//! - You're implementing speculative execution patterns +//! uniflight's DashMap-based architecture provides excellent scaling properties for high-concurrency scenarios, +//! making it well-suited for production workloads with concurrent access patterns. For low-contention scenarios +//! with predominantly single calls, the performance difference is minimal (sub-microsecond range). #![doc(html_logo_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/uniflight/logo.png")] #![doc(html_favicon_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/uniflight/favicon.ico")] use std::{ - collections::HashMap, + fmt::Debug, hash::Hash, - sync::{ - Arc, OnceLock, Weak, - atomic::{AtomicUsize, Ordering}, - }, + sync::{Arc, Weak}, }; -use event_listener::Event; -use parking_lot::Mutex as SyncMutex; - -type SharedMapping = Arc>>>; +use async_once_cell::OnceCell; +use dashmap::{DashMap, Entry::{Occupied, Vacant}}; /// Represents a class of work and creates a space in which units of work /// can be executed with duplicate suppression. -#[derive(Debug)] -pub struct UniFlight { - mapping: SharedMapping, - max_leaders: usize, -} - -impl Default for UniFlight { - fn default() -> Self { - Self { - mapping: Arc::default(), - max_leaders: 1, - } - } -} - -struct Shared { - /// Result storage - written once by the winning leader, then lock-free reads. - result: OnceLock, - /// Event for notifying waiters when result is ready or all leaders failed. - ready: Event, - /// Number of leaders currently executing. - leader_count: AtomicUsize, - /// Maximum concurrent leaders. - max_leaders: usize, -} - -impl Shared { - fn new(max_leaders: usize) -> Self { - Self { - result: OnceLock::new(), - ready: Event::new(), - leader_count: AtomicUsize::new(0), - max_leaders, - } - } -} - -/// RAII guard that decrements leader count on drop. -struct LeaderGuard { - shared: Option>>, -} - -impl LeaderGuard { - /// Try to claim a leader slot. Returns `Some(guard)` if successful, `None` if max leaders reached. - fn try_claim(shared: &Arc>) -> Option { - let current = shared.leader_count.load(Ordering::Acquire); - if current < shared.max_leaders { - let prev = shared.leader_count.fetch_add(1, Ordering::AcqRel); - if prev < shared.max_leaders { - return Some(Self { - shared: Some(Arc::clone(shared)), - }); - } - // Race lost - another caller claimed the last slot - shared.leader_count.fetch_sub(1, Ordering::AcqRel); - } - None - } - - /// Consume the guard without decrementing (called when leader successfully stores result). - fn disarm(mut self) -> Arc> { - self.shared.take().expect("LeaderGuard shared already taken") - } -} - -impl Drop for LeaderGuard { - fn drop(&mut self) { - if let Some(shared) = &self.shared { - let prev = shared.leader_count.fetch_sub(1, Ordering::AcqRel); - // If we were the last leader and no result was stored, wake one follower for promotion. - if prev == 1 && shared.result.get().is_none() { - shared.ready.notify(1); - } - } - } -} - -/// `BroadcastOnce` consists of shared slot and notify. -#[derive(Clone)] -struct BroadcastOnce { - shared: Weak>, -} - -impl BroadcastOnce { - fn new(max_leaders: usize) -> (Self, Arc>) { - let shared = Arc::new(Shared::new(max_leaders)); - ( - Self { - shared: Arc::downgrade(&shared), - }, - shared, - ) - } -} - -/// Role of a caller in the work execution. -enum Role { - /// Leader executes the work closure. - Leader { func: F, guard: LeaderGuard }, - /// Follower waits for any leader's result. Keeps func for potential promotion. - Follower { func: F }, -} - -// After calling BroadcastOnce::waiter we can get a waiter. -// It's in WaitList. -struct BroadcastOnceWaiter { - role: Role, - shared: Arc>, - - key: K, - mapping: SharedMapping, +pub struct Merger { + mapping: DashMap>>, } -impl std::fmt::Debug for BroadcastOnce { +impl Debug for Merger { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "BroadcastOnce") + f.debug_struct("Merger") + .field("mapping", &format_args!("DashMap<...>")) + .finish() } } -#[expect( - clippy::type_complexity, - reason = "The Result type is complex but intentionally groups related items for the retry pattern" -)] -impl BroadcastOnce { - /// Attempts to create a waiter for an existing broadcast. - /// - /// Returns `Ok` with a waiter (either leader or follower role) if the broadcast is still active. - /// Returns `Err` if all leaders have dropped (weak reference upgrade failed). - fn try_waiter( - &self, - func: F, - key: K, - mapping: SharedMapping, - ) -> Result, (F, K, SharedMapping)> { - let Some(shared) = self.shared.upgrade() else { - return Err((func, key, mapping)); - }; - - // Try to become a leader if slots are available - if let Some(guard) = LeaderGuard::try_claim(&shared) { - return Ok(BroadcastOnceWaiter { - role: Role::Leader { func, guard }, - shared, - key, - mapping, - }); - } - - // Become a follower (keep func for potential promotion) - Ok(BroadcastOnceWaiter { - role: Role::Follower { func }, - shared, - key, - mapping, - }) - } - - /// Creates a waiter for a new broadcast entry (first caller always becomes leader). - fn leader_waiter(shared: Arc>, func: F, key: K, mapping: SharedMapping) -> BroadcastOnceWaiter { - // Safe to unwrap: new Shared starts at 0, max_leaders >= 1 - let guard = LeaderGuard::try_claim(&shared).expect("first leader claim should always succeed"); - BroadcastOnceWaiter { - role: Role::Leader { func, guard }, - shared, - key, - mapping, - } - } -} - -// We already in WaitList, so wait will be fine, we won't miss -// anything after Waiter generated. -impl BroadcastOnceWaiter +impl Default for Merger where K: Hash + Eq, - F: FnOnce() -> Fut, - Fut: Future, - T: Clone, { - async fn wait(self) -> T { - let Self { - role, - shared, - key, - mapping, - } = self; - match role { - Role::Leader { func, guard } => Self::wait_as_leader(shared, key, mapping, func, guard).await, - Role::Follower { func } => Self::wait_as_follower(shared, key, mapping, func).await, - } - } - - async fn wait_as_leader(shared: Arc>, key: K, mapping: SharedMapping, func: F, guard: LeaderGuard) -> T { - // Check if another leader already stored a result (lock-free read). - if let Some(result) = shared.result.get() { - guard.disarm(); - return result.clone(); - } - - // Execute the work. - let value = func().await; - - // Try to store the result. First writer wins via OnceLock. - if shared.result.set(value.clone()).is_ok() { - // We stored the result - clean up the mapping entry. - mapping.lock().remove(&key); - } - - // Notify ALL waiting followers simultaneously. - shared.ready.notify(usize::MAX); - - // Disarm the guard (result is stored, count doesn't matter). - guard.disarm(); - - // Return our computed value, or the winning value if we lost the race. - shared.result.get().cloned().unwrap_or(value) - } - - async fn wait_as_follower(shared: Arc>, key: K, mapping: SharedMapping, func: F) -> T { - loop { - // Fast path: result already available (lock-free read). - if let Some(result) = shared.result.get() { - return result.clone(); - } - - // Register listener BEFORE checking state to avoid missed notifications. - let listener = shared.ready.listen(); - - // Double-check after registering. - if let Some(result) = shared.result.get() { - return result.clone(); - } - - // Check if all leaders have failed and we need promotion. - if shared.leader_count.load(Ordering::Acquire) == 0 { - // All leaders failed - promote ourselves. - let guard = LeaderGuard::try_claim(&shared).expect("follower promotion should always succeed"); - return Self::wait_as_leader(shared, key, mapping, func, guard).await; - } - - // Wait for notification (in parallel with other followers). - listener.await; + fn default() -> Self { + Self { + mapping: DashMap::new(), } } } -impl UniFlight +impl Merger where - K: Hash + Eq + Clone, + K: Hash + Eq + Clone + Send + Sync, + T: Send + Sync, { - /// Creates a new `UniFlight` instance with single-leader behavior. + /// Creates a new `Merger` instance. #[inline] #[must_use] pub fn new() -> Self { Self::default() } - /// Creates a new `UniFlight` instance allowing up to `max_leaders` concurrent executions. - /// - /// When multiple tasks request the same work concurrently, up to `max_leaders` of them - /// will execute in parallel. The first to complete wins, and all other tasks (both - /// executing leaders and waiting followers) receive that result. - /// - /// This is useful for redundancy scenarios where you want multiple attempts at the - /// same operation and want to use whichever completes first. - /// - /// # Panics - /// - /// Panics if `max_leaders` is 0. - /// - /// # Example - /// - /// ``` - /// use uniflight::UniFlight; - /// - /// # async fn example() { - /// // Allow 3 concurrent leaders for redundancy - /// let group: UniFlight<&str, String> = UniFlight::with_max_leaders(3); - /// - /// // Up to 3 concurrent calls will execute in parallel - /// let result = group.work("key", || async { - /// "result".to_string() - /// }).await; - /// # } - /// ``` - #[inline] - #[must_use] - pub fn with_max_leaders(max_leaders: usize) -> Self { - assert!(max_leaders > 0, "max_leaders must be at least 1"); - Self { - mapping: Arc::default(), - max_leaders, - } - } - - /// Execute and return the value for a given function, making sure that only up to - /// `max_leaders` operations are in-flight at a given moment. If a duplicate call comes in - /// beyond the limit, that caller will wait until one of the leaders completes and return - /// the same value. - pub fn work(&self, key: K, func: F) -> impl Future + /// Execute and return the value for a given function, making sure that only one + /// operation is in-flight at a given moment. If a duplicate call comes in, + /// that caller will wait until the leader completes and return the same value. + pub fn work(&self, key: K, func: F) -> impl Future + Send where - F: FnOnce() -> Fut, - Fut: Future, + F: FnOnce() -> Fut + Send, + Fut: Future + Send, T: Clone, { - let owned_mapping = Arc::clone(&self.mapping); - let mut mapping = self.mapping.lock(); - let val = mapping.get_mut(&key); - if let Some(call) = val { - let (func, key, owned_mapping) = match call.try_waiter(func, key, owned_mapping) { - Ok(waiter) => return waiter.wait(), - Err(fm) => fm, - }; - // All leaders dropped - create new broadcast entry - let (new_call, shared) = BroadcastOnce::new(self.max_leaders); - *call = new_call; - let waiter = BroadcastOnce::leader_waiter(shared, func, key, owned_mapping); - waiter.wait() - } else { - // New key - create broadcast entry and become first leader - let (call, shared) = BroadcastOnce::new(self.max_leaders); - mapping.insert(key.clone(), call); - let waiter = BroadcastOnce::leader_waiter(shared, func, key, owned_mapping); - waiter.wait() + let cell = self.get_or_create_cell(&key); + let mapping = &self.mapping; + async move { + let result = cell.get_or_init(func()).await.clone(); + // Clean up expired weak reference if present + // Use remove_if to atomically check and remove + mapping.remove_if(&key, |_, weak| weak.upgrade().is_none()); + result + } + } + + /// Gets an existing `OnceCell` for the key, or creates a new one. + fn get_or_create_cell(&self, key: &K) -> Arc> { + // Fast path: check if entry exists and is still valid + if let Some(entry) = self.mapping.get(key) + && let Some(cell) = entry.value().upgrade() + { + return cell; } + + // Slow path: need to insert or replace expired entry + let cell = Arc::new(OnceCell::new()); + let weak = Arc::downgrade(&cell); + + // Use Entry enum to atomically check-and-return or insert + match self.mapping.entry(key.clone()) { + Occupied(mut entry) => { + // Entry exists - check if still alive + if let Some(existing) = entry.get().upgrade() { + // Another thread's cell is still alive - use it + return existing; + } + // Expired - replace with ours + entry.insert(weak); + } + Vacant(entry) => { + entry.insert(weak); + } + } + + // We inserted our cell, return it + cell } } diff --git a/crates/uniflight/tests/work.rs b/crates/uniflight/tests/work.rs index a8e5956f..6033011c 100644 --- a/crates/uniflight/tests/work.rs +++ b/crates/uniflight/tests/work.rs @@ -15,7 +15,7 @@ use std::{ }; use futures_util::{StreamExt, stream::FuturesUnordered}; -use uniflight::UniFlight; +use uniflight::Merger; fn unreachable_future() -> std::future::Pending { std::future::pending() @@ -23,7 +23,7 @@ fn unreachable_future() -> std::future::Pending { #[tokio::test] async fn direct_call() { - let group = UniFlight::new(); + let group = Merger::new(); let result = group .work("key", || async { tokio::time::sleep(Duration::from_millis(10)).await; @@ -37,7 +37,7 @@ async fn direct_call() { async fn parallel_call() { let call_counter = AtomicUsize::default(); - let group = UniFlight::new(); + let group = Merger::new(); let futures = FuturesUnordered::new(); for _ in 0..10 { futures.push(group.work("key", || async { @@ -55,7 +55,7 @@ async fn parallel_call() { async fn parallel_call_seq_await() { let call_counter = AtomicUsize::default(); - let group = UniFlight::new(); + let group = Merger::new(); let mut futures = Vec::new(); for _ in 0..10 { futures.push(group.work("key", || async { @@ -73,7 +73,7 @@ async fn parallel_call_seq_await() { #[tokio::test] async fn call_with_static_str_key() { - let group = UniFlight::new(); + let group = Merger::new(); let result = group .work("key".to_string(), || async { tokio::time::sleep(Duration::from_millis(1)).await; @@ -85,7 +85,7 @@ async fn call_with_static_str_key() { #[tokio::test] async fn call_with_static_string_key() { - let group = UniFlight::new(); + let group = Merger::new(); let result = group .work("key".to_string(), || async { tokio::time::sleep(Duration::from_millis(1)).await; @@ -99,7 +99,7 @@ async fn call_with_static_string_key() { async fn call_with_custom_key() { #[derive(Clone, PartialEq, Eq, Hash)] struct K(i32); - let group = UniFlight::new(); + let group = Merger::new(); let result = group .work(K(1), || async { tokio::time::sleep(Duration::from_millis(1)).await; @@ -111,7 +111,7 @@ async fn call_with_custom_key() { #[tokio::test] async fn late_wait() { - let group = UniFlight::new(); + let group = Merger::new(); let fut_early = group.work("key".to_string(), || async { tokio::time::sleep(Duration::from_millis(20)).await; "Result".to_string() @@ -124,7 +124,7 @@ async fn late_wait() { #[tokio::test] async fn cancel() { - let group = UniFlight::new(); + let group = Merger::new(); // the executer cancelled and the other awaiter will create a new future and execute. let fut_cancel = group.work("key".to_string(), unreachable_future); @@ -148,7 +148,7 @@ async fn cancel() { #[tokio::test] async fn leader_panic_in_spawned_task() { let call_counter = AtomicUsize::default(); - let group: Arc> = Arc::new(UniFlight::new()); + let group: Arc> = Arc::new(Merger::new()); // First task will panic in a spawned task (no catch_unwind) let group_clone = Arc::clone(&group); @@ -186,11 +186,11 @@ async fn leader_panic_in_spawned_task() { #[tokio::test] async fn debug_impl() { - let group: UniFlight = UniFlight::new(); + let group: Merger = Merger::new(); // Test Debug on empty group let debug_str = format!("{:?}", group); - assert!(debug_str.contains("UniFlight")); + assert!(debug_str.contains("Merger")); // Create a pending work item to populate the mapping with a BroadcastOnce let fut = group.work("key".to_string(), || async { @@ -200,97 +200,10 @@ async fn debug_impl() { // Debug should still work with entries in the mapping let debug_str = format!("{:?}", group); - assert!(debug_str.contains("UniFlight")); - assert!(debug_str.contains("BroadcastOnce")); + assert!(debug_str.contains("Merger")); + // The mapping is a DashMap + assert!(debug_str.contains("DashMap")); // Complete the work assert_eq!(fut.await, "Result"); } - -// N-leader tests - -#[tokio::test] -async fn with_max_leaders_basic() { - let group: UniFlight<&str, String> = UniFlight::with_max_leaders(3); - let result = group - .work("key", || async { - tokio::time::sleep(Duration::from_millis(10)).await; - "Result".to_string() - }) - .await; - assert_eq!(result, "Result"); -} - -#[tokio::test] -async fn multiple_leaders_all_get_same_result() { - let call_counter = AtomicUsize::default(); - - // Allow up to 3 concurrent leaders - let group = UniFlight::with_max_leaders(3); - let futures = FuturesUnordered::new(); - - // Start 5 concurrent calls - up to 3 become leaders, 2 become followers - for i in 0..5 { - let counter = &call_counter; - futures.push(group.work("key", move || async move { - tokio::time::sleep(Duration::from_millis(50)).await; - counter.fetch_add(1, AcqRel); - format!("Result-{i}") - })); - } - - // All should complete with the same result (first to finish wins) - let results: Vec<_> = futures.collect().await; - let first_result = &results[0]; - assert!(results.iter().all(|r| r == first_result)); -} - -#[tokio::test] -async fn followers_get_first_leader_result() { - let group = UniFlight::with_max_leaders(2); - - // Start first leader (slow) - let fut1 = group.work("key".to_string(), || async { - tokio::time::sleep(Duration::from_millis(100)).await; - "slow".to_string() - }); - - // Start second leader (fast) - let fut2 = group.work("key".to_string(), || async { - tokio::time::sleep(Duration::from_millis(10)).await; - "fast".to_string() - }); - - // Start followers (should get whichever leader finishes first) - let fut3 = group.work("key".to_string(), unreachable_future); - let fut4 = group.work("key".to_string(), unreachable_future); - - // Note: Due to current implementation, leaders serialize on slot lock, - // so execution order is deterministic. The first to acquire the lock wins. - let (r1, r2, r3, r4) = tokio::join!(fut1, fut2, fut3, fut4); - - // All should have the same result - assert_eq!(r1, r2); - assert_eq!(r2, r3); - assert_eq!(r3, r4); -} - -#[tokio::test] -async fn leader_cancel_with_multiple_leaders() { - let group: Arc> = Arc::new(UniFlight::with_max_leaders(2)); - - // First leader will be cancelled - let group_clone = Arc::clone(&group); - let fut_cancel = group_clone.work("key".to_string(), unreachable_future); - let _ = tokio::time::timeout(Duration::from_millis(10), fut_cancel).await; - - // Second leader should succeed - let result = group.work("key".to_string(), || async { "Success".to_string() }).await; - assert_eq!(result, "Success"); -} - -#[tokio::test] -#[should_panic(expected = "max_leaders must be at least 1")] -async fn with_max_leaders_zero_panics() { - let _group: UniFlight<&str, String> = UniFlight::with_max_leaders(0); -}