Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 24 additions & 40 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -873,26 +873,6 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
let broadcast_c = Arc::new(TestBroadcaster { txn_broadcasted: RefCell::new(Vec::new()) });
let router = FuzzRouter {};

// Read initial monitor styles from fuzz input (1 byte: 2 bits per node)
let initial_mon_styles = if !data.is_empty() { data[0] } else { 0 };
let mon_style = [
RefCell::new(if initial_mon_styles & 0b01 != 0 {
ChannelMonitorUpdateStatus::InProgress
} else {
ChannelMonitorUpdateStatus::Completed
}),
RefCell::new(if initial_mon_styles & 0b10 != 0 {
ChannelMonitorUpdateStatus::InProgress
} else {
ChannelMonitorUpdateStatus::Completed
}),
RefCell::new(if initial_mon_styles & 0b100 != 0 {
ChannelMonitorUpdateStatus::InProgress
} else {
ChannelMonitorUpdateStatus::Completed
}),
];

let mut chain_state = ChainState::new();
let mut node_height_a: u32 = 0;
let mut node_height_b: u32 = 0;
Expand All @@ -917,7 +897,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
logger.clone(),
$fee_estimator.clone(),
Arc::new(TestPersister {
update_ret: Mutex::new(mon_style[$node_id as usize].borrow().clone()),
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
}),
Arc::clone(&keys_manager),
));
Expand Down Expand Up @@ -982,6 +962,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = false;
}

let prev_update_ret = *old_monitors.persister.update_ret.lock().unwrap();
let mut monitors = new_hash_map();
let mut old_monitors = old_monitors.latest_monitors.lock().unwrap();
for (channel_id, mut prev_state) in old_monitors.drain() {
Expand Down Expand Up @@ -1044,7 +1025,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
Ok(ChannelMonitorUpdateStatus::Completed)
);
}
*chain_monitor.persister.update_ret.lock().unwrap() = *mon_style[node_id as usize].borrow();
*chain_monitor.persister.update_ret.lock().unwrap() = prev_update_ret;
res
};

Expand Down Expand Up @@ -1376,7 +1357,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
}};
}

