66 changes: 31 additions & 35 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -92,8 +92,7 @@ namespace pipeline {
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id,
PlanNodeId node_id, RuntimeState* state,
const std::vector<InstanceLoId>& sender_ins_ids)
: HasTaskExecutionCtx(state),
_queue_capacity(0),
: _queue_capacity(0),
_is_failed(false),
_query_id(std::move(query_id)),
_dest_node_id(dest_node_id),
@@ -314,24 +313,24 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
if (config::execution_ignore_eovercrowded) {
send_callback->cntl_->ignore_eovercrowded();
}
send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
RpcInstance* ins, const std::string& err) {
auto task_lock = weak_task_ctx.lock();
if (task_lock == nullptr) {
// This means the ExchangeSinkBuffer object has already been destroyed; there is no need to run the failed handler anymore.
return;
}
// Attach the task so the memory tracker and query id are available if a core dump occurs.
SCOPED_ATTACH_TASK(_state);
_failed(ins->id, err);
});
send_callback->addFailedHandler(
[&, weak_sink_buffer = weak_from_this()](RpcInstance* ins, const std::string& err) {
auto sink_buffer_lock = weak_sink_buffer.lock();
if (sink_buffer_lock == nullptr) {
// This means the ExchangeSinkBuffer object has already been destroyed; there is no need to run the failed handler anymore.
return;
}
// Attach the task so the memory tracker and query id are available if a core dump occurs.
SCOPED_ATTACH_TASK(_state);
_failed(ins->id, err);
});
send_callback->start_rpc_time = GetCurrentTimeNanos();
send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
send_callback->addSuccessHandler([&, weak_sink_buffer = weak_from_this()](
RpcInstance* ins_ptr, const bool& eos,
const PTransmitDataResult& result,
const int64_t& start_rpc_time) {
auto task_lock = weak_task_ctx.lock();
if (task_lock == nullptr) {
auto sink_buffer_lock = weak_sink_buffer.lock();
if (sink_buffer_lock == nullptr) {
// This means the ExchangeSinkBuffer object has already been destroyed; there is no need to run the handler anymore.
return;
}
@@ -443,24 +442,24 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
if (config::execution_ignore_eovercrowded) {
send_callback->cntl_->ignore_eovercrowded();
}
send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
RpcInstance* ins, const std::string& err) {
auto task_lock = weak_task_ctx.lock();
if (task_lock == nullptr) {
// This means the ExchangeSinkBuffer object has already been destroyed; there is no need to run the failed handler anymore.
return;
}
// Attach the task so the memory tracker and query id are available if a core dump occurs.
SCOPED_ATTACH_TASK(_state);
_failed(ins->id, err);
});
send_callback->addFailedHandler(
[&, weak_sink_buffer = weak_from_this()](RpcInstance* ins, const std::string& err) {
auto sink_buffer_lock = weak_sink_buffer.lock();
if (sink_buffer_lock == nullptr) {
// This means the ExchangeSinkBuffer object has already been destroyed; there is no need to run the failed handler anymore.
return;
}
// Attach the task so the memory tracker and query id are available if a core dump occurs.
SCOPED_ATTACH_TASK(_state);
_failed(ins->id, err);
});
send_callback->start_rpc_time = GetCurrentTimeNanos();
send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
send_callback->addSuccessHandler([&, weak_sink_buffer = weak_from_this()](
RpcInstance* ins_ptr, const bool& eos,
const PTransmitDataResult& result,
const int64_t& start_rpc_time) {
auto task_lock = weak_task_ctx.lock();
if (task_lock == nullptr) {
auto sink_buffer_lock = weak_sink_buffer.lock();
if (sink_buffer_lock == nullptr) {
// This means the ExchangeSinkBuffer object has already been destroyed; there is no need to run the handler anymore.
return;
}
@@ -584,11 +583,8 @@ void ExchangeSinkBuffer::_turn_off_channel(RpcInstance& ins,
return;
}
ins.rpc_channel_is_turn_off = true;
auto weak_task_ctx = weak_task_exec_ctx();
if (auto pip_ctx = weak_task_ctx.lock()) {
for (auto& parent : _parents) {
parent->on_channel_finished(ins.id);
}
for (auto& parent : _parents) {
parent->on_channel_finished(ins.id);
Copilot AI commented on Feb 13, 2026:
_parents stores raw ExchangeSinkLocalState*, and this method can be reached from async RPC callbacks. Removing the previous liveness guard and unconditionally calling parent->on_channel_finished(...) can lead to use-after-free if the local state has already been destroyed (even if the sink buffer is still alive). Please reintroduce a safe lifetime check (e.g., lock a TaskExecutionContext/other owning context before iterating) or change _parents to a lifetime-safe reference (or clear/unregister parents under lock during close) before calling into them.

Suggested change
parent->on_channel_finished(ins.id);
if (parent != nullptr) {
parent->on_channel_finished(ins.id);
}

}
}
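A minimal, self-contained sketch of the lifetime-safe alternative the review comment above suggests: hold weak references to the parents and lock each one before calling into it. The stub types and the notify_channel_finished helper below are invented for illustration and assume the local states are owned by shared_ptr, which this diff does not establish; they are not Doris APIs.

#include <cstdint>
#include <iostream>
#include <memory>
#include <vector>

// Stand-in for ExchangeSinkLocalState; it exists only to keep the sketch self-contained.
using InstanceLoId = int64_t;

struct LocalStateStub {
    void on_channel_finished(InstanceLoId id) {
        std::cout << "channel " << id << " finished" << std::endl;
    }
};

// The pattern the comment asks for: store weak references to the parents and lock
// each one before calling into it, so a destroyed local state is skipped instead
// of dereferenced.
struct SinkBufferSketch {
    std::vector<std::weak_ptr<LocalStateStub>> parents;

    void notify_channel_finished(InstanceLoId id) {
        for (auto& weak_parent : parents) {
            if (auto parent = weak_parent.lock()) {
                // The lock keeps the parent alive for the duration of this call.
                parent->on_channel_finished(id);
            }
            // An expired weak_ptr means the local state was already destroyed; skip it.
        }
    }
};

int main() {
    SinkBufferSketch buffer;
    auto live = std::make_shared<LocalStateStub>();
    auto dead = std::make_shared<LocalStateStub>();
    buffer.parents.push_back(live);
    buffer.parents.push_back(dead);
    dead.reset();                        // simulate a local state destroyed early
    buffer.notify_channel_finished(42);  // only the live parent is invoked
    return 0;
}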

6 changes: 2 additions & 4 deletions be/src/pipeline/exec/exchange_sink_buffer.h
@@ -263,17 +263,15 @@ void transmit_blockv2(PBackendService_Stub* stub,
ExchangeSendCallback<PTransmitDataResult>>>
closure);
#endif
class ExchangeSinkBuffer : public HasTaskExecutionCtx {
class ExchangeSinkBuffer : public std::enable_shared_from_this<ExchangeSinkBuffer> {
public:
ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, PlanNodeId node_id,
RuntimeState* state, const std::vector<InstanceLoId>& sender_ins_ids);
#ifdef BE_TEST
ExchangeSinkBuffer(RuntimeState* state, int64_t sinknum)
: HasTaskExecutionCtx(state), _state(state), _exchange_sink_num(sinknum) {};
: _state(state), _exchange_sink_num(sinknum) {};
#endif

~ExchangeSinkBuffer() override = default;

void construct_request(TUniqueId);

Status add_block(vectorized::Channel* channel, TransmitInfo&& request);
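For context on the header change above, here is a short standalone sketch of the weak_from_this() guard that the .cpp diff now uses in its RPC callbacks. The Buffer, on_rpc_failed, and make_failed_handler names are invented for illustration and are not Doris APIs; the only fact the sketch relies on is that weak_from_this() yields a usable weak_ptr only while the object is managed by a shared_ptr.

#include <functional>
#include <iostream>
#include <memory>
#include <string>

// Hypothetical stand-in for ExchangeSinkBuffer: inherit enable_shared_from_this so
// callbacks can capture a weak reference to the object instead of a raw pointer.
struct Buffer : std::enable_shared_from_this<Buffer> {
    void on_rpc_failed(const std::string& err) {
        std::cout << "handled failure: " << err << std::endl;
    }

    std::function<void(const std::string&)> make_failed_handler() {
        // Capture weak_from_this(); locking it inside the callback tells us whether
        // the buffer is still alive before any member is touched.
        return [weak_self = weak_from_this()](const std::string& err) {
            auto self = weak_self.lock();
            if (self == nullptr) {
                // The buffer was already destroyed; return early rather than
                // dereference a dangling pointer.
                return;
            }
            self->on_rpc_failed(err);
        };
    }
};

int main() {
    std::function<void(const std::string&)> handler;
    {
        // weak_from_this() only works because the Buffer is owned by a shared_ptr.
        auto buffer = std::make_shared<Buffer>();
        handler = buffer->make_failed_handler();
        handler("eovercrowded");  // buffer alive: the handler runs
    }                             // buffer destroyed here
    handler("timeout");           // buffer gone: the handler safely returns early
    return 0;
}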