Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions packages/socket.io-adapter/lib/in-memory-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,20 @@ export class Adapter extends EventEmitter {
clientCountCallback(clientCount);
}

/**
* Removes the acknowledgement callback for the given packet ID from all
* matching sockets. This is used to clean up pending acks when a
* broadcastWithAck operation times out, preventing memory leaks.
*
* @param packetId - the packet ID whose acks should be removed
* @param opts - the same broadcast options used in broadcastWithAck
*/
public removeAcks(packetId: number, opts: BroadcastOptions): void {
this.apply(opts, (socket) => {
socket.acks.delete(packetId);
});
}

private _encode(packet: unknown, packetOpts: Record<string, unknown>) {
const encodedPackets = this.encoder.encode(packet);

Expand Down
20 changes: 14 additions & 6 deletions packages/socket.io/lib/broadcast-operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
}
// set up packet object
const data = [ev, ...args];
const packet = {
const packet: { type: PacketType; data: any[]; id?: number } = {
type: PacketType.EVENT,
data: data,
};
Expand All @@ -233,8 +233,20 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
let timedOut = false;
let responses: any[] = [];

const broadcastOpts = {
rooms: this.rooms,
except: this.exceptRooms,
flags: this.flags,
};

const timer = setTimeout(() => {
timedOut = true;
// Clean up pending ack entries from sockets to prevent memory leaks.
// Without this, socket.acks retains references to the callback forever
// when clients never respond before the timeout fires.
if (typeof this.adapter.removeAcks === "function") {
this.adapter.removeAcks(packet.id!, broadcastOpts);
}
ack.apply(this, [
new Error("operation has timed out"),
this.flags.expectSingleResponse ? null : responses,
Expand All @@ -261,11 +273,7 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>

this.adapter.broadcastWithAck(
packet,
{
rooms: this.rooms,
except: this.exceptRooms,
flags: this.flags,
},
broadcastOpts,
(clientCount) => {
// each Socket.IO server in the cluster sends the number of clients that were notified
expectedClientCount += clientCount;
Expand Down
34 changes: 34 additions & 0 deletions packages/socket.io/test/messaging-many.ts
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,40 @@ describe("messaging many", () => {
});
});

it("should clean up socket.acks after emitWithAck timeout (memory leak fix)", (done) => {
const io = new Server(0);
const socket1 = createClient(io, "/", { multiplex: false });
const socket2 = createClient(io, "/", { multiplex: false });

// socket1 responds, socket2 never responds (simulates timeout)
socket1.on("some event", (cb) => {
cb(1);
});

socket2.on("some event", () => {
// intentionally do not call callback to trigger timeout
});

Promise.all([
waitFor(socket1, "connect"),
waitFor(socket2, "connect"),
]).then(async () => {
try {
await io.timeout(200).emitWithAck("some event");
expect().fail();
} catch (err) {
// After timeout, all server-side socket ack maps should be cleaned up.
// This verifies the fix for issue #4984 (memory leak in emitWithAck).
for (const [, serverSocket] of io.of("/").sockets) {
// @ts-ignore accessing private acks map to verify cleanup
expect(serverSocket.acks.size).to.be(0);
}

success(done, io, socket1, socket2);
}
});
});

it("should precompute the WebSocket frame when broadcasting", (done) => {
const io = new Server(0);
const socket = createClient(io, "/chat", {
Expand Down