
Commit 02af370

🤖 fix: prevent RetryBarrier flashes during stream startup
1 parent d180c80 commit 02af370

15 files changed: +168 -39 lines

src/browser/stores/WorkspaceStore.ts

Lines changed: 12 additions & 2 deletions
@@ -161,6 +161,15 @@ export class WorkspaceStore {
       data: WorkspaceChatMessage
     ) => void
   > = {
+    "stream-pending": (workspaceId, aggregator, data) => {
+      aggregator.handleStreamPending(data as never);
+      if (this.onModelUsed) {
+        this.onModelUsed((data as { model: string }).model);
+      }
+      this.states.bump(workspaceId);
+      // Bump usage store so liveUsage can show the current model even before streaming starts
+      this.usageStore.bump(workspaceId);
+    },
     "stream-start": (workspaceId, aggregator, data) => {
       aggregator.handleStreamStart(data as never);
       if (this.onModelUsed) {
@@ -484,7 +493,7 @@
       name: metadata?.name ?? workspaceId, // Fall back to ID if metadata missing
       messages: aggregator.getDisplayedMessages(),
       queuedMessage: this.queuedMessages.get(workspaceId) ?? null,
-      canInterrupt: activeStreams.length > 0,
+      canInterrupt: activeStreams.length > 0 || aggregator.hasConnectingStreams(),
       isCompacting: aggregator.isCompacting(),
       awaitingUserQuestion: aggregator.hasAwaitingUserQuestion(),
       loading: !hasMessages && !isCaughtUp,
@@ -969,7 +978,8 @@
     // Check if there's an active stream in buffered events (reconnection scenario)
     const pendingEvents = this.pendingStreamEvents.get(workspaceId) ?? [];
     const hasActiveStream = pendingEvents.some(
-      (event) => "type" in event && event.type === "stream-start"
+      (event) =>
+        "type" in event && (event.type === "stream-start" || event.type === "stream-pending")
     );
 
     // Load historical messages first
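
The canInterrupt change above is the user-visible half of this fix: a stream that is still connecting must already count as interruptible so the UI offers abort instead of flashing the RetryBarrier. A minimal sketch of that condition in isolation (the helper name, parameters, and import path are assumptions for illustration, not part of this commit):

import type { StreamingMessageAggregator } from "@/browser/utils/messages/StreamingMessageAggregator";

// Hypothetical helper: a stream in the "connecting" phase (stream-pending received,
// no stream-start yet) counts toward the interrupt affordance.
function computeCanInterrupt(
  activeStreamCount: number,
  aggregator: StreamingMessageAggregator
): boolean {
  return activeStreamCount > 0 || aggregator.hasConnectingStreams();
}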

src/browser/utils/messages/StreamingMessageAggregator.ts

Lines changed: 63 additions & 27 deletions
@@ -7,6 +7,7 @@ import type {
 } from "@/common/types/message";
 import { createMuxMessage } from "@/common/types/message";
 import type {
+  StreamPendingEvent,
   StreamStartEvent,
   StreamDeltaEvent,
   UsageDeltaEvent,
@@ -135,6 +136,9 @@ function mergeAdjacentParts(parts: MuxMessage["parts"]): MuxMessage["parts"] {
 }
 
 export class StreamingMessageAggregator {
+  // Streams that have been registered/started in the backend but haven't emitted stream-start yet.
+  // This is the "connecting" phase: abort should work, but no deltas have started.
+  private connectingStreams = new Map<string, { startTime: number; model: string }>();
   private messages = new Map<string, MuxMessage>();
   private activeStreams = new Map<string, StreamingContext>();
 
@@ -344,6 +348,7 @@ export class StreamingMessageAggregator {
    */
   private cleanupStreamState(messageId: string): void {
     this.activeStreams.delete(messageId);
+    this.connectingStreams.delete(messageId);
     // Clear todos when stream ends - they're stream-scoped state
     // On reload, todos will be reconstructed from completed tool_write calls in history
     this.currentTodos = [];
@@ -461,6 +466,9 @@
     this.pendingStreamStartTime = time;
   }
 
+  hasConnectingStreams(): boolean {
+    return this.connectingStreams.size > 0;
+  }
   getActiveStreams(): StreamingContext[] {
     return Array.from(this.activeStreams.values());
   }
@@ -488,6 +496,11 @@
       return context.model;
     }
 
+    // If we're connecting (stream-pending), return that model
+    for (const context of this.connectingStreams.values()) {
+      return context.model;
+    }
+
     // Otherwise, return the model from the most recent assistant message
     const messages = this.getAllMessages();
     for (let i = messages.length - 1; i >= 0; i--) {
@@ -507,6 +520,7 @@
   clear(): void {
     this.messages.clear();
     this.activeStreams.clear();
+    this.connectingStreams.clear();
     this.invalidateCache();
   }
 
@@ -529,9 +543,30 @@
   }
 
   // Unified event handlers that encapsulate all complex logic
+  handleStreamPending(data: StreamPendingEvent): void {
+    // Clear pending stream start timestamp - backend has accepted the request.
+    this.setPendingStreamStartTime(null);
+
+    this.connectingStreams.set(data.messageId, { startTime: Date.now(), model: data.model });
+
+    // Create a placeholder assistant message (kept invisible until parts arrive)
+    // so that out-of-order deltas (if they ever occur) have somewhere to attach.
+    if (!this.messages.has(data.messageId)) {
+      const connectingMessage = createMuxMessage(data.messageId, "assistant", "", {
+        historySequence: data.historySequence,
+        timestamp: Date.now(),
+        model: data.model,
+      });
+      this.messages.set(data.messageId, connectingMessage);
+    }
+
+    this.invalidateCache();
+  }
+
   handleStreamStart(data: StreamStartEvent): void {
-    // Clear pending stream start timestamp - stream has started
+    // Clear pending/connecting state - stream has started.
     this.setPendingStreamStartTime(null);
+    this.connectingStreams.delete(data.messageId);
 
     // NOTE: We do NOT clear agentStatus or currentTodos here.
     // They are cleared when a new user message arrives (see handleMessage),
@@ -673,10 +708,10 @@
   }
 
   handleStreamError(data: StreamErrorMessage): void {
-    // Direct lookup by messageId
-    const activeStream = this.activeStreams.get(data.messageId);
+    const isTrackedStream =
+      this.activeStreams.has(data.messageId) || this.connectingStreams.has(data.messageId);
 
-    if (activeStream) {
+    if (isTrackedStream) {
       // Mark the message with error metadata
       const message = this.messages.get(data.messageId);
       if (message?.metadata) {
@@ -688,32 +723,33 @@
         this.compactMessageParts(message);
       }
 
-      // Clean up stream-scoped state (active stream tracking, TODOs)
+      // Clean up stream-scoped state (active/connecting tracking, TODOs)
       this.cleanupStreamState(data.messageId);
       this.invalidateCache();
-    } else {
-      // Pre-stream error (e.g., API key not configured before streaming starts)
-      // Create a synthetic error message since there's no active stream to attach to
-      // Get the highest historySequence from existing messages so this appears at the end
-      const maxSequence = Math.max(
-        0,
-        ...Array.from(this.messages.values()).map((m) => m.metadata?.historySequence ?? 0)
-      );
-      const errorMessage: MuxMessage = {
-        id: data.messageId,
-        role: "assistant",
-        parts: [],
-        metadata: {
-          partial: true,
-          error: data.error,
-          errorType: data.errorType,
-          timestamp: Date.now(),
-          historySequence: maxSequence + 1,
-        },
-      };
-      this.messages.set(data.messageId, errorMessage);
-      this.invalidateCache();
+      return;
     }
+
+    // Pre-stream error (e.g., API key not configured before streaming starts)
+    // Create a synthetic error message since there's no tracked stream to attach to.
+    // Get the highest historySequence from existing messages so this appears at the end.
+    const maxSequence = Math.max(
+      0,
+      ...Array.from(this.messages.values()).map((m) => m.metadata?.historySequence ?? 0)
+    );
+    const errorMessage: MuxMessage = {
+      id: data.messageId,
+      role: "assistant",
+      parts: [],
+      metadata: {
+        partial: true,
+        error: data.error,
+        errorType: data.errorType,
+        timestamp: Date.now(),
+        historySequence: maxSequence + 1,
+      },
+    };
+    this.messages.set(data.messageId, errorMessage);
+    this.invalidateCache();
  }
 
  handleToolCallStart(data: ToolCallStartEvent): void {
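
A rough sketch of the lifecycle the new connecting-stream tracking implements, assuming an existing aggregator instance; the import path and payload values are invented for illustration and are not part of this commit:

import type { StreamingMessageAggregator } from "@/browser/utils/messages/StreamingMessageAggregator";

declare const aggregator: StreamingMessageAggregator; // assumed to exist in the caller

aggregator.handleStreamPending({
  type: "stream-pending",
  workspaceId: "ws-1",
  messageId: "msg-1",
  model: "example-model",
  historySequence: 3,
});

// While connecting: interrupt is allowed and the current-model lookup can report the
// connecting model, but the placeholder assistant message has no parts yet, so nothing renders.
const connecting = aggregator.hasConnectingStreams(); // true

// A later stream-start, stream-error, or cleanupStreamState() for "msg-1" removes the
// entry, and hasConnectingStreams() returns false again.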

src/browser/utils/messages/retryEligibility.test.ts

Lines changed: 29 additions & 0 deletions
@@ -239,6 +239,35 @@ describe("hasInterruptedStream", () => {
     expect(hasInterruptedStream(messages, null)).toBe(true);
   });
 
+  it("returns false when pendingStreamStartTime is null but last user message timestamp is recent (replay/reload)", () => {
+    const justSentTimestamp = Date.now() - (PENDING_STREAM_START_GRACE_PERIOD_MS - 500);
+    const messages: DisplayedMessage[] = [
+      {
+        type: "user",
+        id: "user-1",
+        historyId: "user-1",
+        content: "Hello",
+        historySequence: 1,
+        timestamp: justSentTimestamp,
+      },
+    ];
+    expect(hasInterruptedStream(messages, null)).toBe(false);
+  });
+
+  it("returns true when pendingStreamStartTime is null and last user message timestamp is old (replay/reload)", () => {
+    const longAgoTimestamp = Date.now() - (PENDING_STREAM_START_GRACE_PERIOD_MS + 1000);
+    const messages: DisplayedMessage[] = [
+      {
+        type: "user",
+        id: "user-1",
+        historyId: "user-1",
+        content: "Hello",
+        historySequence: 1,
+        timestamp: longAgoTimestamp,
+      },
+    ];
+    expect(hasInterruptedStream(messages, null)).toBe(true);
+  });
   it("returns false when user message just sent (within grace period)", () => {
     const messages: DisplayedMessage[] = [
       {

src/browser/utils/messages/retryEligibility.ts

Lines changed: 13 additions & 7 deletions
@@ -80,16 +80,22 @@ export function hasInterruptedStream(
 ): boolean {
   if (messages.length === 0) return false;
 
-  // Don't show retry barrier if user message was sent very recently (within the grace period)
-  // This prevents flash during normal send flow while stream-start event arrives
-  // After the grace period, assume something is wrong and show the barrier
-  if (pendingStreamStartTime !== null) {
-    const elapsed = Date.now() - pendingStreamStartTime;
+  const lastMessage = messages[messages.length - 1];
+
+  // Don't show retry barrier if the last user message was sent very recently (within the grace period).
+  //
+  // We prefer the explicit pendingStreamStartTime (set during the live send flow).
+  // But during history replay / app reload, pendingStreamStartTime can be null even when the last
+  // message is a fresh user message. In that case, fall back to the user message timestamp.
+  const graceStartTime =
+    pendingStreamStartTime ??
+    (lastMessage.type === "user" ? (lastMessage.timestamp ?? null) : null);
+
+  if (graceStartTime !== null) {
+    const elapsed = Date.now() - graceStartTime;
     if (elapsed < PENDING_STREAM_START_GRACE_PERIOD_MS) return false;
   }
 
-  const lastMessage = messages[messages.length - 1];
-
   // ask_user_question is a special case: an unfinished tool call represents an
   // intentional "waiting for user input" state, not a stream interruption.
   //
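
The fallback added here can be read as a small pure function. A sketch under the assumption of a 3-second grace period (the real PENDING_STREAM_START_GRACE_PERIOD_MS is defined elsewhere in the module and may differ):

// Illustrative only; the real constant, types, and helper live in retryEligibility.ts.
const GRACE_PERIOD_MS = 3_000;

function withinGracePeriod(
  pendingStreamStartTime: number | null,
  lastUserMessageTimestamp: number | null
): boolean {
  // Prefer the live-send timestamp; fall back to the last user message timestamp so
  // that a replay/reload right after a send does not flash the RetryBarrier.
  const graceStartTime = pendingStreamStartTime ?? lastUserMessageTimestamp;
  return graceStartTime !== null && Date.now() - graceStartTime < GRACE_PERIOD_MS;
}

// withinGracePeriod(null, Date.now() - 500)    -> true:  barrier suppressed
// withinGracePeriod(null, Date.now() - 60_000) -> false: barrier may show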

src/common/orpc/schemas.ts

Lines changed: 1 addition & 0 deletions
@@ -88,6 +88,7 @@ export {
   StreamDeltaEventSchema,
   StreamEndEventSchema,
   StreamErrorMessageSchema,
+  StreamPendingEventSchema,
   StreamStartEventSchema,
   ToolCallDeltaEventSchema,
   ToolCallEndEventSchema,

src/common/orpc/schemas/stream.ts

Lines changed: 12 additions & 0 deletions
@@ -27,6 +27,17 @@ export const DeleteMessageSchema = z.object({
   historySequences: z.array(z.number()),
 });
 
+// Emitted when a stream has been registered and is abortable, but before streaming begins.
+// This prevents RetryBarrier flash during slow provider connection/setup.
+export const StreamPendingEventSchema = z.object({
+  type: z.literal("stream-pending"),
+  workspaceId: z.string(),
+  messageId: z.string(),
+  model: z.string(),
+  historySequence: z.number().meta({
+    description: "Backend assigns global message ordering",
+  }),
+});
 export const StreamStartEventSchema = z.object({
   type: z.literal("stream-start"),
   workspaceId: z.string(),
@@ -273,6 +284,7 @@ export const WorkspaceChatMessageSchema = z.discriminatedUnion("type", [
   CaughtUpMessageSchema,
   StreamErrorMessageSchema,
   DeleteMessageSchema,
+  StreamPendingEventSchema,
   StreamStartEventSchema,
   StreamDeltaEventSchema,
   StreamEndEventSchema,
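
For reference, a payload that would satisfy the new schema; the field values are invented, and the import path assumes the "@/" alias maps to src/ (StreamPendingEventSchema is also re-exported from "@/common/orpc/schemas" above):

import { StreamPendingEventSchema } from "@/common/orpc/schemas/stream";

// Parses successfully; a missing field or a wrong "type" literal would throw a ZodError.
const pending = StreamPendingEventSchema.parse({
  type: "stream-pending",
  workspaceId: "ws-1",
  messageId: "msg-42",
  model: "example-model",
  historySequence: 7,
});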

src/common/orpc/types.ts

Lines changed: 5 additions & 0 deletions
@@ -2,6 +2,7 @@ import type { z } from "zod";
 import type * as schemas from "./schemas";
 
 import type {
+  StreamPendingEvent,
   StreamStartEvent,
   StreamDeltaEvent,
   StreamEndEvent,
@@ -43,6 +44,10 @@ export function isStreamError(msg: WorkspaceChatMessage): msg is StreamErrorMess
   return (msg as { type?: string }).type === "stream-error";
 }
 
+export function isStreamPending(msg: WorkspaceChatMessage): msg is StreamPendingEvent {
+  return (msg as { type?: string }).type === "stream-pending";
+}
+
 export function isDeleteMessage(msg: WorkspaceChatMessage): msg is DeleteMessage {
   return (msg as { type?: string }).type === "delete";
 }
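
A small usage sketch for the new guard; the consumer function and import specifics are assumptions for illustration, not part of this commit:

import { isStreamPending } from "@/common/orpc/types";
import type { WorkspaceChatMessage } from "@/common/orpc/types";

// Hypothetical consumer: narrow the union before touching pending-only fields.
function describeChatMessage(msg: WorkspaceChatMessage): string {
  if (isStreamPending(msg)) {
    // msg is StreamPendingEvent here, so messageId and model are available.
    return `connecting ${msg.messageId} (${msg.model})`;
  }
  return "other chat event";
}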

src/common/types/stream.ts

Lines changed: 3 additions & 0 deletions
@@ -11,6 +11,7 @@ import type {
   StreamAbortEventSchema,
   StreamDeltaEventSchema,
   StreamEndEventSchema,
+  StreamPendingEventSchema,
   StreamStartEventSchema,
   ToolCallDeltaEventSchema,
   ToolCallEndEventSchema,
@@ -22,6 +23,7 @@ import type {
  * Completed message part (reasoning, text, or tool) suitable for serialization
  * Used in StreamEndEvent and partial message storage
  */
+export type StreamPendingEvent = z.infer<typeof StreamPendingEventSchema>;
 export type CompletedMessagePart = MuxReasoningPart | MuxTextPart | MuxToolPart;
 
 export type StreamStartEvent = z.infer<typeof StreamStartEventSchema>;
@@ -45,6 +47,7 @@ export type ReasoningEndEvent = z.infer<typeof ReasoningEndEventSchema>;
 export type UsageDeltaEvent = z.infer<typeof UsageDeltaEventSchema>;
 
 export type AIServiceEvent =
+  | StreamPendingEvent
   | StreamStartEvent
   | StreamDeltaEvent
   | StreamEndEvent

src/node/services/agentSession.ts

Lines changed: 1 addition & 0 deletions
@@ -619,6 +619,7 @@ export class AgentSession {
       this.aiService.on(event, wrapped as never);
     };
 
+    forward("stream-pending", (payload) => this.emitChatEvent(payload));
     forward("stream-start", (payload) => this.emitChatEvent(payload));
     forward("stream-delta", (payload) => this.emitChatEvent(payload));
     forward("tool-call-start", (payload) => this.emitChatEvent(payload));

src/node/services/aiService.ts

Lines changed: 1 addition & 0 deletions
@@ -348,6 +348,7 @@ export class AIService extends EventEmitter {
    * Forward all stream events from StreamManager to AIService consumers
    */
   private setupStreamEventForwarding(): void {
+    this.streamManager.on("stream-pending", (data) => this.emit("stream-pending", data));
     this.streamManager.on("stream-start", (data) => this.emit("stream-start", data));
     this.streamManager.on("stream-delta", (data) => this.emit("stream-delta", data));
     this.streamManager.on("stream-end", (data) => this.emit("stream-end", data));
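
Taken together, this line and the AgentSession forward above form a simple relay: StreamManager emits stream-pending, AIService re-emits it, and AgentSession pushes it onto the workspace chat channel that WorkspaceStore consumes. A self-contained sketch with plain EventEmitters standing in for those classes (names and payload values are illustrative):

import { EventEmitter } from "node:events";

const streamManager = new EventEmitter(); // stands in for StreamManager
const aiService = new EventEmitter(); // stands in for AIService

// Equivalent of AIService.setupStreamEventForwarding():
streamManager.on("stream-pending", (data) => aiService.emit("stream-pending", data));

// Equivalent of AgentSession's forward("stream-pending", ...):
aiService.on("stream-pending", (payload) => {
  // In the real code this is emitChatEvent(payload), which feeds the workspace chat stream.
  console.log("chat event:", payload);
});

streamManager.emit("stream-pending", {
  type: "stream-pending",
  workspaceId: "ws-1",
  messageId: "msg-1",
  model: "example-model",
  historySequence: 1,
});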
