Skip to content

Commit cf7ccbe

Browse files
committed
🤖 refactor: simplify stream lifecycle plumbing
1 parent 02af370 commit cf7ccbe

File tree

6 files changed

+93
-22
lines changed

6 files changed

+93
-22
lines changed

src/browser/utils/messages/StreamingMessageAggregator.ts

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -548,18 +548,6 @@ export class StreamingMessageAggregator {
548548
this.setPendingStreamStartTime(null);
549549

550550
this.connectingStreams.set(data.messageId, { startTime: Date.now(), model: data.model });
551-
552-
// Create a placeholder assistant message (kept invisible until parts arrive)
553-
// so that out-of-order deltas (if they ever occur) have somewhere to attach.
554-
if (!this.messages.has(data.messageId)) {
555-
const connectingMessage = createMuxMessage(data.messageId, "assistant", "", {
556-
historySequence: data.historySequence,
557-
timestamp: Date.now(),
558-
model: data.model,
559-
});
560-
this.messages.set(data.messageId, connectingMessage);
561-
}
562-
563551
this.invalidateCache();
564552
}
565553

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Stream lifecycle events are emitted during an in-flight assistant response.
2+
//
3+
// Keeping the event list centralized makes it harder to accidentally forget to forward/buffer a
4+
// newly introduced lifecycle event.
5+
6+
export const STREAM_LIFECYCLE_EVENTS = [
7+
"stream-pending",
8+
"stream-start",
9+
"stream-delta",
10+
"stream-abort",
11+
"stream-end",
12+
] as const;
13+
14+
export type StreamLifecycleEventName = (typeof STREAM_LIFECYCLE_EVENTS)[number];
15+
16+
// Events that can be forwarded 1:1 from StreamManager -> AIService.
17+
// (`stream-abort` needs additional bookkeeping in AIService.)
18+
export const STREAM_LIFECYCLE_EVENTS_DIRECT_FORWARD = [
19+
"stream-pending",
20+
"stream-start",
21+
"stream-delta",
22+
"stream-end",
23+
] as const satisfies readonly StreamLifecycleEventName[];
24+
25+
// Events that can be forwarded 1:1 from AIService -> AgentSession -> renderer.
26+
// (`stream-end` has additional session-side behavior.)
27+
export const STREAM_LIFECYCLE_EVENTS_SIMPLE_FORWARD = [
28+
"stream-pending",
29+
"stream-start",
30+
"stream-delta",
31+
"stream-abort",
32+
] as const satisfies readonly StreamLifecycleEventName[];
33+
34+
export function forwardStreamLifecycleEvents(params: {
35+
events: readonly StreamLifecycleEventName[];
36+
listen: (event: StreamLifecycleEventName, handler: (payload: unknown) => void) => void;
37+
emit: (event: StreamLifecycleEventName, payload: unknown) => void;
38+
}): void {
39+
for (const event of params.events) {
40+
params.listen(event, (payload) => params.emit(event, payload));
41+
}
42+
}

src/node/services/agentSession.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ import { AttachmentService } from "./attachmentService";
3535
import type { PostCompactionAttachment, PostCompactionExclusions } from "@/common/types/attachment";
3636
import { TURNS_BETWEEN_ATTACHMENTS } from "@/common/constants/attachments";
3737
import { extractEditedFileDiffs } from "@/common/utils/messages/extractEditedFiles";
38+
import {
39+
forwardStreamLifecycleEvents,
40+
STREAM_LIFECYCLE_EVENTS_SIMPLE_FORWARD,
41+
} from "@/common/utils/streamLifecycle";
3842
import { isValidModelFormat } from "@/common/utils/ai/models";
3943

