diff --git a/crates/core/src/db/durability.rs b/crates/core/src/db/durability.rs index a5ae5744ad0..f118759d3c6 100644 --- a/crates/core/src/db/durability.rs +++ b/crates/core/src/db/durability.rs @@ -35,20 +35,18 @@ type ShutdownReply = oneshot::Sender; /// This exists to avoid holding a transaction lock while /// preparing the [TxData] for processing by the [Durability] layer. pub struct DurabilityWorker { + database: Identity, request_tx: UnboundedSender, shutdown: Sender, durability: Arc, runtime: runtime::Handle, } -/// Those who run seem to have all the fun... 🎶 -const HUNG_UP: &str = "durability actor hung up / panicked"; - impl DurabilityWorker { /// Create a new [`DurabilityWorker`] using the given `durability` policy. /// /// Background tasks will be spawned onto to provided tokio `runtime`. - pub fn new(durability: Arc, runtime: runtime::Handle) -> Self { + pub fn new(database: Identity, durability: Arc, runtime: runtime::Handle) -> Self { let (request_tx, request_rx) = unbounded_channel(); let (shutdown_tx, shutdown_rx) = channel(1); @@ -61,6 +59,7 @@ impl DurabilityWorker { tokio::spawn(actor.run()); Self { + database, request_tx, shutdown: shutdown_tx, durability, @@ -94,7 +93,7 @@ impl DurabilityWorker { reducer_context, tx_data: tx_data.clone(), }) - .expect(HUNG_UP); + .unwrap_or_else(|_| panic!("durability actor vanished database={}", self.database)); } /// Get the [`DurableOffset`] of this database. @@ -312,7 +311,7 @@ mod tests { #[tokio::test] async fn shutdown_waits_until_durable() { let durability = Arc::new(CountingDurability::default()); - let worker = DurabilityWorker::new(durability.clone(), runtime::Handle::current()); + let worker = DurabilityWorker::new(Identity::ONE, durability.clone(), runtime::Handle::current()); for i in 0..=10 { let mut txdata = TxData::default(); diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 744ad322338..b3064db828b 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -156,7 +156,7 @@ impl RelationalDB { let (durability, disk_size_fn, snapshot_worker, rt) = Persistence::unzip(persistence); let durability = durability .zip(rt) - .map(|(durability, rt)| DurabilityWorker::new(durability, rt)); + .map(|(durability, rt)| DurabilityWorker::new(database_identity, durability, rt)); Self { inner,