JS: Add async support for resolving track aliases.#953
JS: Add async support for resolving track aliases.#953
Conversation
WalkthroughThe subscriber module refactors its internal subscription tracking to handle asynchronous track resolution. The Track import is changed from type-only to a concrete value import. The subscriptions storage is restructured from a simple requestId-to-Track mapping to a requestId-to-object mapping containing both the Track producer and an in-flight PromiseWithResolvers for SubscribeOk. The trackAliases mapping is updated to store either a Track or a pending PromiseWithResolvers, allowing deferred resolution. The subscribe flow initializes an ok-promise alongside each subscription, subscribe acknowledgements are resolved via this promise, and incoming Group messages trigger resolution of deferred aliases. Unsubscribe cleanup is adjusted to remove alias entries, and error handling is refactored to operate on the new per-request structure. The legacy subscribeCallbacks map is removed. 🚥 Pre-merge checks | ✅ 2 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
⚔️ Resolve merge conflicts (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@js/lite/src/ietf/subscriber.ts`:
- Around line 293-314: The timeout cleanup can delete a Track set later by
handleSubscribeOk because it unconditionally deletes this.#trackAliases entry;
fix by capturing the original unresolved promise (the value returned by
Promise.withResolvers<Track>() assigned to track before awaiting) and, when
timing out, only delete the map entry if
this.#trackAliases.get(group.trackAlias) === that original promise; keep
throwing the timeout error as before but avoid removing a newly-stored Track
from handleSubscribeOk.
🧹 Nitpick comments (2)
js/lite/src/ietf/subscriber.ts (2)
216-222: Inconsistent type-narrowing pattern — preferinstanceof Trackfor clarity and consistency.Line 299 uses
track instanceof Trackto distinguishTrackfromPromiseWithResolvers<Track>. Here, a duck-type check ("resolve" in pending) is used for the same logical distinction. Aligning both checks makes the code easier to follow and less fragile if either type evolves.♻️ Suggested change
- const pending = this.#trackAliases.get(ok.trackAlias); - if (pending && typeof pending === "object" && "resolve" in pending) { + const pending = this.#trackAliases.get(ok.trackAlias); + if (pending && !(pending instanceof Track)) {
304-306: Magic number: extract1000ms timeout to a named constant.Per coding guidelines, avoid magic numbers. This timeout governs a protocol-level race and deserves a descriptive name.
♻️ Suggested change
Add near the top of the class (or as a module-level constant):
/** Maximum time (ms) to wait for a SUBSCRIBE_OK to resolve a track alias. */ const TRACK_ALIAS_TIMEOUT_MS = 1000;Then reference it:
- cancel = setTimeout(() => resolve(undefined), 1000); + cancel = setTimeout(() => resolve(undefined), TRACK_ALIAS_TIMEOUT_MS);As per coding guidelines, "Avoid using magic numbers; use named constants instead".
| let track = this.#trackAliases.get(group.trackAlias); | ||
| if (!track) { | ||
| track = Promise.withResolvers<Track>(); | ||
| this.#trackAliases.set(group.trackAlias, track); | ||
| } | ||
|
|
||
| const track = this.#subscribes.get(requestId); | ||
| if (!track) { | ||
| throw new Error( | ||
| `unknown track: trackAlias=${group.trackAlias} requestId=${this.#trackAliases.get(group.trackAlias)}`, | ||
| ); | ||
| if (!(track instanceof Track)) { | ||
| // Because moq-transport is DUMB, we need to wait for the track alias to be resolved. | ||
| // This is only known from the SUBSCRIBE_OK message, which will lose the race at least half the time. | ||
| // We can't block forever because this could be a late data stream too. | ||
| let cancel!: ReturnType<typeof setTimeout>; | ||
| const timeout = new Promise<Track | undefined>((resolve) => { | ||
| cancel = setTimeout(() => resolve(undefined), 1000); | ||
| }); | ||
| track = await Promise.race([track.promise, timeout]); | ||
| clearTimeout(cancel); | ||
|
|
||
| if (track === undefined) { | ||
| // Clean up after ourselves if we timed out looking up the track alias | ||
| this.#trackAliases.delete(group.trackAlias); | ||
| throw new Error(`timeout waiting for track alias: ${group.trackAlias}`); | ||
| } |
There was a problem hiding this comment.
Race condition: timeout cleanup can delete a valid Track entry set by a concurrent handleSubscribeOk.
If two handleGroup calls race on the same unresolved alias, the first timeout deletes the map entry at line 312. When handleSubscribeOk arrives afterwards, it stores the Track at line 222. But the second handleGroup's promise is already orphaned (never resolved), so its timeout also fires and deletes the freshly-stored Track—breaking all subsequent groups for that alias.
Guard the deletion so it only removes the entry it owns:
🐛 Proposed fix
+ // Keep a reference to the deferred we created / obtained so we can guard cleanup
+ const deferred = track;
+
if (!(track instanceof Track)) {
let cancel!: ReturnType<typeof setTimeout>;
const timeout = new Promise<Track | undefined>((resolve) => {
cancel = setTimeout(() => resolve(undefined), 1000);
});
track = await Promise.race([track.promise, timeout]);
clearTimeout(cancel);
if (track === undefined) {
- this.#trackAliases.delete(group.trackAlias);
+ // Only clean up if the map still holds our deferred; another path may have replaced it.
+ if (this.#trackAliases.get(group.trackAlias) === deferred) {
+ this.#trackAliases.delete(group.trackAlias);
+ }
throw new Error(`timeout waiting for track alias: ${group.trackAlias}`);
}
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let track = this.#trackAliases.get(group.trackAlias); | |
| if (!track) { | |
| track = Promise.withResolvers<Track>(); | |
| this.#trackAliases.set(group.trackAlias, track); | |
| } | |
| const track = this.#subscribes.get(requestId); | |
| if (!track) { | |
| throw new Error( | |
| `unknown track: trackAlias=${group.trackAlias} requestId=${this.#trackAliases.get(group.trackAlias)}`, | |
| ); | |
| if (!(track instanceof Track)) { | |
| // Because moq-transport is DUMB, we need to wait for the track alias to be resolved. | |
| // This is only known from the SUBSCRIBE_OK message, which will lose the race at least half the time. | |
| // We can't block forever because this could be a late data stream too. | |
| let cancel!: ReturnType<typeof setTimeout>; | |
| const timeout = new Promise<Track | undefined>((resolve) => { | |
| cancel = setTimeout(() => resolve(undefined), 1000); | |
| }); | |
| track = await Promise.race([track.promise, timeout]); | |
| clearTimeout(cancel); | |
| if (track === undefined) { | |
| // Clean up after ourselves if we timed out looking up the track alias | |
| this.#trackAliases.delete(group.trackAlias); | |
| throw new Error(`timeout waiting for track alias: ${group.trackAlias}`); | |
| } | |
| let track = this.#trackAliases.get(group.trackAlias); | |
| if (!track) { | |
| track = Promise.withResolvers<Track>(); | |
| this.#trackAliases.set(group.trackAlias, track); | |
| } | |
| // Keep a reference to the deferred we created / obtained so we can guard cleanup | |
| const deferred = track; | |
| if (!(track instanceof Track)) { | |
| // Because moq-transport is DUMB, we need to wait for the track alias to be resolved. | |
| // This is only known from the SUBSCRIBE_OK message, which will lose the race at least half the time. | |
| // We can't block forever because this could be a late data stream too. | |
| let cancel!: ReturnType<typeof setTimeout>; | |
| const timeout = new Promise<Track | undefined>((resolve) => { | |
| cancel = setTimeout(() => resolve(undefined), 1000); | |
| }); | |
| track = await Promise.race([track.promise, timeout]); | |
| clearTimeout(cancel); | |
| if (track === undefined) { | |
| // Only clean up if the map still holds our deferred; another path may have replaced it. | |
| if (this.#trackAliases.get(group.trackAlias) === deferred) { | |
| this.#trackAliases.delete(group.trackAlias); | |
| } | |
| throw new Error(`timeout waiting for track alias: ${group.trackAlias}`); | |
| } |
🤖 Prompt for AI Agents
In `@js/lite/src/ietf/subscriber.ts` around lines 293 - 314, The timeout cleanup
can delete a Track set later by handleSubscribeOk because it unconditionally
deletes this.#trackAliases entry; fix by capturing the original unresolved
promise (the value returned by Promise.withResolvers<Track>() assigned to track
before awaiting) and, when timing out, only delete the map entry if
this.#trackAliases.get(group.trackAlias) === that original promise; keep
throwing the timeout error as before but avoid removing a newly-stored Track
from handleSubscribeOk.
|
Back to a draft because I don't actually want to interop test this. |
Track alias is the dumbest design decision.