4044
/**
@@ -619,9 +623,15 @@ export class AgentSession {
619623
this.aiService.on(event, wrapped as never);
620624
};
621625

622-
forward("stream-pending", (payload) => this.emitChatEvent(payload));
623-
forward("stream-start", (payload) => this.emitChatEvent(payload));
624-
forward("stream-delta", (payload) => this.emitChatEvent(payload));
626+
forwardStreamLifecycleEvents({
627+
events: STREAM_LIFECYCLE_EVENTS_SIMPLE_FORWARD,
628+
listen: (event, handler) => {
629+
forward(event, handler);
630+
},
631+
emit: (_event, payload) => {
632+
this.emitChatEvent(payload as WorkspaceChatMessage);
633+
},
634+
});
625635
forward("tool-call-start", (payload) => this.emitChatEvent(payload));
626636
forward("tool-call-delta", (payload) => this.emitChatEvent(payload));
627637
forward("tool-call-end", (payload) => {
@@ -644,7 +654,6 @@ export class AgentSession {
644654
forward("reasoning-delta", (payload) => this.emitChatEvent(payload));
645655
forward("reasoning-end", (payload) => this.emitChatEvent(payload));
646656
forward("usage-delta", (payload) => this.emitChatEvent(payload));
647-
forward("stream-abort", (payload) => this.emitChatEvent(payload));
648657

649658
forward("stream-end", async (payload) => {
650659
const handled = await this.compactionHandler.handleCompletion(payload as StreamEndEvent);

src/node/services/aiService.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ import { applyToolPolicy, type ToolPolicy } from "@/common/utils/tools/toolPolic
5858
import { MockScenarioPlayer } from "./mock/mockScenarioPlayer";
5959
import { EnvHttpProxyAgent, type Dispatcher } from "undici";
6060
import { getPlanFilePath } from "@/common/utils/planStorage";
61+
import {
62+
forwardStreamLifecycleEvents,
63+
STREAM_LIFECYCLE_EVENTS_DIRECT_FORWARD,
64+
} from "@/common/utils/streamLifecycle";
6165
import { getPlanModeInstruction } from "@/common/utils/ui/modeUtils";
6266
import type { UIMode } from "@/common/types/mode";
6367
import { MUX_APP_ATTRIBUTION_TITLE, MUX_APP_ATTRIBUTION_URL } from "@/constants/appAttribution";
@@ -348,10 +352,15 @@ export class AIService extends EventEmitter {
348352
* Forward all stream events from StreamManager to AIService consumers
349353
*/
350354
private setupStreamEventForwarding(): void {
351-
this.streamManager.on("stream-pending", (data) => this.emit("stream-pending", data));
352-
this.streamManager.on("stream-start", (data) => this.emit("stream-start", data));
353-
this.streamManager.on("stream-delta", (data) => this.emit("stream-delta", data));
354-
this.streamManager.on("stream-end", (data) => this.emit("stream-end", data));
355+
forwardStreamLifecycleEvents({
356+
events: STREAM_LIFECYCLE_EVENTS_DIRECT_FORWARD,
357+
listen: (event, handler) => {
358+
this.streamManager.on(event, handler);
359+
},
360+
emit: (event, payload) => {
361+
this.emit(event, payload);
362+
},
363+
});
355364

356365
// Handle stream-abort: dispose of partial based on abandonPartial flag
357366
this.streamManager.on("stream-abort", (data: StreamAbortEvent) => {
@@ -1405,6 +1414,7 @@ export class AIService extends EventEmitter {
14051414
// Delegate to StreamManager with model instance, system message, tools, historySequence, and initial metadata
14061415
const streamResult = await this.streamManager.startStream(
14071416
workspaceId,
1417+
assistantMessageId,
14081418
finalMessages,
14091419
modelResult.data,
14101420
modelString,

src/node/services/streamManager.test.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ describe("StreamManager - Concurrent Stream Prevention", () => {
8484
// Start first stream
8585
const result1 = await streamManager.startStream(
8686
workspaceId,
87+
"assistant-1",
8788
[{ role: "user", content: "Say hello and nothing else" }],
8889
model,
8990
KNOWN_MODELS.SONNET.id,
@@ -102,6 +103,7 @@ describe("StreamManager - Concurrent Stream Prevention", () => {
102103
// Start second stream - should cancel first
103104
const result2 = await streamManager.startStream(
104105
workspaceId,
106+
"assistant-2",
105107
[{ role: "user", content: "Say goodbye and nothing else" }],
106108
model,
107109
KNOWN_MODELS.SONNET.id,
@@ -164,6 +166,11 @@ describe("StreamManager - Concurrent Stream Prevention", () => {
164166
}
165167

166168
const ensureStreamSafetyValue = Reflect.get(streamManager, "ensureStreamSafety") as unknown;
169+
const pendingMessageIds: string[] = [];
170+
streamManager.on("stream-pending", (event) => {
171+
pendingMessageIds.push((event as { messageId: string }).messageId);
172+
});
173+
167174
if (typeof ensureStreamSafetyValue !== "function") {
168175
throw new Error("StreamManager.ensureStreamSafety is unavailable for testing");
169176
}
@@ -199,7 +206,10 @@ describe("StreamManager - Concurrent Stream Prevention", () => {
199206
"createStreamAtomically",
200207
(
201208
wsId: string,
209+
assistantMessageId: string,
202210
streamToken: string,
211+
_runtimeTempDir: string,
212+
_runtime: unknown,
203213
messages: unknown,
204214
modelArg: unknown,
205215
modelString: string,
@@ -228,7 +238,7 @@ describe("StreamManager - Concurrent Stream Prevention", () => {
228238
providerMetadata: Promise.resolve(undefined),
229239
},
230240
abortController,
231-
messageId: `test-${Math.random().toString(36).slice(2)}`,
241+
messageId: assistantMessageId,
232242
token: streamToken,
233243
startTime: Date.now(),
234244
model: modelString,
@@ -274,6 +284,7 @@ describe("StreamManager - Concurrent Stream Prevention", () => {
274284
const promises = [
275285
streamManager.startStream(
276286
workspaceId,
287+
"assistant-mutex-1",
277288
[{ role: "user", content: "test 1" }],
278289
model,
279290
KNOWN_MODELS.SONNET.id,
@@ -285,6 +296,7 @@ describe("StreamManager - Concurrent Stream Prevention", () => {
285296
),
286297
streamManager.startStream(
287298
workspaceId,
299+
"assistant-mutex-2",
288300
[{ role: "user", content: "test 2" }],
289301
model,
290302
KNOWN_MODELS.SONNET.id,
@@ -296,6 +308,7 @@ describe("StreamManager - Concurrent Stream Prevention", () => {
296308
),
297309
streamManager.startStream(
298310
workspaceId,
311+
"assistant-mutex-3",
299312
[{ role: "user", content: "test 3" }],
300313
model,
301314
KNOWN_MODELS.SONNET.id,
@@ -317,6 +330,12 @@ describe("StreamManager - Concurrent Stream Prevention", () => {
317330
expect(ensureOperations[i]).toBe("ensure-start");
318331
expect(ensureOperations[i + 1]).toBe("ensure-end");
319332
}
333+
334+
expect(pendingMessageIds).toEqual([
335+
"assistant-mutex-1",
336+
"assistant-mutex-2",
337+
"assistant-mutex-3",
338+
]);
320339
});
321340
});
322341

src/node/services/streamManager.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,7 @@ export class StreamManager extends EventEmitter {
600600
*/
601601
private createStreamAtomically(
602602
workspaceId: WorkspaceId,
603+
assistantMessageId: string,
603604
streamToken: StreamToken,
604605
runtimeTempDir: string,
605606
runtime: Runtime,
@@ -684,7 +685,7 @@ export class StreamManager extends EventEmitter {
684685
throw error;
685686
}
686687

687-
const messageId = `assistant-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
688+
const messageId = assistantMessageId;
688689

689690
const streamInfo: WorkspaceStreamInfo = {
690691
state: StreamState.STARTING,
@@ -1380,6 +1381,7 @@ export class StreamManager extends EventEmitter {
13801381
*/
13811382
async startStream(
13821383
workspaceId: string,
1384+
assistantMessageId: string,
13831385
messages: ModelMessage[],
13841386
model: LanguageModel,
13851387
modelString: string,
@@ -1426,6 +1428,7 @@ export class StreamManager extends EventEmitter {
14261428
// Step 4: Atomic stream creation and registration
14271429
const streamInfo = this.createStreamAtomically(
14281430
typedWorkspaceId,
1431+
assistantMessageId,
14291432
streamToken,
14301433
runtimeTempDir,
14311434
runtime,

0 commit comments

Comments
 (0)