diff --git a/.gitignore b/.gitignore index 20b91f6b..6368ac41 100644 --- a/.gitignore +++ b/.gitignore @@ -35,4 +35,5 @@ _manifest ARROW # Agent files -.claude \ No newline at end of file +.claude +CLAUDE.md \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 74da390b..3807276e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ Please see each crate's change log below: +- [`async_once`](./crates/async_once/CHANGELOG.md) - [`bytesbuf`](./crates/bytesbuf/CHANGELOG.md) - [`bytesbuf_io`](./crates/bytesbuf_io/CHANGELOG.md) - [`data_privacy`](./crates/data_privacy/CHANGELOG.md) @@ -16,3 +17,4 @@ Please see each crate's change log below: - [`thread_aware_macros`](./crates/thread_aware_macros/CHANGELOG.md) - [`thread_aware_macros_impl`](./crates/thread_aware_macros_impl/CHANGELOG.md) - [`tick`](./crates/tick/CHANGELOG.md) +- [`uniflight`](./crates/uniflight/CHANGELOG.md) diff --git a/Cargo.lock b/Cargo.lock index b4bb9653..22039bbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -44,6 +44,12 @@ version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" +[[package]] +name = "async-once-cell" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4288f83726785267c6f2ef073a3d83dc3f9b81464e9f99898240cced85fce35a" + [[package]] name = "autocfg" version = "1.5.0" @@ -256,6 +262,7 @@ dependencies = [ "serde", "serde_json", "tinytemplate", + "tokio", "walkdir", ] @@ -269,12 +276,32 @@ dependencies = [ "itertools 0.13.0", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "crunchy" version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data_privacy" version = "0.10.0" @@ -574,6 +601,12 @@ dependencies = [ "byteorder", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.16.1" @@ -621,7 +654,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.16.1", ] [[package]] @@ -1383,6 +1416,16 @@ version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbbb5d9659141646ae647b42fe094daf6c6192d1620870b449d9557f748b2daa" +[[package]] +name = "singleflight-async" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffae0d841b8012a86beec66a3f9c57b7b331a10366c764cd40bd6faebe3ad77c" +dependencies = [ + "parking_lot", + "tokio", +] + [[package]] name = "siphasher" version = "1.0.1" @@ -1639,6 +1682,7 @@ version = "1.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" dependencies = [ + "parking_lot", "pin-project-lite", "tokio-macros", ] @@ -1773,6 +1817,19 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" +[[package]] +name = "uniflight" +version = "0.1.0" +dependencies = [ + "async-once-cell", + "criterion", + "dashmap", + "futures-util", + "singleflight-async", + "tick", + "tokio", +] + [[package]] name = "walkdir" version = "2.5.0" diff --git a/Cargo.toml b/Cargo.toml index b40f8f95..f36529b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,16 +39,20 @@ thread_aware = { path = "crates/thread_aware", default-features = false, version thread_aware_macros = { path = "crates/thread_aware_macros", default-features = false, version = "0.6.0" } thread_aware_macros_impl = { path = "crates/thread_aware_macros_impl", default-features = false, version = "0.6.0" } tick = { path = "crates/tick", default-features = false, version = "0.1.2" } +uniflight = { path = "crates/uniflight", default-features = false, version = "0.1.0" } # external dependencies alloc_tracker = { version = "0.5.9", default-features = false } +async-once-cell = { version = "0.5", default-features = false } anyhow = { version = "1.0.100", default-features = false } bytes = { version = "1.11.0", default-features = false } chrono = { version = "0.4.40", default-features = false } chrono-tz = { version = "0.10.4", default-features = false } criterion = { version = "0.7.0", default-features = false } +dashmap = { version = "6.1", default-features = false } derive_more = { version = "2.0.1", default-features = false } duct = { version = "1.1.1", default-features = false } +event-listener = { version = "5.4.0", default-features = false } futures = { version = "0.3.31", default-features = false } futures-core = { version = "0.3.31", default-features = false } futures-util = { version = "0.3.31", default-features = false } @@ -63,6 +67,7 @@ new_zealand = { version = "1.0.1", default-features = false } nm = { version = "0.1.21", default-features = false } num-traits = { version = "0.2.19", default-features = false } once_cell = { version = "1.21.3", default-features = false } +parking_lot = { version = "0.12.5", default-features = false } pin-project-lite = { version = "0.2.13", default-features = false } pretty_assertions = { version = "1.4.1", default-features = false } prettyplease = { version = "0.2.37", default-features = false } @@ -75,6 +80,7 @@ rustc-hash = { version = "2.1.0", default-features = false } serde = { version = "1.0.228", default-features = false } serde_core = { version = "1.0.228", default-features = false } serde_json = { version = "1.0.145", default-features = false } +singleflight-async = { version = "0.2", default-features = false } smallvec = { version = "1.15.1", default-features = false } static_assertions = { version = "1.1.0", default-features = false } syn = { version = "2.0.111", default-features = false } @@ -88,6 +94,7 @@ trait-variant = { version = "0.1.2", default-features = false } trybuild = { version = "1.0.114", default-features = false } typeid = { version = "1.0.3", default-features = false } windows-sys = { version = "0.61.2", default-features = false } +xutex = { version = "0.2.0", default-features = false } xxhash-rust = { version = "0.8.15", default-features = false } [workspace.lints.rust] diff --git a/README.md b/README.md index 313a69ed..2a5c20aa 100644 --- a/README.md +++ b/README.md @@ -11,14 +11,15 @@ This repository contains a set of crates that help you build robust highly scalable services in Rust. -- [Crates](#crates) -- [About this Repo](#about-this-repo) +- [The Oxidizer Project](#the-oxidizer-project) + - [Crates](#crates) + - [About this Repo](#about-this-repo) - [Adding New Crates](#adding-new-crates) - [Publishing Crates](#publishing-crates) - [Documenting Crates](#documenting-crates) - [CI Workflows](#ci-workflows) - [Pull Request Gates](#pull-request-gates) -- [Trademarks](#trademarks) + - [Trademarks](#trademarks) ## Crates @@ -38,6 +39,7 @@ These are the crates built out of this repo: - [`thread_aware_macros`](./crates/thread_aware_macros/README.md) - Macros for the `thread_aware` crate. - [`thread_aware_macros_impl`](./crates/thread_aware_macros_impl/README.md) - Macros for the `thread_aware` crate. - [`tick`](./crates/tick/README.md) - Provides primitives to interact with and manipulate machine time. +- [`uniflight`](./crates/uniflight/README.md) - Coalesces duplicate async tasks into a single execution. ## About this Repo diff --git a/crates/uniflight/CHANGELOG.md b/crates/uniflight/CHANGELOG.md new file mode 100644 index 00000000..0906fd27 --- /dev/null +++ b/crates/uniflight/CHANGELOG.md @@ -0,0 +1,8 @@ +# Changelog + +## [0.1.0] - 2025-12-10 + +- 🧩 Miscellaneous + + - Initial commit of uniflight + diff --git a/crates/uniflight/Cargo.toml b/crates/uniflight/Cargo.toml new file mode 100644 index 00000000..54e44f0c --- /dev/null +++ b/crates/uniflight/Cargo.toml @@ -0,0 +1,43 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +[package] +name = "uniflight" +description = "Coalesces multiple ongoing tasks into a leader which does the work, and follower tasks that wait on the result, to prevent duplicate I/O or other downstream overhead." +version = "0.1.0" +readme = "README.md" +keywords = ["oxidizer", "coalescing", "stempede", "singleflight", "deduplication"] +categories = ["concurrency"] + +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[dependencies] +async-once-cell.workspace = true +dashmap.workspace = true + +[dev-dependencies] +criterion = { workspace = true, features = ["async_tokio"] } +futures-util = { workspace = true, features = ["alloc", "std"] } +singleflight-async.workspace = true +tick = { workspace = true, features = ["tokio"] } +tokio = { workspace = true, features = [ + "macros", + "rt", + "time", + "rt-multi-thread", +] } + +[lints] +workspace = true + +[[bench]] +name = "comparison" +harness = false + +[[example]] +name = "cache_population" diff --git a/crates/uniflight/README.md b/crates/uniflight/README.md new file mode 100644 index 00000000..09d79345 --- /dev/null +++ b/crates/uniflight/README.md @@ -0,0 +1,83 @@ +
+ Uniflight Logo + +# Uniflight + +[![crate.io](https://img.shields.io/crates/v/uniflight.svg)](https://crates.io/crates/uniflight) +[![docs.rs](https://docs.rs/uniflight/badge.svg)](https://docs.rs/uniflight) +[![MSRV](https://img.shields.io/crates/msrv/uniflight)](https://crates.io/crates/uniflight) +[![CI](https://github.com/microsoft/oxidizer/workflows/main/badge.svg)](https://github.com/microsoft/oxidizer/actions) +[![Coverage](https://codecov.io/gh/microsoft/oxidizer/graph/badge.svg?token=FCUG0EL5TI)](https://codecov.io/gh/microsoft/oxidizer) +[![License](https://img.shields.io/badge/license-MIT-blue.svg)](../LICENSE) + +
+ +* [Summary](#summary) + +## Summary + + + +Coalesces duplicate async tasks into a single execution. + +This crate provides [`Merger`], a mechanism for deduplicating concurrent async operations. +When multiple tasks request the same work (identified by a key), only the first task (the +"leader") performs the actual work while subsequent tasks (the "followers") wait and receive +a clone of the result. + +## When to Use + +Use `Merger` when you have expensive or rate-limited operations that may be requested +concurrently with the same parameters: + +- **Cache population**: Prevent thundering herd when a cache entry expires +- **API calls**: Deduplicate concurrent requests to the same endpoint +- **Database queries**: Coalesce identical queries issued simultaneously +- **File I/O**: Avoid reading the same file multiple times concurrently + +## Example + +```rust +use uniflight::Merger; + +let group: Merger<&str, String> = Merger::new(); + +// Multiple concurrent calls with the same key will share a single execution +let result = group.work("user:123", || async { + // This expensive operation runs only once, even if called concurrently + "expensive_result".to_string() +}).await; +``` + +## Cancellation and Panic Safety + +`Merger` handles task cancellation and panics gracefully: + +- If the leader task is cancelled or dropped, a follower becomes the new leader +- If the leader task panics, a follower becomes the new leader and executes its work +- Followers that join before the leader completes receive the cached result + +## Thread Safety + +[`Merger`] is `Send` and `Sync`, and can be shared across threads. The returned futures +are `Send` when the closure, future, key, and value types are `Send`. + +## Performance + +Benchmarks comparing `uniflight` against `singleflight-async` show the following characteristics: + +- **Concurrent workloads** (10+ tasks): uniflight is 1.2-1.3x faster, demonstrating better scalability under contention +- **Single calls**: singleflight-async has lower per-call overhead (~2x faster for individual operations) +- **Multiple keys**: uniflight performs 1.3x faster when handling multiple distinct keys concurrently + +uniflight's DashMap-based architecture provides excellent scaling properties for high-concurrency scenarios, +making it well-suited for production workloads with concurrent access patterns. For low-contention scenarios +with predominantly single calls, the performance difference is minimal (sub-microsecond range). + + + +

+ +This crate was developed as part of [The Oxidizer Project](https://github.com/microsoft/oxidizer). + +
diff --git a/crates/uniflight/benches/comparison.rs b/crates/uniflight/benches/comparison.rs new file mode 100644 index 00000000..fd53077d --- /dev/null +++ b/crates/uniflight/benches/comparison.rs @@ -0,0 +1,269 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Benchmarks comparing uniflight against singleflight-async. + +use std::sync::{ + Arc, + atomic::{AtomicU64, Ordering}, +}; + +use criterion::{Criterion, criterion_group, criterion_main}; + +// Benchmark 1: Single call (no contention) +fn bench_single_call(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + + let mut group = c.benchmark_group("single_call"); + + // Use atomic counter for unique keys + static COUNTER1: AtomicU64 = AtomicU64::new(0); + + // Our implementation - pre-create the merger + let our_merger = Arc::new(uniflight::Merger::::new()); + group.bench_function("uniflight", |b| { + b.to_async(&rt).iter(|| { + let merger = Arc::clone(&our_merger); + async move { + let key = format!("key_{}", COUNTER1.fetch_add(1, Ordering::Relaxed)); + merger.work(key, || async { "value".to_string() }).await + } + }); + }); + + // singleflight-async - pre-create the group + let their_group = Arc::new(singleflight_async::SingleFlight::::new()); + group.bench_function("singleflight_async", |b| { + b.to_async(&rt).iter(|| { + let group = Arc::clone(&their_group); + async move { + let key = format!("key_{}", COUNTER1.fetch_add(1, Ordering::Relaxed)); + group.work(key, || async { "value".to_string() }).await + } + }); + }); + + group.finish(); +} + +// Benchmark 2: Concurrent calls (10 tasks, same key) +fn bench_concurrent_10(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + + let mut group = c.benchmark_group("concurrent_10_tasks"); + + // Use atomic counter for unique keys per iteration + static COUNTER2: AtomicU64 = AtomicU64::new(0); + + // Our implementation - pre-create the merger + let our_merger = Arc::new(uniflight::Merger::::new()); + group.bench_function("uniflight", |b| { + b.to_async(&rt).iter(|| { + let merger = Arc::clone(&our_merger); + async move { + let key = format!("key_{}", COUNTER2.fetch_add(1, Ordering::Relaxed)); + let handles: Vec<_> = (0..10) + .map(|_| { + let merger = Arc::clone(&merger); + let key = key.clone(); + tokio::spawn(async move { merger.work(key, || async { "value".to_string() }).await }) + }) + .collect(); + + for handle in handles { + handle.await.unwrap(); + } + } + }); + }); + + // singleflight-async - pre-create the group + let their_group = Arc::new(singleflight_async::SingleFlight::::new()); + group.bench_function("singleflight_async", |b| { + b.to_async(&rt).iter(|| { + let group = Arc::clone(&their_group); + async move { + let key = format!("key_{}", COUNTER2.fetch_add(1, Ordering::Relaxed)); + let handles: Vec<_> = (0..10) + .map(|_| { + let group = Arc::clone(&group); + let key = key.clone(); + tokio::spawn(async move { group.work(key, || async { "value".to_string() }).await }) + }) + .collect(); + + for handle in handles { + handle.await.unwrap(); + } + } + }); + }); + + group.finish(); +} + +// Benchmark 3: High contention (100 tasks, same key) +fn bench_concurrent_100(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + + let mut group = c.benchmark_group("concurrent_100_tasks"); + + // Use atomic counter for unique keys per iteration + static COUNTER3: AtomicU64 = AtomicU64::new(0); + + // Our implementation - pre-create the merger + let our_merger = Arc::new(uniflight::Merger::::new()); + group.bench_function("uniflight", |b| { + b.to_async(&rt).iter(|| { + let merger = Arc::clone(&our_merger); + async move { + let key = format!("key_{}", COUNTER3.fetch_add(1, Ordering::Relaxed)); + let handles: Vec<_> = (0..100) + .map(|_| { + let merger = Arc::clone(&merger); + let key = key.clone(); + tokio::spawn(async move { merger.work(key, || async { "value".to_string() }).await }) + }) + .collect(); + + for handle in handles { + handle.await.unwrap(); + } + } + }); + }); + + // singleflight-async - pre-create the group + let their_group = Arc::new(singleflight_async::SingleFlight::::new()); + group.bench_function("singleflight_async", |b| { + b.to_async(&rt).iter(|| { + let group = Arc::clone(&their_group); + async move { + let key = format!("key_{}", COUNTER3.fetch_add(1, Ordering::Relaxed)); + let handles: Vec<_> = (0..100) + .map(|_| { + let group = Arc::clone(&group); + let key = key.clone(); + tokio::spawn(async move { group.work(key, || async { "value".to_string() }).await }) + }) + .collect(); + + for handle in handles { + handle.await.unwrap(); + } + } + }); + }); + + group.finish(); +} + +// Benchmark 4: Multiple different keys (10 keys, 10 tasks each) +fn bench_multiple_keys(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + + let mut group = c.benchmark_group("multiple_keys_10x10"); + + // Use atomic counter for unique key prefix per iteration + static COUNTER4: AtomicU64 = AtomicU64::new(0); + + // Our implementation - pre-create the merger + let our_merger = Arc::new(uniflight::Merger::::new()); + group.bench_function("uniflight", |b| { + b.to_async(&rt).iter(|| { + let merger = Arc::clone(&our_merger); + async move { + let iteration = COUNTER4.fetch_add(1, Ordering::Relaxed); + let handles: Vec<_> = (0..10) + .flat_map(|key_id| { + let merger = Arc::clone(&merger); + (0..10).map(move |_| { + let merger = Arc::clone(&merger); + let key = format!("key_{}_{key_id}", iteration); + tokio::spawn(async move { merger.work(key, || async { "value".to_string() }).await }) + }) + }) + .collect(); + + for handle in handles { + handle.await.unwrap(); + } + } + }); + }); + + // singleflight-async - pre-create the group + let their_group = Arc::new(singleflight_async::SingleFlight::::new()); + group.bench_function("singleflight_async", |b| { + b.to_async(&rt).iter(|| { + let group = Arc::clone(&their_group); + async move { + let iteration = COUNTER4.fetch_add(1, Ordering::Relaxed); + let handles: Vec<_> = (0..10) + .flat_map(|key_id| { + let group = Arc::clone(&group); + (0..10).map(move |_| { + let group = Arc::clone(&group); + let key = format!("key_{}_{key_id}", iteration); + tokio::spawn(async move { group.work(key, || async { "value".to_string() }).await }) + }) + }) + .collect(); + + for handle in handles { + handle.await.unwrap(); + } + } + }); + }); + + group.finish(); +} + +// Benchmark 5: Reuse existing group (pre-created, multiple operations) +fn bench_reuse_group(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + + let mut group = c.benchmark_group("reuse_group"); + + // Use atomic counter for unique keys + static COUNTER5: AtomicU64 = AtomicU64::new(0); + + // Our implementation - pre-create the merger + let our_merger = Arc::new(uniflight::Merger::::new()); + group.bench_function("uniflight", |b| { + b.to_async(&rt).iter(|| { + let merger = Arc::clone(&our_merger); + async move { + // Each iteration uses a unique key to avoid caching effects + let key = format!("key_{}", COUNTER5.fetch_add(1, Ordering::Relaxed)); + merger.work(key, || async { "value".to_string() }).await + } + }); + }); + + // singleflight-async - pre-create the group + let their_group = Arc::new(singleflight_async::SingleFlight::::new()); + group.bench_function("singleflight_async", |b| { + b.to_async(&rt).iter(|| { + let group = Arc::clone(&their_group); + async move { + let key = format!("key_{}", COUNTER5.fetch_add(1, Ordering::Relaxed)); + group.work(key, || async { "value".to_string() }).await + } + }); + }); + + group.finish(); +} + +criterion_group!( + benches, + bench_single_call, + bench_concurrent_10, + bench_concurrent_100, + bench_multiple_keys, + bench_reuse_group, +); + +criterion_main!(benches); diff --git a/crates/uniflight/examples/cache_population.rs b/crates/uniflight/examples/cache_population.rs new file mode 100644 index 00000000..7050b9ef --- /dev/null +++ b/crates/uniflight/examples/cache_population.rs @@ -0,0 +1,69 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Demonstrates using `UniFlight` to prevent thundering herd when populating a cache. +//! +//! Multiple concurrent requests for the same cache key will share a single execution, +//! with the first request (leader) performing the work and subsequent requests (followers) +//! receiving a copy of the result. + +use std::{ + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + time::Duration, +}; + +use tick::Clock; +use uniflight::Merger; + +#[tokio::main] +async fn main() { + // Create a shared UniFlight instance for cache operations + let cache_group = Arc::new(Merger::::new()); + + // Track how many times the work closure actually executes + let execution_count = Arc::new(AtomicUsize::new(0)); + + println!("Starting 5 concurrent requests for user:123...\n"); + + // Simulate 5 concurrent requests for the same user data + let mut handles = Vec::new(); + for i in 1..=5 { + let group = Arc::clone(&cache_group); + let counter = Arc::clone(&execution_count); + let handle = tokio::spawn(async move { + let clock = Clock::new_tokio(); + let start = clock.instant(); + + let result = group + .work("user:123".to_string(), || async { + let count = counter.fetch_add(1, Ordering::SeqCst) + 1; + println!(" [Request {i}] I'm the leader! Fetching from database... (execution #{count})"); + + // Simulate expensive database query + clock.delay(Duration::from_millis(500)).await; + + "UserData(name: Alice, age: 30)".to_string() + }) + .await; + + let elapsed = start.elapsed(); + println!(" [Request {i}] Got result in {elapsed:?}: {result}"); + }); + + handles.push(handle); + + // Stagger the requests slightly to see the deduplication in action + tokio::time::sleep(Duration::from_millis(10)).await; + } + + // Wait for all requests to complete + for handle in handles { + handle.await.expect("Task panicked"); + } + + let total_executions = execution_count.load(Ordering::SeqCst); + println!("\nAll requests completed! Database query executed {total_executions} time(s) for 5 requests."); +} diff --git a/crates/uniflight/favicon.ico b/crates/uniflight/favicon.ico new file mode 100644 index 00000000..2ed275b8 --- /dev/null +++ b/crates/uniflight/favicon.ico @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:bd1ebeca79834229253d38008b7d0491947042e808664646f907472acbb9ba01 +size 15406 diff --git a/crates/uniflight/logo.png b/crates/uniflight/logo.png new file mode 100644 index 00000000..90a48880 --- /dev/null +++ b/crates/uniflight/logo.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:ec6fa0a1a07f8c21fb9946bc8ea78f127f027554fd8ca67f0b296d8c3d234b68 +size 46554 diff --git a/crates/uniflight/src/lib.rs b/crates/uniflight/src/lib.rs new file mode 100644 index 00000000..b590a26f --- /dev/null +++ b/crates/uniflight/src/lib.rs @@ -0,0 +1,167 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +// Based on singleflight-async by ihciah +// Original: https://github.com/ihciah/singleflight-async +// Licensed under MIT/Apache-2.0 + +//! Coalesces duplicate async tasks into a single execution. +//! +//! This crate provides [`Merger`], a mechanism for deduplicating concurrent async operations. +//! When multiple tasks request the same work (identified by a key), only the first task (the +//! "leader") performs the actual work while subsequent tasks (the "followers") wait and receive +//! a clone of the result. +//! +//! # When to Use +//! +//! Use `Merger` when you have expensive or rate-limited operations that may be requested +//! concurrently with the same parameters: +//! +//! - **Cache population**: Prevent thundering herd when a cache entry expires +//! - **API calls**: Deduplicate concurrent requests to the same endpoint +//! - **Database queries**: Coalesce identical queries issued simultaneously +//! - **File I/O**: Avoid reading the same file multiple times concurrently +//! +//! # Example +//! +//! ``` +//! use uniflight::Merger; +//! +//! # async fn example() { +//! let group: Merger<&str, String> = Merger::new(); +//! +//! // Multiple concurrent calls with the same key will share a single execution +//! let result = group.work("user:123", || async { +//! // This expensive operation runs only once, even if called concurrently +//! "expensive_result".to_string() +//! }).await; +//! # } +//! ``` +//! +//! # Cancellation and Panic Safety +//! +//! `Merger` handles task cancellation and panics gracefully: +//! +//! - If the leader task is cancelled or dropped, a follower becomes the new leader +//! - If the leader task panics, a follower becomes the new leader and executes its work +//! - Followers that join before the leader completes receive the cached result +//! +//! # Thread Safety +//! +//! [`Merger`] is `Send` and `Sync`, and can be shared across threads. The returned futures +//! are `Send` when the closure, future, key, and value types are `Send`. +//! +//! # Performance +//! +//! Benchmarks comparing `uniflight` against `singleflight-async` show the following characteristics: +//! +//! - **Concurrent workloads** (10+ tasks): uniflight is 1.2-1.3x faster, demonstrating better scalability under contention +//! - **Single calls**: singleflight-async has lower per-call overhead (~2x faster for individual operations) +//! - **Multiple keys**: uniflight performs 1.3x faster when handling multiple distinct keys concurrently +//! +//! uniflight's DashMap-based architecture provides excellent scaling properties for high-concurrency scenarios, +//! making it well-suited for production workloads with concurrent access patterns. For low-contention scenarios +//! with predominantly single calls, the performance difference is minimal (sub-microsecond range). + +#![doc(html_logo_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/uniflight/logo.png")] +#![doc(html_favicon_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/uniflight/favicon.ico")] + +use std::{ + fmt::Debug, + hash::Hash, + sync::{Arc, Weak}, +}; + +use async_once_cell::OnceCell; +use dashmap::{DashMap, Entry::{Occupied, Vacant}}; + +/// Represents a class of work and creates a space in which units of work +/// can be executed with duplicate suppression. +pub struct Merger { + mapping: DashMap>>, +} + +impl Debug for Merger { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Merger") + .field("mapping", &format_args!("DashMap<...>")) + .finish() + } +} + +impl Default for Merger +where + K: Hash + Eq, +{ + fn default() -> Self { + Self { + mapping: DashMap::new(), + } + } +} + +impl Merger +where + K: Hash + Eq + Clone + Send + Sync, + T: Send + Sync, +{ + /// Creates a new `Merger` instance. + #[inline] + #[must_use] + pub fn new() -> Self { + Self::default() + } + + /// Execute and return the value for a given function, making sure that only one + /// operation is in-flight at a given moment. If a duplicate call comes in, + /// that caller will wait until the leader completes and return the same value. + pub fn work(&self, key: K, func: F) -> impl Future + Send + where + F: FnOnce() -> Fut + Send, + Fut: Future + Send, + T: Clone, + { + let cell = self.get_or_create_cell(&key); + let mapping = &self.mapping; + async move { + let result = cell.get_or_init(func()).await.clone(); + // Clean up expired weak reference if present + // Use remove_if to atomically check and remove + mapping.remove_if(&key, |_, weak| weak.upgrade().is_none()); + result + } + } + + /// Gets an existing `OnceCell` for the key, or creates a new one. + fn get_or_create_cell(&self, key: &K) -> Arc> { + // Fast path: check if entry exists and is still valid + if let Some(entry) = self.mapping.get(key) + && let Some(cell) = entry.value().upgrade() + { + return cell; + } + + // Slow path: need to insert or replace expired entry + let cell = Arc::new(OnceCell::new()); + let weak = Arc::downgrade(&cell); + + // Use Entry enum to atomically check-and-return or insert + match self.mapping.entry(key.clone()) { + Occupied(mut entry) => { + // Entry exists - check if still alive + if let Some(existing) = entry.get().upgrade() { + // Another thread's cell is still alive - use it + return existing; + } + // Expired - replace with ours + entry.insert(weak); + } + Vacant(entry) => { + entry.insert(weak); + } + } + + // We inserted our cell, return it + cell + } +} diff --git a/crates/uniflight/tests/work.rs b/crates/uniflight/tests/work.rs new file mode 100644 index 00000000..6033011c --- /dev/null +++ b/crates/uniflight/tests/work.rs @@ -0,0 +1,209 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Integration tests for `UniFlight::work()`. + +use std::{ + sync::{ + Arc, + atomic::{ + AtomicUsize, + Ordering::{AcqRel, Acquire}, + }, + }, + time::Duration, +}; + +use futures_util::{StreamExt, stream::FuturesUnordered}; +use uniflight::Merger; + +fn unreachable_future() -> std::future::Pending { + std::future::pending() +} + +#[tokio::test] +async fn direct_call() { + let group = Merger::new(); + let result = group + .work("key", || async { + tokio::time::sleep(Duration::from_millis(10)).await; + "Result".to_string() + }) + .await; + assert_eq!(result, "Result"); +} + +#[tokio::test] +async fn parallel_call() { + let call_counter = AtomicUsize::default(); + + let group = Merger::new(); + let futures = FuturesUnordered::new(); + for _ in 0..10 { + futures.push(group.work("key", || async { + tokio::time::sleep(Duration::from_millis(100)).await; + call_counter.fetch_add(1, AcqRel); + "Result".to_string() + })); + } + + assert!(futures.all(|out| async move { out == "Result" }).await); + assert_eq!(call_counter.load(Acquire), 1); +} + +#[tokio::test] +async fn parallel_call_seq_await() { + let call_counter = AtomicUsize::default(); + + let group = Merger::new(); + let mut futures = Vec::new(); + for _ in 0..10 { + futures.push(group.work("key", || async { + tokio::time::sleep(Duration::from_millis(100)).await; + call_counter.fetch_add(1, AcqRel); + "Result".to_string() + })); + } + + for fut in futures { + assert_eq!(fut.await, "Result"); + } + assert_eq!(call_counter.load(Acquire), 1); +} + +#[tokio::test] +async fn call_with_static_str_key() { + let group = Merger::new(); + let result = group + .work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(1)).await; + "Result".to_string() + }) + .await; + assert_eq!(result, "Result"); +} + +#[tokio::test] +async fn call_with_static_string_key() { + let group = Merger::new(); + let result = group + .work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(1)).await; + "Result".to_string() + }) + .await; + assert_eq!(result, "Result"); +} + +#[tokio::test] +async fn call_with_custom_key() { + #[derive(Clone, PartialEq, Eq, Hash)] + struct K(i32); + let group = Merger::new(); + let result = group + .work(K(1), || async { + tokio::time::sleep(Duration::from_millis(1)).await; + "Result".to_string() + }) + .await; + assert_eq!(result, "Result"); +} + +#[tokio::test] +async fn late_wait() { + let group = Merger::new(); + let fut_early = group.work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(20)).await; + "Result".to_string() + }); + let fut_late = group.work("key".into(), unreachable_future); + assert_eq!(fut_early.await, "Result"); + tokio::time::sleep(Duration::from_millis(50)).await; + assert_eq!(fut_late.await, "Result"); +} + +#[tokio::test] +async fn cancel() { + let group = Merger::new(); + + // the executer cancelled and the other awaiter will create a new future and execute. + let fut_cancel = group.work("key".to_string(), unreachable_future); + let _ = tokio::time::timeout(Duration::from_millis(10), fut_cancel).await; + let fut_late = group.work("key".to_string(), || async { "Result2".to_string() }); + assert_eq!(fut_late.await, "Result2"); + + // the first executer is slow but not dropped, so the result will be the first ones. + let begin = tokio::time::Instant::now(); + let fut_1 = group.work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(2000)).await; + "Result1".to_string() + }); + let fut_2 = group.work("key".to_string(), unreachable_future); + let (v1, v2) = tokio::join!(fut_1, fut_2); + assert_eq!(v1, "Result1"); + assert_eq!(v2, "Result1"); + assert!(begin.elapsed() > Duration::from_millis(1500)); +} + +#[tokio::test] +async fn leader_panic_in_spawned_task() { + let call_counter = AtomicUsize::default(); + let group: Arc> = Arc::new(Merger::new()); + + // First task will panic in a spawned task (no catch_unwind) + let group_clone = Arc::clone(&group); + let handle = tokio::spawn(async move { + group_clone + .work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(50)).await; + panic!("leader panicked in spawned task"); + #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] + "never".to_string() + }) + .await + }); + + // Give time for the spawned task to register and start + tokio::time::sleep(Duration::from_millis(10)).await; + + // Second task should become the new leader after the first panics + let group_clone = Arc::clone(&group); + let call_counter_ref = &call_counter; + let fut_follower = group_clone.work("key".to_string(), || async { + call_counter_ref.fetch_add(1, AcqRel); + "Result".to_string() + }); + + // Wait for the spawned task to panic + let spawn_result = handle.await; + assert!(spawn_result.is_err()); + + // The follower should succeed - Rust's drop semantics ensure the mutex is released + let result = fut_follower.await; + assert_eq!(result, "Result"); + assert_eq!(call_counter.load(Acquire), 1); +} + +#[tokio::test] +async fn debug_impl() { + let group: Merger = Merger::new(); + + // Test Debug on empty group + let debug_str = format!("{:?}", group); + assert!(debug_str.contains("Merger")); + + // Create a pending work item to populate the mapping with a BroadcastOnce + let fut = group.work("key".to_string(), || async { + tokio::time::sleep(Duration::from_millis(100)).await; + "Result".to_string() + }); + + // Debug should still work with entries in the mapping + let debug_str = format!("{:?}", group); + assert!(debug_str.contains("Merger")); + // The mapping is a DashMap + assert!(debug_str.contains("DashMap")); + + // Complete the work + assert_eq!(fut.await, "Result"); +}