diff --git a/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs b/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs index 78c4757b4c6e4..98ba5f4b55554 100644 --- a/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs +++ b/src/query/service/src/interpreters/interpreter_vacuum_temporary_files.rs @@ -21,6 +21,7 @@ use databend_common_expression::FromData; use databend_common_license::license::Feature::Vacuum; use databend_common_license::license_manager::LicenseManagerSwitch; use databend_common_sql::plans::VacuumTemporaryFilesPlan; +use databend_common_storages_fuse::table_functions::vacuum_inactive_temp_tables; use databend_enterprise_vacuum_handler::get_vacuum_handler; use databend_enterprise_vacuum_handler::vacuum_handler::VacuumTempOptions; @@ -67,8 +68,17 @@ impl Interpreter for VacuumTemporaryFilesInterpreter { ) .await?; + let table_ctx: Arc = self.ctx.clone(); + let session_limit = self + .plan + .limit + .map(|limit| limit.saturating_sub(removed_files as u64)); + let cleaned_temp_table_sessions = + vacuum_inactive_temp_tables(&table_ctx, session_limit).await? as u64; + PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ UInt64Type::from_data(vec![removed_files as u64]), + UInt64Type::from_data(vec![cleaned_temp_table_sessions]), ])]) } } diff --git a/src/query/sql/src/planner/plans/ddl/table.rs b/src/query/sql/src/planner/plans/ddl/table.rs index 0edf049101f27..1086d67b98d28 100644 --- a/src/query/sql/src/planner/plans/ddl/table.rs +++ b/src/query/sql/src/planner/plans/ddl/table.rs @@ -192,10 +192,13 @@ pub struct VacuumTemporaryFilesPlan { impl crate::plans::VacuumTemporaryFilesPlan { pub fn schema(&self) -> DataSchemaRef { - Arc::new(DataSchema::new(vec![DataField::new( - "Files", - DataType::Number(NumberDataType::UInt64), - )])) + Arc::new(DataSchema::new(vec![ + DataField::new("spill_files", DataType::Number(NumberDataType::UInt64)), + DataField::new( + "temp_table_sessions", + DataType::Number(NumberDataType::UInt64), + ), + ])) } } diff --git a/src/query/storages/fuse/src/table_functions/fuse_vacuum_temporary_table.rs b/src/query/storages/fuse/src/table_functions/fuse_vacuum_temporary_table.rs index 4f9027d78e060..1f5a868fa6405 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_vacuum_temporary_table.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_vacuum_temporary_table.rs @@ -37,6 +37,76 @@ use log::info; use crate::sessions::TableContext; use crate::table_functions::SimpleTableFunc; +#[async_backtrace::framed] +pub async fn vacuum_inactive_temp_tables( + ctx: &Arc, + limit: Option, +) -> Result { + let op = DataOperator::instance().operator(); + let mut lister = op + .lister_with(TEMP_TABLE_STORAGE_PREFIX) + .recursive(true) + .await?; + + let client_session_mgr = UserApiProvider::instance().client_session_api(&ctx.get_tenant()); + let mut user_session_ids = HashSet::new(); + let mut inactive_user_session_ids = Vec::new(); + let session_limit = limit.unwrap_or(u64::MAX) as usize; + + if session_limit == 0 { + return Ok(0); + } + + while let Some(entry) = lister.try_next().await? { + if entry.metadata().is_dir() { + continue; + } + let path = entry.path(); + let parts: Vec<_> = path.split('/').collect(); + if parts.len() < 3 { + return Err(ErrorCode::Internal(format!( + "invalid path for temp table: {path}" + ))); + }; + let user_name = parts[1].to_string(); + let session_id = parts[2].to_string(); + if user_session_ids.contains(&(user_name.clone(), session_id.clone())) { + continue; + } + user_session_ids.insert((user_name.clone(), session_id.clone())); + if client_session_mgr + .get_client_session(&user_name, &session_id) + .await? + .is_none() + { + inactive_user_session_ids.push((user_name, session_id)); + if inactive_user_session_ids.len() >= session_limit { + break; + } + } + } + + let mut session_num = 0; + + for (user_name, session_id) in inactive_user_session_ids { + if client_session_mgr + .get_client_session(&user_name, &session_id) + .await? + .is_none() + { + let path = format!("{}/{}/{}", TEMP_TABLE_STORAGE_PREFIX, user_name, session_id); + info!( + "[TEMP TABLE] session={session_id} vacuum temporary table: {}", + path + ); + op.remove_all(&path).await?; + session_num += 1; + } + } + + Ok(session_num) +} + pub struct FuseVacuumTemporaryTable { limit: Option, } @@ -64,59 +134,7 @@ impl SimpleTableFunc for FuseVacuumTemporaryTable { ctx: &Arc, _plan: &DataSourcePlan, ) -> Result> { - let op = DataOperator::instance().operator(); - let mut lister = op - .lister_with(TEMP_TABLE_STORAGE_PREFIX) - .recursive(true) - .await?; - let client_session_mgr = UserApiProvider::instance().client_session_api(&ctx.get_tenant()); - let mut user_session_ids = HashSet::new(); - let mut inactive_user_session_ids = HashSet::new(); - while let Some(entry) = lister.try_next().await? { - if entry.metadata().is_dir() { - continue; - } - let path = entry.path(); - let parts: Vec<_> = path.split('/').collect(); - if parts.len() < 3 { - return Err(ErrorCode::Internal(format!( - "invalid path for temp table: {path}" - ))); - }; - let user_name = parts[1].to_string(); - let session_id = parts[2].to_string(); - if user_session_ids.contains(&(user_name.clone(), session_id.clone())) { - continue; - } - user_session_ids.insert((user_name.clone(), session_id.clone())); - if client_session_mgr - .get_client_session(&user_name, &session_id) - .await? - .is_none() - { - inactive_user_session_ids.insert((user_name, session_id)); - if inactive_user_session_ids.len() >= self.limit.unwrap_or(u64::MAX) as usize { - break; - } - } - } - - let session_num = inactive_user_session_ids.len(); - - for (user_name, session_id) in inactive_user_session_ids { - if client_session_mgr - .get_client_session(&user_name, &session_id) - .await? - .is_none() - { - let path = format!("{}/{}/{}", TEMP_TABLE_STORAGE_PREFIX, user_name, session_id); - info!( - "[TEMP TABLE] session={session_id} vacuum temporary table: {}", - path - ); - op.remove_all(&path).await?; - } - } + let session_num = vacuum_inactive_temp_tables(ctx, self.limit).await?; let col: Vec = vec![format!( "Ok: processed temporary tables from {} inactive sessions", session_num diff --git a/src/query/storages/fuse/src/table_functions/mod.rs b/src/query/storages/fuse/src/table_functions/mod.rs index 6aa5b38c9fa0d..a568dacb0f5de 100644 --- a/src/query/storages/fuse/src/table_functions/mod.rs +++ b/src/query/storages/fuse/src/table_functions/mod.rs @@ -49,6 +49,7 @@ pub use fuse_time_travel_size::FuseTimeTravelSize; pub use fuse_time_travel_size::FuseTimeTravelSizeFunc; pub use fuse_vacuum_drop_aggregating_index::FuseVacuumDropAggregatingIndex; pub use fuse_vacuum_drop_inverted_index::FuseVacuumDropInvertedIndex; +pub use fuse_vacuum_temporary_table::vacuum_inactive_temp_tables; pub use fuse_vacuum_temporary_table::FuseVacuumTemporaryTable; pub use fuse_virtual_column::FuseVirtualColumnFunc; pub use set_cache_capacity::SetCacheCapacity;