From 3475010a3f7b27b2cc9e41708af4439687030491 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 10 Dec 2025 16:13:02 +0800 Subject: [PATCH 1/4] refactor: `vacuum temporary files` now cleans inactive temp table data too --- .../interpreter_vacuum_temporary_files.rs | 11 +- .../fuse_vacuum_temporary_table.rs | 124 ++++++++++-------- .../storages/fuse/src/table_functions/mod.rs | 1 + 3 files changed, 82 insertions(+), 54 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs b/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs index 78c4757b4c6e4..d733d1119654c 100644 --- a/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs +++ b/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs @@ -21,6 +21,7 @@ use databend_common_expression::FromData; use databend_common_license::license::Feature::Vacuum; use databend_common_license::license_manager::LicenseManagerSwitch; use databend_common_sql::plans::VacuumTemporaryFilesPlan; +use databend_common_storages_fuse::table_functions::vacuum_inactive_temp_tables; use databend_enterprise_vacuum_handler::get_vacuum_handler; use databend_enterprise_vacuum_handler::vacuum_handler::VacuumTempOptions; @@ -67,8 +68,16 @@ impl Interpreter for VacuumTemporaryFilesInterpreter { ) .await?; + let table_ctx: Arc = self.ctx.clone(); + let session_limit = self + .plan + .limit + .map(|limit| limit.saturating_sub(removed_files as u64)); + let cleaned_sessions = vacuum_inactive_temp_tables(&table_ctx, session_limit).await? as u64; + let total_cleaned = removed_files as u64 + cleaned_sessions; + PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ - UInt64Type::from_data(vec![removed_files as u64]), + UInt64Type::from_data(vec![total_cleaned]), ])]) } } diff --git a/src/query/storages/fuse/src/table_functions/fuse_vacuum_temporary_table.rs b/src/query/storages/fuse/src/table_functions/fuse_vacuum_temporary_table.rs index 4f9027d78e060..1f5a868fa6405 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_vacuum_temporary_table.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_vacuum_temporary_table.rs @@ -37,6 +37,76 @@ use log::info; use crate::sessions::TableContext; use crate::table_functions::SimpleTableFunc; +#[async_backtrace::framed] +pub async fn vacuum_inactive_temp_tables( + ctx: &Arc, + limit: Option, +) -> Result { + let op = DataOperator::instance().operator(); + let mut lister = op + .lister_with(TEMP_TABLE_STORAGE_PREFIX) + .recursive(true) + .await?; + + let client_session_mgr = UserApiProvider::instance().client_session_api(&ctx.get_tenant()); + let mut user_session_ids = HashSet::new(); + let mut inactive_user_session_ids = Vec::new(); + let session_limit = limit.unwrap_or(u64::MAX) as usize; + + if session_limit == 0 { + return Ok(0); + } + + while let Some(entry) = lister.try_next().await? { + if entry.metadata().is_dir() { + continue; + } + let path = entry.path(); + let parts: Vec<_> = path.split('/').collect(); + if parts.len() < 3 { + return Err(ErrorCode::Internal(format!( + "invalid path for temp table: {path}" + ))); + }; + let user_name = parts[1].to_string(); + let session_id = parts[2].to_string(); + if user_session_ids.contains(&(user_name.clone(), session_id.clone())) { + continue; + } + user_session_ids.insert((user_name.clone(), session_id.clone())); + if client_session_mgr + .get_client_session(&user_name, &session_id) + .await? + .is_none() + { + inactive_user_session_ids.push((user_name, session_id)); + if inactive_user_session_ids.len() >= session_limit { + break; + } + } + } + + let mut session_num = 0; + + for (user_name, session_id) in inactive_user_session_ids { + if client_session_mgr + .get_client_session(&user_name, &session_id) + .await? + .is_none() + { + let path = format!("{}/{}/{}", TEMP_TABLE_STORAGE_PREFIX, user_name, session_id); + info!( + "[TEMP TABLE] session={session_id} vacuum temporary table: {}", + path + ); + op.remove_all(&path).await?; + session_num += 1; + } + } + + Ok(session_num) +} + pub struct FuseVacuumTemporaryTable { limit: Option, } @@ -64,59 +134,7 @@ impl SimpleTableFunc for FuseVacuumTemporaryTable { ctx: &Arc, _plan: &DataSourcePlan, ) -> Result> { - let op = DataOperator::instance().operator(); - let mut lister = op - .lister_with(TEMP_TABLE_STORAGE_PREFIX) - .recursive(true) - .await?; - let client_session_mgr = UserApiProvider::instance().client_session_api(&ctx.get_tenant()); - let mut user_session_ids = HashSet::new(); - let mut inactive_user_session_ids = HashSet::new(); - while let Some(entry) = lister.try_next().await? { - if entry.metadata().is_dir() { - continue; - } - let path = entry.path(); - let parts: Vec<_> = path.split('/').collect(); - if parts.len() < 3 { - return Err(ErrorCode::Internal(format!( - "invalid path for temp table: {path}" - ))); - }; - let user_name = parts[1].to_string(); - let session_id = parts[2].to_string(); - if user_session_ids.contains(&(user_name.clone(), session_id.clone())) { - continue; - } - user_session_ids.insert((user_name.clone(), session_id.clone())); - if client_session_mgr - .get_client_session(&user_name, &session_id) - .await? - .is_none() - { - inactive_user_session_ids.insert((user_name, session_id)); - if inactive_user_session_ids.len() >= self.limit.unwrap_or(u64::MAX) as usize { - break; - } - } - } - - let session_num = inactive_user_session_ids.len(); - - for (user_name, session_id) in inactive_user_session_ids { - if client_session_mgr - .get_client_session(&user_name, &session_id) - .await? - .is_none() - { - let path = format!("{}/{}/{}", TEMP_TABLE_STORAGE_PREFIX, user_name, session_id); - info!( - "[TEMP TABLE] session={session_id} vacuum temporary table: {}", - path - ); - op.remove_all(&path).await?; - } - } + let session_num = vacuum_inactive_temp_tables(ctx, self.limit).await?; let col: Vec = vec![format!( "Ok: processed temporary tables from {} inactive sessions", session_num diff --git a/src/query/storages/fuse/src/table_functions/mod.rs b/src/query/storages/fuse/src/table_functions/mod.rs index 6aa5b38c9fa0d..a568dacb0f5de 100644 --- a/src/query/storages/fuse/src/table_functions/mod.rs +++ b/src/query/storages/fuse/src/table_functions/mod.rs @@ -49,6 +49,7 @@ pub use fuse_time_travel_size::FuseTimeTravelSize; pub use fuse_time_travel_size::FuseTimeTravelSizeFunc; pub use fuse_vacuum_drop_aggregating_index::FuseVacuumDropAggregatingIndex; pub use fuse_vacuum_drop_inverted_index::FuseVacuumDropInvertedIndex; +pub use fuse_vacuum_temporary_table::vacuum_inactive_temp_tables; pub use fuse_vacuum_temporary_table::FuseVacuumTemporaryTable; pub use fuse_virtual_column::FuseVirtualColumnFunc; pub use set_cache_capacity::SetCacheCapacity; From 6a0a30a91224c0768dd15862acde2e7bfeed8c40 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Thu, 11 Dec 2025 10:27:01 +0800 Subject: [PATCH 2/4] adjust resultset schema --- .../interpreter_vacuum_temporary_files.rs | 7 ++++--- src/query/sql/src/planner/plans/ddl/table.rs | 14 ++++++++++---- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs b/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs index d733d1119654c..33c942ed28273 100644 --- a/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs +++ b/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs @@ -73,11 +73,12 @@ impl Interpreter for VacuumTemporaryFilesInterpreter { .plan .limit .map(|limit| limit.saturating_sub(removed_files as u64)); - let cleaned_sessions = vacuum_inactive_temp_tables(&table_ctx, session_limit).await? as u64; - let total_cleaned = removed_files as u64 + cleaned_sessions; + let cleaned_temp_tables = + vacuum_inactive_temp_tables(&table_ctx, session_limit).await? as u64; PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ - UInt64Type::from_data(vec![total_cleaned]), + UInt64Type::from_data(vec![removed_files as u64]), + UInt64Type::from_data(vec![cleaned_temp_tables]), ])]) } } diff --git a/src/query/sql/src/planner/plans/ddl/table.rs b/src/query/sql/src/planner/plans/ddl/table.rs index 0edf049101f27..bbdd91e32fcc2 100644 --- a/src/query/sql/src/planner/plans/ddl/table.rs +++ b/src/query/sql/src/planner/plans/ddl/table.rs @@ -192,10 +192,16 @@ pub struct VacuumTemporaryFilesPlan { impl crate::plans::VacuumTemporaryFilesPlan { pub fn schema(&self) -> DataSchemaRef { - Arc::new(DataSchema::new(vec![DataField::new( - "Files", - DataType::Number(NumberDataType::UInt64), - )])) + Arc::new(DataSchema::new(vec![ + DataField::new( + "spill_files", + DataType::Number(NumberDataType::UInt64), + ), + DataField::new( + "temp_tables", + DataType::Number(NumberDataType::UInt64), + ), + ])) } } From 69c7f3e5b69bcc161bd508621c94ee8a1e7360d6 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Thu, 11 Dec 2025 10:28:30 +0800 Subject: [PATCH 3/4] cargo fmt --- src/query/sql/src/planner/plans/ddl/table.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/query/sql/src/planner/plans/ddl/table.rs b/src/query/sql/src/planner/plans/ddl/table.rs index bbdd91e32fcc2..b2c90af769ddc 100644 --- a/src/query/sql/src/planner/plans/ddl/table.rs +++ b/src/query/sql/src/planner/plans/ddl/table.rs @@ -193,14 +193,8 @@ pub struct VacuumTemporaryFilesPlan { impl crate::plans::VacuumTemporaryFilesPlan { pub fn schema(&self) -> DataSchemaRef { Arc::new(DataSchema::new(vec![ - DataField::new( - "spill_files", - DataType::Number(NumberDataType::UInt64), - ), - DataField::new( - "temp_tables", - DataType::Number(NumberDataType::UInt64), - ), + DataField::new("spill_files", DataType::Number(NumberDataType::UInt64)), + DataField::new("temp_tables", DataType::Number(NumberDataType::UInt64)), ])) } } From bcefc7b40c4ad7f0a1b6421c2517a397216fe387 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Fri, 12 Dec 2025 18:27:18 +0800 Subject: [PATCH 4/4] tweak result schema --- .../src/interpreters/interpreter_vacuum_temporary_files.rs | 4 ++-- src/query/sql/src/planner/plans/ddl/table.rs | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs b/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs index 33c942ed28273..98ba5f4b55554 100644 --- a/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs +++ b/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs @@ -73,12 +73,12 @@ impl Interpreter for VacuumTemporaryFilesInterpreter { .plan .limit .map(|limit| limit.saturating_sub(removed_files as u64)); - let cleaned_temp_tables = + let cleaned_temp_table_sessions = vacuum_inactive_temp_tables(&table_ctx, session_limit).await? as u64; PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ UInt64Type::from_data(vec![removed_files as u64]), - UInt64Type::from_data(vec![cleaned_temp_tables]), + UInt64Type::from_data(vec![cleaned_temp_table_sessions]), ])]) } } diff --git a/src/query/sql/src/planner/plans/ddl/table.rs b/src/query/sql/src/planner/plans/ddl/table.rs index b2c90af769ddc..1086d67b98d28 100644 --- a/src/query/sql/src/planner/plans/ddl/table.rs +++ b/src/query/sql/src/planner/plans/ddl/table.rs @@ -194,7 +194,10 @@ impl crate::plans::VacuumTemporaryFilesPlan { pub fn schema(&self) -> DataSchemaRef { Arc::new(DataSchema::new(vec![ DataField::new("spill_files", DataType::Number(NumberDataType::UInt64)), - DataField::new("temp_tables", DataType::Number(NumberDataType::UInt64)), + DataField::new( + "temp_table_sessions", + DataType::Number(NumberDataType::UInt64), + ), ])) } }