From 004e8b6c7d1f462ea61984050a9a51b98bcdf9b4 Mon Sep 17 00:00:00 2001 From: cobyfrombrooklyn-bot Date: Mon, 23 Feb 2026 04:43:35 -0500 Subject: [PATCH] fix: clean up socket.acks on emitWithAck timeout to prevent memory leak When using emitWithAck with a timeout, if clients don't respond before the timeout fires, the ack callbacks stored in socket.acks are never cleaned up. Over time this causes unbounded memory growth proportional to the number of timed-out broadcasts. This adds a removeAcks() method to the in-memory adapter that iterates matching sockets and deletes the orphaned ack entry. The broadcast operator calls it when the timeout fires, before invoking the error callback. Fixes #4984 --- .../lib/in-memory-adapter.ts | 14 ++++++++ packages/socket.io/lib/broadcast-operator.ts | 20 +++++++---- packages/socket.io/test/messaging-many.ts | 34 +++++++++++++++++++ 3 files changed, 62 insertions(+), 6 deletions(-) diff --git a/packages/socket.io-adapter/lib/in-memory-adapter.ts b/packages/socket.io-adapter/lib/in-memory-adapter.ts index cf178170e0..e8fec4de00 100644 --- a/packages/socket.io-adapter/lib/in-memory-adapter.ts +++ b/packages/socket.io-adapter/lib/in-memory-adapter.ts @@ -231,6 +231,20 @@ export class Adapter extends EventEmitter { clientCountCallback(clientCount); } + /** + * Removes the acknowledgement callback for the given packet ID from all + * matching sockets. This is used to clean up pending acks when a + * broadcastWithAck operation times out, preventing memory leaks. + * + * @param packetId - the packet ID whose acks should be removed + * @param opts - the same broadcast options used in broadcastWithAck + */ + public removeAcks(packetId: number, opts: BroadcastOptions): void { + this.apply(opts, (socket) => { + socket.acks.delete(packetId); + }); + } + private _encode(packet: unknown, packetOpts: Record) { const encodedPackets = this.encoder.encode(packet); diff --git a/packages/socket.io/lib/broadcast-operator.ts b/packages/socket.io/lib/broadcast-operator.ts index 6a229daef6..b7f7fda34f 100644 --- a/packages/socket.io/lib/broadcast-operator.ts +++ b/packages/socket.io/lib/broadcast-operator.ts @@ -212,7 +212,7 @@ export class BroadcastOperator } // set up packet object const data = [ev, ...args]; - const packet = { + const packet: { type: PacketType; data: any[]; id?: number } = { type: PacketType.EVENT, data: data, }; @@ -233,8 +233,20 @@ export class BroadcastOperator let timedOut = false; let responses: any[] = []; + const broadcastOpts = { + rooms: this.rooms, + except: this.exceptRooms, + flags: this.flags, + }; + const timer = setTimeout(() => { timedOut = true; + // Clean up pending ack entries from sockets to prevent memory leaks. + // Without this, socket.acks retains references to the callback forever + // when clients never respond before the timeout fires. + if (typeof this.adapter.removeAcks === "function") { + this.adapter.removeAcks(packet.id!, broadcastOpts); + } ack.apply(this, [ new Error("operation has timed out"), this.flags.expectSingleResponse ? null : responses, @@ -261,11 +273,7 @@ export class BroadcastOperator this.adapter.broadcastWithAck( packet, - { - rooms: this.rooms, - except: this.exceptRooms, - flags: this.flags, - }, + broadcastOpts, (clientCount) => { // each Socket.IO server in the cluster sends the number of clients that were notified expectedClientCount += clientCount; diff --git a/packages/socket.io/test/messaging-many.ts b/packages/socket.io/test/messaging-many.ts index 0a51815068..223130f438 100644 --- a/packages/socket.io/test/messaging-many.ts +++ b/packages/socket.io/test/messaging-many.ts @@ -567,6 +567,40 @@ describe("messaging many", () => { }); }); + it("should clean up socket.acks after emitWithAck timeout (memory leak fix)", (done) => { + const io = new Server(0); + const socket1 = createClient(io, "/", { multiplex: false }); + const socket2 = createClient(io, "/", { multiplex: false }); + + // socket1 responds, socket2 never responds (simulates timeout) + socket1.on("some event", (cb) => { + cb(1); + }); + + socket2.on("some event", () => { + // intentionally do not call callback to trigger timeout + }); + + Promise.all([ + waitFor(socket1, "connect"), + waitFor(socket2, "connect"), + ]).then(async () => { + try { + await io.timeout(200).emitWithAck("some event"); + expect().fail(); + } catch (err) { + // After timeout, all server-side socket ack maps should be cleaned up. + // This verifies the fix for issue #4984 (memory leak in emitWithAck). + for (const [, serverSocket] of io.of("/").sockets) { + // @ts-ignore accessing private acks map to verify cleanup + expect(serverSocket.acks.size).to.be(0); + } + + success(done, io, socket1, socket2); + } + }); + }); + it("should precompute the WebSocket frame when broadcasting", (done) => { const io = new Server(0); const socket = createClient(io, "/chat", {