diff --git a/ldk-server-client/src/client.rs b/ldk-server-client/src/client.rs index f810467..8cabe64 100644 --- a/ldk-server-client/src/client.rs +++ b/ldk-server-client/src/client.rs @@ -29,10 +29,10 @@ use ldk_server_protos::api::{ use ldk_server_protos::endpoints::{ BOLT11_RECEIVE_PATH, BOLT11_SEND_PATH, BOLT12_RECEIVE_PATH, BOLT12_SEND_PATH, CLOSE_CHANNEL_PATH, CONNECT_PEER_PATH, DISCONNECT_PEER_PATH, EXPORT_PATHFINDING_SCORES_PATH, - FORCE_CLOSE_CHANNEL_PATH, GET_BALANCES_PATH, GET_NODE_INFO_PATH, GET_PAYMENT_DETAILS_PATH, - LIST_CHANNELS_PATH, LIST_FORWARDED_PAYMENTS_PATH, LIST_PAYMENTS_PATH, ONCHAIN_RECEIVE_PATH, - ONCHAIN_SEND_PATH, OPEN_CHANNEL_PATH, SIGN_MESSAGE_PATH, SPLICE_IN_PATH, SPLICE_OUT_PATH, - SPONTANEOUS_SEND_PATH, UPDATE_CHANNEL_CONFIG_PATH, VERIFY_SIGNATURE_PATH, + FORCE_CLOSE_CHANNEL_PATH, GET_BALANCES_PATH, GET_METRICS_PATH, GET_NODE_INFO_PATH, + GET_PAYMENT_DETAILS_PATH, LIST_CHANNELS_PATH, LIST_FORWARDED_PAYMENTS_PATH, LIST_PAYMENTS_PATH, + ONCHAIN_RECEIVE_PATH, ONCHAIN_SEND_PATH, OPEN_CHANNEL_PATH, SIGN_MESSAGE_PATH, SPLICE_IN_PATH, + SPLICE_OUT_PATH, SPONTANEOUS_SEND_PATH, UPDATE_CHANNEL_CONFIG_PATH, VERIFY_SIGNATURE_PATH, }; use ldk_server_protos::error::{ErrorCode, ErrorResponse}; use prost::Message; @@ -103,6 +103,28 @@ impl LdkServerClient { self.post_request(&request, &url).await } + /// Retrieve the node metrics in Prometheus format. + pub async fn get_metrics(&self) -> Result { + let url = format!("https://{}/{GET_METRICS_PATH}", self.base_url); + let response = self.client.get(&url).send().await.map_err(|e| { + LdkServerError::new(InternalError, format!("HTTP request failed: {}", e)) + })?; + + let status = response.status(); + let text = response.text().await.map_err(|e| { + LdkServerError::new(InternalError, format!("Failed to read response body: {}", e)) + })?; + + if status.is_success() { + Ok(text) + } else { + Err(LdkServerError::new( + InternalError, + format!("Request failed with status {}: {}", status, text), + )) + } + } + /// Retrieves an overview of all known balances. /// For API contract/usage, refer to docs for [`GetBalancesRequest`] and [`GetBalancesResponse`]. pub async fn get_balances( diff --git a/ldk-server-protos/src/endpoints.rs b/ldk-server-protos/src/endpoints.rs index 6833b50..89fb70e 100644 --- a/ldk-server-protos/src/endpoints.rs +++ b/ldk-server-protos/src/endpoints.rs @@ -31,3 +31,4 @@ pub const SPONTANEOUS_SEND_PATH: &str = "SpontaneousSend"; pub const SIGN_MESSAGE_PATH: &str = "SignMessage"; pub const VERIFY_SIGNATURE_PATH: &str = "VerifySignature"; pub const EXPORT_PATHFINDING_SCORES_PATH: &str = "ExportPathfindingScores"; +pub const GET_METRICS_PATH: &str = "metrics"; diff --git a/ldk-server/src/main.rs b/ldk-server/src/main.rs index 5f7b660..04881b4 100644 --- a/ldk-server/src/main.rs +++ b/ldk-server/src/main.rs @@ -50,6 +50,7 @@ use crate::io::persist::{ use crate::service::NodeService; use crate::util::config::{load_config, ArgsConfig, ChainSource}; use crate::util::logger::ServerLogger; +use crate::util::metrics::{Metrics, BUILD_METRICS_INTERVAL}; use crate::util::proto_adapter::{forwarded_payment_to_proto, payment_to_proto}; use crate::util::tls::get_or_generate_tls_config; @@ -256,6 +257,20 @@ fn main() { } }; let event_node = Arc::clone(&node); + + let metrics_node = Arc::clone(&node); + let mut interval = tokio::time::interval(BUILD_METRICS_INTERVAL); + let metrics = Arc::new(Metrics::new()); + let metrics_bg = Arc::clone(&metrics); + let event_metrics = Arc::clone(&metrics); + + runtime.spawn(async move { + loop { + interval.tick().await; + metrics_bg.update_all_pollable_metrics(&metrics_node); + } + }); + let rest_svc_listener = TcpListener::bind(config_file.rest_service_addr) .await .expect("Failed to bind listening port"); @@ -320,6 +335,8 @@ fn main() { &event_node, Arc::clone(&event_publisher), Arc::clone(&paginated_store)).await; + + event_metrics.update_total_successful_payments_count(&event_node); }, Event::PaymentFailed {payment_id, ..} => { let payment_id = payment_id.expect("PaymentId expected for ldk-server >=0.1"); @@ -331,6 +348,8 @@ fn main() { &event_node, Arc::clone(&event_publisher), Arc::clone(&paginated_store)).await; + + event_metrics.update_total_failed_payments_count(&event_node); }, Event::PaymentClaimable {payment_id, ..} => { if let Some(payment_details) = event_node.payment(&payment_id) { @@ -415,7 +434,7 @@ fn main() { res = rest_svc_listener.accept() => { match res { Ok((stream, _)) => { - let node_service = NodeService::new(Arc::clone(&node), Arc::clone(&paginated_store), api_key.clone()); + let node_service = NodeService::new(Arc::clone(&node), Arc::clone(&paginated_store), api_key.clone(), Arc::clone(&metrics)); let acceptor = tls_acceptor.clone(); runtime.spawn(async move { match acceptor.accept(stream).await { diff --git a/ldk-server/src/service.rs b/ldk-server/src/service.rs index cdf4827..c96c54c 100644 --- a/ldk-server/src/service.rs +++ b/ldk-server/src/service.rs @@ -21,10 +21,10 @@ use ldk_node::Node; use ldk_server_protos::endpoints::{ BOLT11_RECEIVE_PATH, BOLT11_SEND_PATH, BOLT12_RECEIVE_PATH, BOLT12_SEND_PATH, CLOSE_CHANNEL_PATH, CONNECT_PEER_PATH, DISCONNECT_PEER_PATH, EXPORT_PATHFINDING_SCORES_PATH, - FORCE_CLOSE_CHANNEL_PATH, GET_BALANCES_PATH, GET_NODE_INFO_PATH, GET_PAYMENT_DETAILS_PATH, - LIST_CHANNELS_PATH, LIST_FORWARDED_PAYMENTS_PATH, LIST_PAYMENTS_PATH, ONCHAIN_RECEIVE_PATH, - ONCHAIN_SEND_PATH, OPEN_CHANNEL_PATH, SIGN_MESSAGE_PATH, SPLICE_IN_PATH, SPLICE_OUT_PATH, - SPONTANEOUS_SEND_PATH, UPDATE_CHANNEL_CONFIG_PATH, VERIFY_SIGNATURE_PATH, + FORCE_CLOSE_CHANNEL_PATH, GET_BALANCES_PATH, GET_METRICS_PATH, GET_NODE_INFO_PATH, + GET_PAYMENT_DETAILS_PATH, LIST_CHANNELS_PATH, LIST_FORWARDED_PAYMENTS_PATH, LIST_PAYMENTS_PATH, + ONCHAIN_RECEIVE_PATH, ONCHAIN_SEND_PATH, OPEN_CHANNEL_PATH, SIGN_MESSAGE_PATH, SPLICE_IN_PATH, + SPLICE_OUT_PATH, SPONTANEOUS_SEND_PATH, UPDATE_CHANNEL_CONFIG_PATH, VERIFY_SIGNATURE_PATH, }; use prost::Message; @@ -53,6 +53,7 @@ use crate::api::spontaneous_send::handle_spontaneous_send_request; use crate::api::update_channel_config::handle_update_channel_config_request; use crate::api::verify_signature::handle_verify_signature_request; use crate::io::persist::paginated_kv_store::PaginatedKVStore; +use crate::util::metrics::Metrics; use crate::util::proto_adapter::to_error_response; // Maximum request body size: 10 MB @@ -64,13 +65,15 @@ pub struct NodeService { node: Arc, paginated_kv_store: Arc, api_key: String, + metrics: Arc, } impl NodeService { pub(crate) fn new( node: Arc, paginated_kv_store: Arc, api_key: String, + metrics: Arc, ) -> Self { - Self { node, paginated_kv_store, api_key } + Self { node, paginated_kv_store, api_key, metrics } } } @@ -154,6 +157,17 @@ impl Service> for NodeService { type Future = Pin> + Send>>; fn call(&self, req: Request) -> Self::Future { + // Handle metrics endpoint separately to bypass auth and return plain text + if req.uri().path().len() > 1 && &req.uri().path()[1..] == GET_METRICS_PATH { + let metrics = Arc::clone(&self.metrics); + return Box::pin(async move { + Ok(Response::builder() + .header("Content-Type", "text/plain") + .body(Full::new(Bytes::from(metrics.gather_metrics()))) + .unwrap()) + }); + } + // Extract auth params from headers (validation happens after body is read) let auth_params = match extract_auth_params(&req) { Ok(params) => params, diff --git a/ldk-server/src/util/metrics.rs b/ldk-server/src/util/metrics.rs new file mode 100644 index 0000000..487eb61 --- /dev/null +++ b/ldk-server/src/util/metrics.rs @@ -0,0 +1,289 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; +use std::time::Duration; + +use ldk_node::payment::PaymentStatus; +use ldk_node::Node; + +pub const BUILD_METRICS_INTERVAL: Duration = Duration::from_secs(60); + +pub struct Metrics { + pub total_peers_count: AtomicI64, + pub total_payments_count: AtomicI64, + pub total_successful_payments_count: AtomicI64, + pub total_failed_payments_count: AtomicI64, + pub total_channels_count: AtomicI64, + pub total_public_channels_count: AtomicI64, + pub total_private_channels_count: AtomicI64, + pub total_onchain_balance_sats: AtomicU64, + pub spendable_onchain_balance_sats: AtomicU64, + pub total_anchor_channels_reserve_sats: AtomicU64, + pub total_lightning_balance_sats: AtomicU64, +} + +impl Metrics { + pub fn new() -> Self { + Self { + total_peers_count: AtomicI64::new(0), + total_payments_count: AtomicI64::new(0), + total_successful_payments_count: AtomicI64::new(0), + total_failed_payments_count: AtomicI64::new(0), + total_channels_count: AtomicI64::new(0), + total_public_channels_count: AtomicI64::new(0), + total_private_channels_count: AtomicI64::new(0), + total_onchain_balance_sats: AtomicU64::new(0), + spendable_onchain_balance_sats: AtomicU64::new(0), + total_anchor_channels_reserve_sats: AtomicU64::new(0), + total_lightning_balance_sats: AtomicU64::new(0), + } + } + + fn update_peer_count(&self, node: &Node) { + let total_peers_count = node.list_peers().len() as i64; + self.total_peers_count.store(total_peers_count, Ordering::Relaxed); + } + + fn update_total_payments_count(&self, node: &Node) { + let total_payments_count = node.list_payments().len() as i64; + self.total_payments_count.store(total_payments_count, Ordering::Relaxed); + } + + pub fn update_total_successful_payments_count(&self, node: &Node) { + let successful_payments_count = node + .list_payments() + .iter() + .filter(|payment_details| payment_details.status == PaymentStatus::Succeeded) + .count() as i64; + self.total_successful_payments_count.store(successful_payments_count, Ordering::Relaxed); + } + + pub fn update_total_failed_payments_count(&self, node: &Node) { + let failed_payments_count = node + .list_payments() + .iter() + .filter(|payment_details| payment_details.status == PaymentStatus::Failed) + .count() as i64; + self.total_failed_payments_count.store(failed_payments_count, Ordering::Relaxed); + } + + fn update_total_channels_count(&self, node: &Node) { + let total_channels_count = node.list_channels().len() as i64; + self.total_channels_count.store(total_channels_count, Ordering::Relaxed); + } + + fn update_total_public_channels_count(&self, node: &Node) { + let total_public_channels_count = node + .list_channels() + .iter() + .filter(|channel_details| channel_details.is_announced) + .count() as i64; + self.total_public_channels_count.store(total_public_channels_count, Ordering::Relaxed); + } + + fn update_total_private_channels_count(&self, node: &Node) { + let total_private_channels_count = node + .list_channels() + .iter() + .filter(|channel_details| !channel_details.is_announced) + .count() as i64; + self.total_private_channels_count.store(total_private_channels_count, Ordering::Relaxed); + } + + fn update_all_balances(&self, node: &Node) { + let all_balances = node.list_balances(); + self.total_onchain_balance_sats + .store(all_balances.total_onchain_balance_sats, Ordering::Relaxed); + + self.spendable_onchain_balance_sats + .store(all_balances.spendable_onchain_balance_sats, Ordering::Relaxed); + + self.total_anchor_channels_reserve_sats + .store(all_balances.total_anchor_channels_reserve_sats, Ordering::Relaxed); + + self.total_lightning_balance_sats + .store(all_balances.total_lightning_balance_sats, Ordering::Relaxed); + } + + pub fn update_all_pollable_metrics(&self, node: &Node) { + self.update_peer_count(node); + self.update_total_payments_count(node); + self.update_total_successful_payments_count(node); + self.update_total_failed_payments_count(node); + self.update_total_channels_count(node); + self.update_total_public_channels_count(node); + self.update_total_private_channels_count(node); + self.update_all_balances(node); + } + + pub fn gather_metrics(&self) -> String { + let mut buffer = String::new(); + + fn format_metric( + buffer: &mut String, name: &str, help: &str, metric_type: &str, + value: impl std::fmt::Display, + ) { + use std::fmt::Write; + let _ = writeln!(buffer, "# HELP {} {}", name, help); + let _ = writeln!(buffer, "# TYPE {} {}", name, metric_type); + let _ = writeln!(buffer, "{} {}", name, value); + } + + format_metric( + &mut buffer, + "ldk_server_total_peers_count", + "Total number of peers", + "gauge", + self.total_peers_count.load(Ordering::Relaxed), + ); + + format_metric( + &mut buffer, + "ldk_server_total_payments_count", + "Total number of payments", + "counter", + self.total_payments_count.load(Ordering::Relaxed), + ); + + format_metric( + &mut buffer, + "ldk_server_total_successful_payments_count", + "Total number of successful payments", + "counter", + self.total_successful_payments_count.load(Ordering::Relaxed), + ); + + format_metric( + &mut buffer, + "ldk_server_total_failed_payments_count", + "Total number of failed payments", + "counter", + self.total_failed_payments_count.load(Ordering::Relaxed), + ); + + format_metric( + &mut buffer, + "ldk_server_total_channels_count", + "Total number of channels", + "counter", + self.total_channels_count.load(Ordering::Relaxed), + ); + + format_metric( + &mut buffer, + "ldk_server_total_public_channels_count", + "Total number of public channels", + "counter", + self.total_public_channels_count.load(Ordering::Relaxed), + ); + + format_metric( + &mut buffer, + "ldk_server_total_private_channels_count", + "Total number of private channels", + "counter", + self.total_private_channels_count.load(Ordering::Relaxed), + ); + + format_metric( + &mut buffer, + "ldk_server_total_onchain_balance_sats", + "Total onchain balance in sats", + "gauge", + self.total_onchain_balance_sats.load(Ordering::Relaxed), + ); + + format_metric( + &mut buffer, + "ldk_server_spendable_onchain_balance_sats", + "Spendable onchain balance in sats", + "gauge", + self.spendable_onchain_balance_sats.load(Ordering::Relaxed), + ); + + format_metric( + &mut buffer, + "ldk_server_total_anchor_channels_reserve_sats", + "Total anchor channels reserve in sats", + "gauge", + self.total_anchor_channels_reserve_sats.load(Ordering::Relaxed), + ); + + format_metric( + &mut buffer, + "ldk_server_total_lightning_balance_sats", + "Total lightning balance in sats", + "gauge", + self.total_lightning_balance_sats.load(Ordering::Relaxed), + ); + + buffer + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_initial_metrics_values() { + let metrics = Metrics::new(); + let result = metrics.gather_metrics(); + + // Check that all metrics are present and empty + assert!(result.contains("ldk_server_total_peers_count 0")); + assert!(result.contains("ldk_server_total_payments_count 0")); + assert!(result.contains("ldk_server_total_successful_payments_count 0")); + assert!(result.contains("ldk_server_total_failed_payments_count 0")); + assert!(result.contains("ldk_server_total_channels_count 0")); + assert!(result.contains("ldk_server_total_public_channels_count 0")); + assert!(result.contains("ldk_server_total_private_channels_count 0")); + assert!(result.contains("ldk_server_total_onchain_balance_sats 0")); + assert!(result.contains("ldk_server_spendable_onchain_balance_sats 0")); + assert!(result.contains("ldk_server_total_anchor_channels_reserve_sats 0")); + assert!(result.contains("ldk_server_total_lightning_balance_sats 0")); + } + + #[test] + fn test_metrics_update_and_gather() { + let metrics = Metrics::new(); + + // Manually update metrics to simulate node activity + metrics.total_peers_count.store(5, Ordering::Relaxed); + metrics.total_payments_count.store(10, Ordering::Relaxed); + metrics.total_successful_payments_count.store(8, Ordering::Relaxed); + metrics.total_failed_payments_count.store(2, Ordering::Relaxed); + metrics.total_channels_count.store(3, Ordering::Relaxed); + metrics.total_public_channels_count.store(1, Ordering::Relaxed); + metrics.total_private_channels_count.store(2, Ordering::Relaxed); + metrics.total_onchain_balance_sats.store(100_000, Ordering::Relaxed); + metrics.spendable_onchain_balance_sats.store(50_000, Ordering::Relaxed); + metrics.total_anchor_channels_reserve_sats.store(1_000, Ordering::Relaxed); + metrics.total_lightning_balance_sats.store(250_000, Ordering::Relaxed); + + let result = metrics.gather_metrics(); + + // Check that output contains updated values and correct Prometheus format + assert!(result.contains("# HELP ldk_server_total_peers_count Total number of peers")); + assert!(result.contains("# TYPE ldk_server_total_peers_count gauge")); + assert!(result.contains("ldk_server_total_peers_count 5")); + + assert!(result.contains("ldk_server_total_payments_count 10")); + assert!(result.contains("ldk_server_total_successful_payments_count 8")); + assert!(result.contains("ldk_server_total_failed_payments_count 2")); + assert!(result.contains("ldk_server_total_channels_count 3")); + assert!(result.contains("ldk_server_total_public_channels_count 1")); + assert!(result.contains("ldk_server_total_private_channels_count 2")); + assert!(result.contains("ldk_server_total_onchain_balance_sats 100000")); + assert!(result.contains("ldk_server_spendable_onchain_balance_sats 50000")); + assert!(result.contains("ldk_server_total_anchor_channels_reserve_sats 1000")); + assert!(result.contains("ldk_server_total_lightning_balance_sats 250000")); + } +} diff --git a/ldk-server/src/util/mod.rs b/ldk-server/src/util/mod.rs index 3662b12..1d22bb9 100644 --- a/ldk-server/src/util/mod.rs +++ b/ldk-server/src/util/mod.rs @@ -9,5 +9,6 @@ pub(crate) mod config; pub(crate) mod logger; +pub(crate) mod metrics; pub(crate) mod proto_adapter; pub(crate) mod tls;