From 77b3a567404778be6dd8aca78382a1ff832dc8f0 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Fri, 13 Feb 2026 12:27:13 +0800 Subject: [PATCH 1/3] change _send_rpc use ExchangeSinkBuffer's weakptr to instead TaskExecutionContext --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 63 +++++++++---------- be/src/pipeline/exec/exchange_sink_buffer.h | 6 +- 2 files changed, 33 insertions(+), 36 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index ad5e8ce6e2a828..7a885e1ce3df7a 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -314,24 +314,24 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { if (config::execution_ignore_eovercrowded) { send_callback->cntl_->ignore_eovercrowded(); } - send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()]( - RpcInstance* ins, const std::string& err) { - auto task_lock = weak_task_ctx.lock(); - if (task_lock == nullptr) { - // This means ExchangeSinkBuffer Ojbect already destroyed, not need run failed any more. - return; - } - // attach task for memory tracker and query id when core - SCOPED_ATTACH_TASK(_state); - _failed(ins->id, err); - }); + send_callback->addFailedHandler( + [&, weak_sink_buffer = weak_from_this()](RpcInstance* ins, const std::string& err) { + auto sink_buffer_lock = weak_sink_buffer.lock(); + if (sink_buffer_lock == nullptr) { + // This means ExchangeSinkBuffer Ojbect already destroyed, not need run failed any more. + return; + } + // attach task for memory tracker and query id when core + SCOPED_ATTACH_TASK(_state); + _failed(ins->id, err); + }); send_callback->start_rpc_time = GetCurrentTimeNanos(); - send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()]( + send_callback->addSuccessHandler([&, weak_sink_buffer = weak_from_this()]( RpcInstance* ins_ptr, const bool& eos, const PTransmitDataResult& result, const int64_t& start_rpc_time) { - auto task_lock = weak_task_ctx.lock(); - if (task_lock == nullptr) { + auto sink_buffer_lock = weak_sink_buffer.lock(); + if (sink_buffer_lock == nullptr) { // This means ExchangeSinkBuffer Ojbect already destroyed, not need run failed any more. return; } @@ -443,24 +443,24 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { if (config::execution_ignore_eovercrowded) { send_callback->cntl_->ignore_eovercrowded(); } - send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()]( - RpcInstance* ins, const std::string& err) { - auto task_lock = weak_task_ctx.lock(); - if (task_lock == nullptr) { - // This means ExchangeSinkBuffer Ojbect already destroyed, not need run failed any more. - return; - } - // attach task for memory tracker and query id when core - SCOPED_ATTACH_TASK(_state); - _failed(ins->id, err); - }); + send_callback->addFailedHandler( + [&, weak_sink_buffer = weak_from_this()](RpcInstance* ins, const std::string& err) { + auto sink_buffer_lock = weak_sink_buffer.lock(); + if (sink_buffer_lock == nullptr) { + // This means ExchangeSinkBuffer Ojbect already destroyed, not need run failed any more. + return; + } + // attach task for memory tracker and query id when core + SCOPED_ATTACH_TASK(_state); + _failed(ins->id, err); + }); send_callback->start_rpc_time = GetCurrentTimeNanos(); - send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()]( + send_callback->addSuccessHandler([&, weak_sink_buffer = weak_from_this()]( RpcInstance* ins_ptr, const bool& eos, const PTransmitDataResult& result, const int64_t& start_rpc_time) { - auto task_lock = weak_task_ctx.lock(); - if (task_lock == nullptr) { + auto sink_buffer_lock = weak_sink_buffer.lock(); + if (sink_buffer_lock == nullptr) { // This means ExchangeSinkBuffer Ojbect already destroyed, not need run failed any more. return; } @@ -584,11 +584,8 @@ void ExchangeSinkBuffer::_turn_off_channel(RpcInstance& ins, return; } ins.rpc_channel_is_turn_off = true; - auto weak_task_ctx = weak_task_exec_ctx(); - if (auto pip_ctx = weak_task_ctx.lock()) { - for (auto& parent : _parents) { - parent->on_channel_finished(ins.id); - } + for (auto& parent : _parents) { + parent->on_channel_finished(ins.id); } } diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index eea02e22ac74c7..9cce8c7e4332d8 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -263,16 +263,16 @@ void transmit_blockv2(PBackendService_Stub* stub, ExchangeSendCallback>> closure); #endif -class ExchangeSinkBuffer : public HasTaskExecutionCtx { +class ExchangeSinkBuffer : std::enable_shared_from_this { public: ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, PlanNodeId node_id, RuntimeState* state, const std::vector& sender_ins_ids); #ifdef BE_TEST ExchangeSinkBuffer(RuntimeState* state, int64_t sinknum) - : HasTaskExecutionCtx(state), _state(state), _exchange_sink_num(sinknum) {}; + : _state(state), _exchange_sink_num(sinknum) {}; #endif - ~ExchangeSinkBuffer() override = default; + ~ExchangeSinkBuffer() = default; void construct_request(TUniqueId); From ab39e2c0ca406d480c4fdff0d772a451c19ac2c4 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Fri, 13 Feb 2026 12:39:33 +0800 Subject: [PATCH 2/3] update --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 3 +-- be/src/pipeline/exec/exchange_sink_buffer.h | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 7a885e1ce3df7a..f9c6b689e02df1 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -92,8 +92,7 @@ namespace pipeline { ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, PlanNodeId node_id, RuntimeState* state, const std::vector& sender_ins_ids) - : HasTaskExecutionCtx(state), - _queue_capacity(0), + : _queue_capacity(0), _is_failed(false), _query_id(std::move(query_id)), _dest_node_id(dest_node_id), diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 9cce8c7e4332d8..019bed48c960cd 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -263,7 +263,7 @@ void transmit_blockv2(PBackendService_Stub* stub, ExchangeSendCallback>> closure); #endif -class ExchangeSinkBuffer : std::enable_shared_from_this { +class ExchangeSinkBuffer : public std::enable_shared_from_this { public: ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, PlanNodeId node_id, RuntimeState* state, const std::vector& sender_ins_ids); From 7477cc39325e049031e6a028b86d9f510da3a7b2 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Fri, 13 Feb 2026 13:12:36 +0800 Subject: [PATCH 3/3] fix --- be/src/pipeline/exec/exchange_sink_buffer.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 019bed48c960cd..ad76deab52cdbe 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -272,8 +272,6 @@ class ExchangeSinkBuffer : public std::enable_shared_from_this