Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1038,13 +1038,25 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
let manager =
<(BlockHash, ChanMan)>::read(&mut &ser[..], read_args).expect("Failed to read manager");
let res = (manager.1, chain_monitor.clone());
let expected_status = *mon_style[node_id as usize].borrow();
*chain_monitor.persister.update_ret.lock().unwrap() = expected_status.clone();
for (channel_id, mon) in monitors.drain() {
let monitor_id = mon.get_latest_update_id();
assert_eq!(
chain_monitor.chain_monitor.watch_channel(channel_id, mon),
Ok(ChannelMonitorUpdateStatus::Completed)
Ok(expected_status.clone())
);
// When persistence returns InProgress, the real ChainMonitor tracks
// the initial update_id as pending. We must mirror this in the
// TestChainMonitor's latest_monitors so that
// complete_all_monitor_updates can drain and complete it later.
if expected_status == chain::ChannelMonitorUpdateStatus::InProgress {
let mut map = chain_monitor.latest_monitors.lock().unwrap();
if let Some(state) = map.get_mut(&channel_id) {
state.pending_monitors.push((monitor_id, state.persisted_monitor.clone()));
}
}
}
*chain_monitor.persister.update_ret.lock().unwrap() = *mon_style[node_id as usize].borrow();
res
};

Expand Down
33 changes: 33 additions & 0 deletions lightning/src/chain/chainmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,12 @@ pub struct ChainMonitor<
/// Messages to send to the peer. This is currently used to distribute PeerStorage to channel partners.
pending_send_only_events: Mutex<Vec<MessageSendEvent>>,

/// We only support using one of [`ChannelMonitorUpdateStatus::InProgress`] and
/// [`ChannelMonitorUpdateStatus::Completed`] without restarting. We enforce this in non-test
/// builds by storing which one is in use (0 = unset, 1 = InProgress, 2 = Completed).
#[cfg(not(any(test, feature = "_externalize_tests")))]
monitor_update_type: AtomicUsize,

#[cfg(peer_storage)]
our_peerstorage_encryption_key: PeerStorageKey,
}
Expand Down Expand Up @@ -412,6 +418,8 @@ where
event_notifier: Arc::clone(&event_notifier),
persister: AsyncPersister { persister, event_notifier },
pending_send_only_events: Mutex::new(Vec::new()),
#[cfg(not(any(test, feature = "_externalize_tests")))]
monitor_update_type: AtomicUsize::new(0),
#[cfg(peer_storage)]
our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
}
Expand Down Expand Up @@ -617,11 +625,32 @@ where
highest_chain_height: AtomicUsize::new(0),
event_notifier: Arc::new(Notifier::new()),
pending_send_only_events: Mutex::new(Vec::new()),
#[cfg(not(any(test, feature = "_externalize_tests")))]
monitor_update_type: AtomicUsize::new(0),
#[cfg(peer_storage)]
our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
}
}

/// Enforces the invariant that a single run only ever uses ONE of the two
/// persistence modes: [`ChannelMonitorUpdateStatus::InProgress`] (async
/// persistence) or [`ChannelMonitorUpdateStatus::Completed`] (sync
/// persistence). Mixing them without a restart is unsupported, so we panic
/// as soon as a persister returns the "other" mode.
///
/// `monitor_update_type` is a sticky flag: 0 = no persist result observed
/// yet, 1 = InProgress has been seen, 2 = Completed has been seen. Each call
/// atomically records the current result via `swap` and panics if the
/// previously recorded value was the opposite mode.
///
/// Only compiled in non-test builds (tests deliberately flip between modes).
#[cfg(not(any(test, feature = "_externalize_tests")))]
fn check_monitor_update_type(
monitor_update_type: &AtomicUsize, persist_res: &ChannelMonitorUpdateStatus,
) {
match persist_res {
ChannelMonitorUpdateStatus::InProgress => {
// Record "InProgress" (1); if "Completed" (2) was ever seen before, abort.
// Relaxed ordering suffices: this is a monotonic sanity flag, not a
// synchronization point — NOTE(review): assumes no cross-thread ordering
// is relied upon here, which the original code also assumed.
if monitor_update_type.swap(1, Ordering::Relaxed) == 2 {
panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
}
},
ChannelMonitorUpdateStatus::Completed => {
// Mirror case: record "Completed" (2); panic if "InProgress" (1) was seen.
if monitor_update_type.swap(2, Ordering::Relaxed) == 1 {
panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
}
},
// An unrecoverable error aborts elsewhere; it does not constrain the mode.
ChannelMonitorUpdateStatus::UnrecoverableError => {},
}
}

