Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_expression::FromData;
use databend_common_license::license::Feature::Vacuum;
use databend_common_license::license_manager::LicenseManagerSwitch;
use databend_common_sql::plans::VacuumTemporaryFilesPlan;
use databend_common_storages_fuse::table_functions::vacuum_inactive_temp_tables;
use databend_enterprise_vacuum_handler::get_vacuum_handler;
use databend_enterprise_vacuum_handler::vacuum_handler::VacuumTempOptions;

Expand Down Expand Up @@ -67,8 +68,17 @@ impl Interpreter for VacuumTemporaryFilesInterpreter {
)
.await?;

let table_ctx: Arc<dyn TableContext> = self.ctx.clone();
let session_limit = self
.plan
.limit
.map(|limit| limit.saturating_sub(removed_files as u64));
let cleaned_temp_table_sessions =
vacuum_inactive_temp_tables(&table_ctx, session_limit).await? as u64;

PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![
UInt64Type::from_data(vec![removed_files as u64]),
UInt64Type::from_data(vec![cleaned_temp_table_sessions]),
])])
}
}
11 changes: 7 additions & 4 deletions src/query/sql/src/planner/plans/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,13 @@ pub struct VacuumTemporaryFilesPlan {

impl crate::plans::VacuumTemporaryFilesPlan {
pub fn schema(&self) -> DataSchemaRef {
Arc::new(DataSchema::new(vec![DataField::new(
"Files",
DataType::Number(NumberDataType::UInt64),
)]))
Arc::new(DataSchema::new(vec![
DataField::new("spill_files", DataType::Number(NumberDataType::UInt64)),
DataField::new(
"temp_table_sessions",
DataType::Number(NumberDataType::UInt64),
),
]))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,76 @@ use log::info;
use crate::sessions::TableContext;
use crate::table_functions::SimpleTableFunc;

#[async_backtrace::framed]
pub async fn vacuum_inactive_temp_tables(
ctx: &Arc<dyn TableContext>,
limit: Option<u64>,
) -> Result<usize> {
let op = DataOperator::instance().operator();
let mut lister = op
.lister_with(TEMP_TABLE_STORAGE_PREFIX)
.recursive(true)
.await?;

let client_session_mgr = UserApiProvider::instance().client_session_api(&ctx.get_tenant());
let mut user_session_ids = HashSet::new();
let mut inactive_user_session_ids = Vec::new();
let session_limit = limit.unwrap_or(u64::MAX) as usize;

if session_limit == 0 {
return Ok(0);
}

while let Some(entry) = lister.try_next().await? {
if entry.metadata().is_dir() {
continue;
}
let path = entry.path();
let parts: Vec<_> = path.split('/').collect();
if parts.len() < 3 {
return Err(ErrorCode::Internal(format!(
"invalid path for temp table: {path}"
)));
};
let user_name = parts[1].to_string();
let session_id = parts[2].to_string();
if user_session_ids.contains(&(user_name.clone(), session_id.clone())) {
continue;
}
user_session_ids.insert((user_name.clone(), session_id.clone()));
if client_session_mgr
.get_client_session(&user_name, &session_id)
.await?
.is_none()
{
inactive_user_session_ids.push((user_name, session_id));
if inactive_user_session_ids.len() >= session_limit {
break;
}
}
}

let mut session_num = 0;

for (user_name, session_id) in inactive_user_session_ids {
if client_session_mgr
.get_client_session(&user_name, &session_id)
.await?
.is_none()
{
let path = format!("{}/{}/{}", TEMP_TABLE_STORAGE_PREFIX, user_name, session_id);
info!(
"[TEMP TABLE] session={session_id} vacuum temporary table: {}",
path
);
op.remove_all(&path).await?;
session_num += 1;
}
}

Ok(session_num)
}

pub struct FuseVacuumTemporaryTable {
limit: Option<u64>,
}
Expand Down Expand Up @@ -64,59 +134,7 @@ impl SimpleTableFunc for FuseVacuumTemporaryTable {
ctx: &Arc<dyn TableContext>,
_plan: &DataSourcePlan,
) -> Result<Option<DataBlock>> {
let op = DataOperator::instance().operator();
let mut lister = op
.lister_with(TEMP_TABLE_STORAGE_PREFIX)
.recursive(true)
.await?;
let client_session_mgr = UserApiProvider::instance().client_session_api(&ctx.get_tenant());
let mut user_session_ids = HashSet::new();
let mut inactive_user_session_ids = HashSet::new();
while let Some(entry) = lister.try_next().await? {
if entry.metadata().is_dir() {
continue;
}
let path = entry.path();
let parts: Vec<_> = path.split('/').collect();
if parts.len() < 3 {
return Err(ErrorCode::Internal(format!(
"invalid path for temp table: {path}"
)));
};
let user_name = parts[1].to_string();
let session_id = parts[2].to_string();
if user_session_ids.contains(&(user_name.clone(), session_id.clone())) {
continue;
}
user_session_ids.insert((user_name.clone(), session_id.clone()));
if client_session_mgr
.get_client_session(&user_name, &session_id)
.await?
.is_none()
{
inactive_user_session_ids.insert((user_name, session_id));
if inactive_user_session_ids.len() >= self.limit.unwrap_or(u64::MAX) as usize {
break;
}
}
}

let session_num = inactive_user_session_ids.len();

for (user_name, session_id) in inactive_user_session_ids {
if client_session_mgr
.get_client_session(&user_name, &session_id)
.await?
.is_none()
{
let path = format!("{}/{}/{}", TEMP_TABLE_STORAGE_PREFIX, user_name, session_id);
info!(
"[TEMP TABLE] session={session_id} vacuum temporary table: {}",
path
);
op.remove_all(&path).await?;
}
}
let session_num = vacuum_inactive_temp_tables(ctx, self.limit).await?;
let col: Vec<String> = vec![format!(
"Ok: processed temporary tables from {} inactive sessions",
session_num
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/table_functions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub use fuse_time_travel_size::FuseTimeTravelSize;
pub use fuse_time_travel_size::FuseTimeTravelSizeFunc;
pub use fuse_vacuum_drop_aggregating_index::FuseVacuumDropAggregatingIndex;
pub use fuse_vacuum_drop_inverted_index::FuseVacuumDropInvertedIndex;
pub use fuse_vacuum_temporary_table::vacuum_inactive_temp_tables;
pub use fuse_vacuum_temporary_table::FuseVacuumTemporaryTable;
pub use fuse_virtual_column::FuseVirtualColumnFunc;
pub use set_cache_capacity::SetCacheCapacity;
Loading