Skip to content

Commit e1113a9

Browse files
committed
🤖 fix: cleanup pending stream state on abort/error
1 parent f466e82 commit e1113a9

File tree

2 files changed

+83
-24
lines changed

2 files changed

+83
-24
lines changed

src/browser/utils/messages/StreamingMessageAggregator.test.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,58 @@ describe("StreamingMessageAggregator", () => {
248248
expect(aggregator.getCurrentTodos()).toHaveLength(0);
249249
});
250250

251+
test("should clear in-flight streams when pending stream aborts", () => {
252+
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
253+
254+
aggregator.handleStreamPending({
255+
type: "stream-pending",
256+
workspaceId: "test-workspace",
257+
messageId: "msg1",
258+
historySequence: 1,
259+
model: "claude-3-5-sonnet-20241022",
260+
});
261+
262+
expect(aggregator.hasInFlightStreams()).toBe(true);
263+
264+
aggregator.handleStreamAbort({
265+
type: "stream-abort",
266+
workspaceId: "test-workspace",
267+
messageId: "msg1",
268+
metadata: {},
269+
});
270+
271+
expect(aggregator.hasInFlightStreams()).toBe(false);
272+
});
273+
274+
test("should surface stream-error when tracked stream errors before stream-start", () => {
275+
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
276+
277+
aggregator.handleStreamPending({
278+
type: "stream-pending",
279+
workspaceId: "test-workspace",
280+
messageId: "msg1",
281+
historySequence: 1,
282+
model: "claude-3-5-sonnet-20241022",
283+
});
284+
285+
aggregator.handleStreamError({
286+
type: "stream-error",
287+
messageId: "msg1",
288+
error: "Boom",
289+
errorType: "unknown",
290+
});
291+
292+
expect(aggregator.hasInFlightStreams()).toBe(false);
293+
294+
const displayed = aggregator.getDisplayedMessages();
295+
const errorMsg = displayed.find((m) => m.type === "stream-error");
296+
expect(errorMsg).toBeDefined();
297+
if (errorMsg?.type === "stream-error") {
298+
expect(errorMsg.error).toBe("Boom");
299+
expect(errorMsg.errorType).toBe("unknown");
300+
}
301+
});
302+
251303
test("should reconstruct todos on reload ONLY when reconnecting to active stream", () => {
252304
const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT);
253305

src/browser/utils/messages/StreamingMessageAggregator.ts

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -699,9 +699,9 @@ export class StreamingMessageAggregator {
699699
handleStreamAbort(data: StreamAbortEvent): void {
700700
// Direct lookup by messageId
701701
const stream = this.inFlightStreams.get(data.messageId);
702-
const activeStream = stream?.phase === "active" ? stream.context : undefined;
702+
if (!stream) return;
703703

704-
if (activeStream) {
704+
if (stream.phase === "active") {
705705
// Mark the message as interrupted and merge metadata (consistent with handleStreamEnd)
706706
const message = this.messages.get(data.messageId);
707707
if (message?.metadata) {
@@ -714,14 +714,35 @@ export class StreamingMessageAggregator {
714714
// Compact parts even on abort - still reduces memory for partial messages
715715
this.compactMessageParts(message);
716716
}
717-
718-
// Clean up stream-scoped state (active stream tracking, TODOs)
719-
this.cleanupStreamState(data.messageId);
720-
this.invalidateCache();
721717
}
718+
719+
// Always clean up stream-scoped state (pending or active) to avoid wedging canInterrupt=true.
720+
this.cleanupStreamState(data.messageId);
721+
this.invalidateCache();
722722
}
723723

724724
handleStreamError(data: StreamErrorMessage): void {
725+
const createSyntheticErrorMessage = (): void => {
726+
// Get the highest historySequence from existing messages so this appears at the end.
727+
const maxSequence = Math.max(
728+
0,
729+
...Array.from(this.messages.values()).map((m) => m.metadata?.historySequence ?? 0)
730+
);
731+
const errorMessage: MuxMessage = {
732+
id: data.messageId,
733+
role: "assistant",
734+
parts: [],
735+
metadata: {
736+
partial: true,
737+
error: data.error,
738+
errorType: data.errorType,
739+
timestamp: Date.now(),
740+
historySequence: maxSequence + 1,
741+
},
742+
};
743+
this.messages.set(data.messageId, errorMessage);
744+
};
745+
725746
const isTrackedStream = this.inFlightStreams.has(data.messageId);
726747

727748
if (isTrackedStream) {
@@ -734,6 +755,9 @@ export class StreamingMessageAggregator {
734755

735756
// Compact parts even on error - still reduces memory for partial messages
736757
this.compactMessageParts(message);
758+
} else {
759+
// Stream errored before stream-start created a message (pending-phase).
760+
createSyntheticErrorMessage();
737761
}
738762

739763
// Clean up stream-scoped state (active/connecting tracking, TODOs)
@@ -744,24 +768,7 @@ export class StreamingMessageAggregator {
744768

745769
// Pre-stream error (e.g., API key not configured before streaming starts)
746770
// Create a synthetic error message since there's no tracked stream to attach to.
747-
// Get the highest historySequence from existing messages so this appears at the end.
748-
const maxSequence = Math.max(
749-
0,
750-
...Array.from(this.messages.values()).map((m) => m.metadata?.historySequence ?? 0)
751-
);
752-
const errorMessage: MuxMessage = {
753-
id: data.messageId,
754-
role: "assistant",
755-
parts: [],
756-
metadata: {
757-
partial: true,
758-
error: data.error,
759-
errorType: data.errorType,
760-
timestamp: Date.now(),
761-
historySequence: maxSequence + 1,
762-
},
763-
};
764-
this.messages.set(data.messageId, errorMessage);
771+
createSyntheticErrorMessage();
765772
this.invalidateCache();
766773
}
767774

0 commit comments

Comments
 (0)