Skip to content

Commit e2896ef

Browse files
committed
🤖 refactor: unify pending+active stream tracking
1 parent a99431b commit e2896ef

File tree

2 files changed

+57
-32
lines changed

2 files changed

+57
-32
lines changed

src/browser/stores/WorkspaceStore.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,7 @@ export class WorkspaceStore {
493493
name: metadata?.name ?? workspaceId, // Fall back to ID if metadata missing
494494
messages: aggregator.getDisplayedMessages(),
495495
queuedMessage: this.queuedMessages.get(workspaceId) ?? null,
496-
canInterrupt: activeStreams.length > 0 || aggregator.hasPendingStreams(),
496+
canInterrupt: activeStreams.length > 0 || aggregator.hasInFlightStreams(),
497497
isCompacting: aggregator.isCompacting(),
498498
awaitingUserQuestion: aggregator.hasAwaitingUserQuestion(),
499499
loading: !hasMessages && !isCaughtUp,

src/browser/utils/messages/StreamingMessageAggregator.ts

Lines changed: 56 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,17 @@ interface StreamingContext {
5353
model: string;
5454
}
5555

56+
type InFlightStreamState =
57+
| {
58+
phase: "pending";
59+
pendingAt: number;
60+
model: string;
61+
}
62+
| {
63+
phase: "active";
64+
context: StreamingContext;
65+
};
66+
5667
/**
5768
* Check if a tool result indicates success (for tools that return { success: boolean })
5869
*/
@@ -136,11 +147,10 @@ function mergeAdjacentParts(parts: MuxMessage["parts"]): MuxMessage["parts"] {
136147
}
137148

138149
export class StreamingMessageAggregator {
139-
// Streams that have emitted `stream-pending` but not `stream-start` yet.
140-
// This is the "connecting" phase: abort should work, but no deltas have started.
141-
private pendingStreams = new Map<string, { startTime: number; model: string }>();
142150
private messages = new Map<string, MuxMessage>();
143-
private activeStreams = new Map<string, StreamingContext>();
151+
152+
// Streams that are in-flight (pending: `stream-pending` received; active: `stream-start` received).
153+
private inFlightStreams = new Map<string, InFlightStreamState>();
144154

145155
// Simple cache for derived values (invalidated on every mutation)
146156
private cachedAllMessages: MuxMessage[] | null = null;
@@ -340,15 +350,14 @@ export class StreamingMessageAggregator {
340350
* Called by handleStreamEnd, handleStreamAbort, and handleStreamError.
341351
*
342352
* Clears:
343-
* - Active stream tracking (this.activeStreams)
353+
* - In-flight stream tracking (this.inFlightStreams)
344354
* - Current TODOs (this.currentTodos) - reconstructed from history on reload
345355
*
346356
* Does NOT clear:
347357
* - agentStatus - persists after stream completion to show last activity
348358
*/
349359
private cleanupStreamState(messageId: string): void {
350-
this.activeStreams.delete(messageId);
351-
this.pendingStreams.delete(messageId);
360+
this.inFlightStreams.delete(messageId);
352361
// Clear todos when stream ends - they're stream-scoped state
353362
// On reload, todos will be reconstructed from completed tool_write calls in history
354363
this.currentTodos = [];
@@ -466,24 +475,31 @@ export class StreamingMessageAggregator {
466475
this.pendingStreamStartTime = time;
467476
}
468477

469-
hasPendingStreams(): boolean {
470-
return this.pendingStreams.size > 0;
478+
hasInFlightStreams(): boolean {
479+
return this.inFlightStreams.size > 0;
471480
}
472481
getActiveStreams(): StreamingContext[] {
473-
return Array.from(this.activeStreams.values());
482+
const active: StreamingContext[] = [];
483+
for (const stream of this.inFlightStreams.values()) {
484+
if (stream.phase === "active") active.push(stream.context);
485+
}
486+
return active;
474487
}
475488

476489
/**
477490
* Get the messageId of the first active stream (for token tracking)
478491
* Returns undefined if no streams are active
479492
*/
480493
getActiveStreamMessageId(): string | undefined {
481-
return this.activeStreams.keys().next().value;
494+
for (const [messageId, stream] of this.inFlightStreams.entries()) {
495+
if (stream.phase === "active") return messageId;
496+
}
497+
return undefined;
482498
}
483499

484500
isCompacting(): boolean {
485-
for (const context of this.activeStreams.values()) {
486-
if (context.isCompacting) {
501+
for (const stream of this.inFlightStreams.values()) {
502+
if (stream.phase === "active" && stream.context.isCompacting) {
487503
return true;
488504
}
489505
}
@@ -492,13 +508,13 @@ export class StreamingMessageAggregator {
492508

493509
getCurrentModel(): string | undefined {
494510
// If there's an active stream, return its model
495-
for (const context of this.activeStreams.values()) {
496-
return context.model;
511+
for (const stream of this.inFlightStreams.values()) {
512+
if (stream.phase === "active") return stream.context.model;
497513
}
498514

499-
// If we're connecting (stream-pending), return that model
500-
for (const context of this.pendingStreams.values()) {
501-
return context.model;
515+
// If we're pending (stream-pending), return that model
516+
for (const stream of this.inFlightStreams.values()) {
517+
if (stream.phase === "pending") return stream.model;
502518
}
503519

504520
// Otherwise, return the model from the most recent assistant message
@@ -514,13 +530,14 @@ export class StreamingMessageAggregator {
514530
}
515531

516532
clearActiveStreams(): void {
517-
this.activeStreams.clear();
533+
this.setPendingStreamStartTime(null);
534+
this.inFlightStreams.clear();
535+
this.invalidateCache();
518536
}
519537

520538
clear(): void {
521539
this.messages.clear();
522-
this.activeStreams.clear();
523-
this.pendingStreams.clear();
540+
this.inFlightStreams.clear();
524541
this.invalidateCache();
525542
}
526543

@@ -547,14 +564,21 @@ export class StreamingMessageAggregator {
547564
// Clear pending stream start timestamp - backend has accepted the request.
548565
this.setPendingStreamStartTime(null);
549566

550-
this.pendingStreams.set(data.messageId, { startTime: Date.now(), model: data.model });
567+
const existing = this.inFlightStreams.get(data.messageId);
568+
if (existing?.phase === "active") return;
569+
570+
this.inFlightStreams.set(data.messageId, {
571+
phase: "pending",
572+
pendingAt: Date.now(),
573+
model: data.model,
574+
});
575+
551576
this.invalidateCache();
552577
}
553578

554579
handleStreamStart(data: StreamStartEvent): void {
555-
// Clear pending/connecting state - stream has started.
580+
// Clear pending stream start timestamp - stream has started.
556581
this.setPendingStreamStartTime(null);
557-
this.pendingStreams.delete(data.messageId);
558582

559583
// NOTE: We do NOT clear agentStatus or currentTodos here.
560584
// They are cleared when a new user message arrives (see handleMessage),
@@ -574,7 +598,7 @@ export class StreamingMessageAggregator {
574598

575599
// Use messageId as key - ensures only ONE stream per message
576600
// If called twice (e.g., during replay), second call safely overwrites first
577-
this.activeStreams.set(data.messageId, context);
601+
this.inFlightStreams.set(data.messageId, { phase: "active", context });
578602

579603
// Create initial streaming message with empty parts (deltas will append)
580604
const streamingMessage = createMuxMessage(data.messageId, "assistant", "", {
@@ -606,7 +630,8 @@ export class StreamingMessageAggregator {
606630

607631
handleStreamEnd(data: StreamEndEvent): void {
608632
// Direct lookup by messageId - O(1) instead of O(n) find
609-
const activeStream = this.activeStreams.get(data.messageId);
633+
const stream = this.inFlightStreams.get(data.messageId);
634+
const activeStream = stream?.phase === "active" ? stream.context : undefined;
610635

611636
if (activeStream) {
612637
// Normal streaming case: we've been tracking this stream from the start
@@ -673,7 +698,8 @@ export class StreamingMessageAggregator {
673698

674699
handleStreamAbort(data: StreamAbortEvent): void {
675700
// Direct lookup by messageId
676-
const activeStream = this.activeStreams.get(data.messageId);
701+
const stream = this.inFlightStreams.get(data.messageId);
702+
const activeStream = stream?.phase === "active" ? stream.context : undefined;
677703

678704
if (activeStream) {
679705
// Mark the message as interrupted and merge metadata (consistent with handleStreamEnd)
@@ -696,8 +722,7 @@ export class StreamingMessageAggregator {
696722
}
697723

698724
handleStreamError(data: StreamErrorMessage): void {
699-
const isTrackedStream =
700-
this.activeStreams.has(data.messageId) || this.pendingStreams.has(data.messageId);
725+
const isTrackedStream = this.inFlightStreams.has(data.messageId);
701726

702727
if (isTrackedStream) {
703728
// Mark the message with error metadata
@@ -868,7 +893,7 @@ export class StreamingMessageAggregator {
868893

869894
handleReasoningEnd(_data: ReasoningEndEvent): void {
870895
// Reasoning-end is just a signal - no state to update
871-
// Streaming status is inferred from activeStreams in getDisplayedMessages
896+
// Streaming status is inferred from inFlightStreams in getDisplayedMessages
872897
this.invalidateCache();
873898
}
874899

@@ -1059,7 +1084,7 @@ export class StreamingMessageAggregator {
10591084

10601085
// Check if this message has an active stream (for inferring streaming status)
10611086
// Direct Map.has() check - O(1) instead of O(n) iteration
1062-
const hasActiveStream = this.activeStreams.has(message.id);
1087+
const hasActiveStream = this.inFlightStreams.get(message.id)?.phase === "active";
10631088

10641089
// Merge adjacent text/reasoning parts for display
10651090
const mergedParts = mergeAdjacentParts(message.parts);

0 commit comments

Comments
 (0)