/// Gets the balances in the contained [`ChannelMonitor`]s which are claimable on-chain or
/// claims which are awaiting confirmation.
///
Expand Down Expand Up @@ -1285,6 +1314,8 @@ where
let update_id = monitor.get_latest_update_id();
let mut pending_monitor_updates = Vec::new();
let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor);
#[cfg(not(any(test, feature = "_externalize_tests")))]
Self::check_monitor_update_type(&self.monitor_update_type, &persist_res);
match persist_res {
ChannelMonitorUpdateStatus::InProgress => {
log_info!(logger, "Persistence of new ChannelMonitor in progress",);
Expand Down Expand Up @@ -1367,6 +1398,8 @@ where
monitor,
)
};
#[cfg(not(any(test, feature = "_externalize_tests")))]
Self::check_monitor_update_type(&self.monitor_update_type, &persist_res);
match persist_res {
ChannelMonitorUpdateStatus::InProgress => {
pending_monitor_updates.push(update_id);
Expand Down
25 changes: 1 addition & 24 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2779,13 +2779,6 @@ pub struct ChannelManager<
#[cfg(any(test, feature = "_test_utils"))]
pub(super) per_peer_state: FairRwLock<HashMap<PublicKey, Mutex<PeerState<SP>>>>,

/// We only support using one of [`ChannelMonitorUpdateStatus::InProgress`] and
/// [`ChannelMonitorUpdateStatus::Completed`] without restarting. Because the API does not
/// otherwise directly enforce this, we enforce it in non-test builds here by storing which one
/// is in use.
#[cfg(not(any(test, feature = "_externalize_tests")))]
monitor_update_type: AtomicUsize,

/// The set of events which we need to give to the user to handle. In some cases an event may
/// require some further action after the user handles it (currently only blocking a monitor
/// update from being handed to the user to ensure the included changes to the channel state
Expand Down Expand Up @@ -3527,9 +3520,6 @@ impl<

per_peer_state: FairRwLock::new(new_hash_map()),

#[cfg(not(any(test, feature = "_externalize_tests")))]
monitor_update_type: AtomicUsize::new(0),

pending_events: Mutex::new(VecDeque::new()),
pending_events_processor: AtomicBool::new(false),
pending_htlc_forwards_processor: AtomicBool::new(false),
Expand Down Expand Up @@ -10006,23 +9996,13 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
panic!("{}", err_str);
},
ChannelMonitorUpdateStatus::InProgress => {
#[cfg(not(any(test, feature = "_externalize_tests")))]
if self.monitor_update_type.swap(1, Ordering::Relaxed) == 2 {
panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
}
log_debug!(
logger,
"ChannelMonitor update in flight, holding messages until the update completes.",
);
false
},
ChannelMonitorUpdateStatus::Completed => {
#[cfg(not(any(test, feature = "_externalize_tests")))]
if self.monitor_update_type.swap(2, Ordering::Relaxed) == 1 {
panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
}
true
},
ChannelMonitorUpdateStatus::Completed => true,
}
}

Expand Down Expand Up @@ -19550,9 +19530,6 @@ impl<

per_peer_state: FairRwLock::new(per_peer_state),

#[cfg(not(any(test, feature = "_externalize_tests")))]
monitor_update_type: AtomicUsize::new(0),

pending_events: Mutex::new(pending_events_read),
pending_events_processor: AtomicBool::new(false),
pending_htlc_forwards_processor: AtomicBool::new(false),
Expand Down
Loading