let mut read_pos = 1; // First byte was consumed for initial mon_style
let mut read_pos = 0;
macro_rules! get_slice {
($len: expr) => {{
let slice_len = $len as usize;
Expand Down Expand Up @@ -2102,23 +2083,26 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
// In general, we keep related message groups close together in binary form, allowing
// bit-twiddling mutations to have similar effects. This is probably overkill, but no
// harm in doing so.
0x00 => {
*mon_style[0].borrow_mut() = ChannelMonitorUpdateStatus::InProgress;
},
0x01 => {
*mon_style[1].borrow_mut() = ChannelMonitorUpdateStatus::InProgress;
},
0x02 => {
*mon_style[2].borrow_mut() = ChannelMonitorUpdateStatus::InProgress;
},
0x04 => {
*mon_style[0].borrow_mut() = ChannelMonitorUpdateStatus::Completed;
},
0x05 => {
*mon_style[1].borrow_mut() = ChannelMonitorUpdateStatus::Completed;
},
0x06 => {
*mon_style[2].borrow_mut() = ChannelMonitorUpdateStatus::Completed;
0x00 | 0x01 | 0x02 => {
let monitor = match v {
0x00 => &monitor_a,
0x01 => &monitor_b,
_ => &monitor_c,
};
let mut ret = monitor.persister.update_ret.lock().unwrap();
if *ret == ChannelMonitorUpdateStatus::Completed {
*ret = ChannelMonitorUpdateStatus::InProgress;
} else {
let has_pending = monitor
.latest_monitors
.lock()
.unwrap()
.values()
.any(|s| !s.pending_monitors.is_empty());
if !has_pending {
*ret = ChannelMonitorUpdateStatus::Completed;
}
}
},

0x08 => {
Expand Down
1 change: 1 addition & 0 deletions lightning/src/chain/channelmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6817,6 +6817,7 @@ mod tests {
let legacy_cfg = test_legacy_channel_config();
let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[Some(legacy_cfg.clone()), Some(legacy_cfg.clone()), Some(legacy_cfg)]);
let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
nodes[1].disable_monitor_completeness_assertion();
let channel = create_announced_chan_between_nodes(&nodes, 0, 1);
create_announced_chan_between_nodes(&nodes, 1, 2);

Expand Down
11 changes: 3 additions & 8 deletions lightning/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,11 @@ pub enum ChannelMonitorUpdateStatus {
/// This includes performing any `fsync()` calls required to ensure the update is guaranteed to
/// be available on restart even if the application crashes.
///
/// If you return this variant, you cannot later return [`InProgress`] from the same instance of
/// [`Persist`]/[`Watch`] without first restarting.
/// A [`Watch`] implementation must not return this for a channel update if there are still
/// pending [`InProgress`] updates for that channel. That is, an update can only be considered
/// complete once all prior updates have also completed.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, thinking about this a bit more there's a weird race. Async updates are "completed" only after release_pending_monitor_events returns the completed update, but also only after it has been processed, which may be some arbitrary and indeterminate amount of time later.

For the ChainMonitor return case, this is fine (though we need to update ChainMonitor - we should probably be returning InProgress for any new update that comes after one for which we have to return a spurious InProgress. Do you already intend to do that in a followup? should we not do it here?). But for the general API it's pretty weird - you can in theory return a Completed after returning InProgress as long as you wait some indeterminate and arbitrary amount of time, so in practice you can't...

We could fix this by adding some mutual exclusion in channelmanager.rs where we wait before processing Completed monitor updates until after any pending MonitorEvents are processed - this should be basically zero cost as ~no one is going to be using mixed-async-sync persistence in practice so we'll rarely if ever have any contention.

On the flip side, we could say that implementations are allowed to flip from Completed to InProgress for a channel, but never back (without a restart). It's more complicated to document, but it captures what we need to allow the ChainMonitor behavior.

Copy link
Contributor Author

@joostjager joostjager Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implementations are allowed to flip from Completed to InProgress for a channel, but never back

This sounds like the easier way. There is no need to support mixing sync and async for Persist implementations. I do wonder now whether my initial version #4435 wasn't just sufficient. Moving the mixed mode check to the chainmonitor level and keeping everything else the same.

Fuzzer run was planned for tonight.

Copy link
Contributor Author

@joostjager joostjager Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The per-channel mode latch turned out to be not ideal because it required either a new lock on ChannelManager or threading the latch through deeply nested call chains via an already-held lock, all to accommodate ChainMonitor internally using InProgress creatively to signal a monitor update failure rather than async persistence.

Perhaps it is better to make this case explicit in the API? #4445

You were definitely right about the fuzzer. Woke up to lots of errors.

///
/// [`InProgress`]: ChannelMonitorUpdateStatus::InProgress
/// [`Persist`]: chainmonitor::Persist
Completed,
/// Indicates that the update will happen asynchronously in the background or that a transient
/// failure occurred which is being retried in the background and will eventually complete.
Expand All @@ -263,12 +263,7 @@ pub enum ChannelMonitorUpdateStatus {
/// reliable, this feature is considered beta, and a handful of edge-cases remain. Until the
/// remaining cases are fixed, in rare cases, *using this feature may lead to funds loss*.
///
/// If you return this variant, you cannot later return [`Completed`] from the same instance of
/// [`Persist`]/[`Watch`] without first restarting.
///
/// [`InProgress`]: ChannelMonitorUpdateStatus::InProgress
/// [`Completed`]: ChannelMonitorUpdateStatus::Completed
/// [`Persist`]: chainmonitor::Persist
InProgress,
/// Indicates that an update has failed and will not complete at any point in the future.
///
Expand Down
6 changes: 6 additions & 0 deletions lightning/src/ln/chanmon_update_fail_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ fn do_test_simple_monitor_temporary_update_fail(disconnect: bool) {
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
nodes[0].disable_monitor_completeness_assertion();

let node_a_id = nodes[0].node.get_our_node_id();
let node_b_id = nodes[1].node.get_our_node_id();
Expand Down Expand Up @@ -316,6 +317,7 @@ fn do_test_monitor_temporary_update_fail(disconnect_count: usize) {
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
nodes[0].disable_monitor_completeness_assertion();

let node_a_id = nodes[0].node.get_our_node_id();
let node_b_id = nodes[1].node.get_our_node_id();
Expand Down Expand Up @@ -969,6 +971,7 @@ fn do_test_monitor_update_fail_raa(test_ignore_second_cs: bool) {
let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
nodes[1].disable_monitor_completeness_assertion();

let node_a_id = nodes[0].node.get_our_node_id();
let node_b_id = nodes[1].node.get_our_node_id();
Expand Down Expand Up @@ -1500,6 +1503,7 @@ fn claim_while_disconnected_monitor_update_fail() {
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
nodes[1].disable_monitor_completeness_assertion();

let node_a_id = nodes[0].node.get_our_node_id();
let node_b_id = nodes[1].node.get_our_node_id();
Expand Down Expand Up @@ -1727,6 +1731,7 @@ fn first_message_on_recv_ordering() {
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
nodes[1].disable_monitor_completeness_assertion();

let node_a_id = nodes[0].node.get_our_node_id();
let node_b_id = nodes[1].node.get_our_node_id();
Expand Down Expand Up @@ -3849,6 +3854,7 @@ fn do_test_durable_preimages_on_closed_channel(
// Now reload node B
let manager_b = nodes[1].node.encode();
reload_node!(nodes[1], &manager_b, &[&mon_ab, &mon_bc], persister, chain_mon, node_b_reload);
nodes[1].disable_monitor_completeness_assertion();

nodes[0].node.peer_disconnected(node_b_id);
nodes[2].node.peer_disconnected(node_b_id);
Expand Down
41 changes: 20 additions & 21 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2792,12 +2792,12 @@ pub struct ChannelManager<
#[cfg(any(test, feature = "_test_utils"))]
pub(super) per_peer_state: FairRwLock<HashMap<PublicKey, Mutex<PeerState<SP>>>>,

/// We only support using one of [`ChannelMonitorUpdateStatus::InProgress`] and
/// [`ChannelMonitorUpdateStatus::Completed`] without restarting. Because the API does not
/// otherwise directly enforce this, we enforce it in non-test builds here by storing which one
/// is in use.
#[cfg(not(any(test, feature = "_externalize_tests")))]
monitor_update_type: AtomicUsize,
/// When set, disables the debug assertion that `Watch::update_channel` must not return
/// `Completed` while prior updates are still `InProgress`. Some legacy tests switch the
/// persister between `InProgress` and `Completed` mid-flight, which violates this contract
/// but is otherwise harmless in a test context.
#[cfg(test)]
pub(crate) skip_monitor_update_assertion: AtomicBool,

/// The set of events which we need to give to the user to handle. In some cases an event may
/// require some further action after the user handles it (currently only blocking a monitor
Expand Down Expand Up @@ -3540,8 +3540,8 @@ impl<

per_peer_state: FairRwLock::new(new_hash_map()),

#[cfg(not(any(test, feature = "_externalize_tests")))]
monitor_update_type: AtomicUsize::new(0),
#[cfg(test)]
skip_monitor_update_assertion: AtomicBool::new(false),

pending_events: Mutex::new(VecDeque::new()),
pending_events_processor: AtomicBool::new(false),
Expand Down Expand Up @@ -9965,6 +9965,15 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
if update_completed {
let _ = in_flight_updates.remove(update_idx);
}
// A Watch implementation must not return Completed while prior updates are
// still InProgress, as this would violate the async persistence contract.
#[cfg(test)]
let skip_check = self.skip_monitor_update_assertion.load(Ordering::Relaxed);
#[cfg(not(test))]
let skip_check = false;
if !skip_check && update_completed && !in_flight_updates.is_empty() {
panic!("Watch::update_channel returned Completed while prior updates are still InProgress");
}
(update_completed, update_completed && in_flight_updates.is_empty())
} else {
// We blindly assume that the ChannelMonitorUpdate will be regenerated on startup if we
Expand Down Expand Up @@ -10030,23 +10039,13 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
panic!("{}", err_str);
},
ChannelMonitorUpdateStatus::InProgress => {
#[cfg(not(any(test, feature = "_externalize_tests")))]
if self.monitor_update_type.swap(1, Ordering::Relaxed) == 2 {
panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
}
log_debug!(
logger,
"ChannelMonitor update in flight, holding messages until the update completes.",
);
false
},
ChannelMonitorUpdateStatus::Completed => {
#[cfg(not(any(test, feature = "_externalize_tests")))]
if self.monitor_update_type.swap(2, Ordering::Relaxed) == 1 {
panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
}
true
},
ChannelMonitorUpdateStatus::Completed => true,
}
}

Expand Down Expand Up @@ -19553,8 +19552,8 @@ impl<

per_peer_state: FairRwLock::new(per_peer_state),

#[cfg(not(any(test, feature = "_externalize_tests")))]
monitor_update_type: AtomicUsize::new(0),
#[cfg(test)]
skip_monitor_update_assertion: AtomicBool::new(false),

pending_events: Mutex::new(pending_events_read),
pending_events_processor: AtomicBool::new(false),
Expand Down
8 changes: 8 additions & 0 deletions lightning/src/ln/functional_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,14 @@ impl<'a, 'b, 'c> Node<'a, 'b, 'c> {
self.node.init_features() | self.onion_messenger.provided_init_features(peer_node_id)
})
}

/// Disables the debug assertion that `Watch::update_channel` must not return `Completed`
/// while prior updates are still `InProgress`. Some legacy tests switch the persister between
/// modes mid-flight, which violates this contract but is otherwise harmless.
#[cfg(test)]
pub fn disable_monitor_completeness_assertion(&self) {
self.node.skip_monitor_update_assertion.store(true, core::sync::atomic::Ordering::Relaxed);
}
}

impl<'a, 'b, 'c> std::panic::UnwindSafe for Node<'a, 'b, 'c> {}
Expand Down
1 change: 1 addition & 0 deletions lightning/src/ln/monitor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3384,6 +3384,7 @@ fn test_claim_event_never_handled() {
let chan_0_monitor_serialized = get_monitor!(nodes[1], chan.2).encode();
let mons = &[&chan_0_monitor_serialized[..]];
reload_node!(nodes[1], &init_node_ser, mons, persister, new_chain_mon, nodes_1_reload);
nodes[1].disable_monitor_completeness_assertion();

expect_payment_claimed!(nodes[1], payment_hash_a, 1_000_000);
// The reload logic spuriously generates a redundant payment preimage-containing
Expand Down
3 changes: 3 additions & 0 deletions lightning/src/ln/reload_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -823,12 +823,14 @@ fn do_test_partial_claim_before_restart(persist_both_monitors: bool, double_rest

// Now restart nodes[3].
reload_node!(nodes[3], original_manager.clone(), &[&updated_monitor.0, &original_monitor.0], persist_d_1, chain_d_1, node_d_1);
nodes[3].disable_monitor_completeness_assertion();

if double_restart {
// Previously, we had a bug where we'd fail to reload if we re-persist the `ChannelManager`
// without updating any `ChannelMonitor`s as we'd fail to double-initiate the claim replay.
// We test that here ensuring that we can reload again.
reload_node!(nodes[3], node_d_1.encode(), &[&updated_monitor.0, &original_monitor.0], persist_d_2, chain_d_2, node_d_2);
nodes[3].disable_monitor_completeness_assertion();
}

// Until the startup background events are processed (in `get_and_clear_pending_events`,
Expand Down Expand Up @@ -2216,6 +2218,7 @@ fn test_reload_with_mpp_claims_on_same_channel() {
nodes_1_deserialized,
Some(true)
);
nodes[1].disable_monitor_completeness_assertion();

// When the claims are reconstructed during reload, PaymentForwarded events are regenerated.
let events = nodes[1].node.get_and_clear_pending_events();
Expand Down
Loading