-
Notifications
You must be signed in to change notification settings - Fork 201
feat: Waiting plugin addition #1110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
sunitaprajapati89
wants to merge
3
commits into
master
Choose a base branch
from
waiting-plugin
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
46 changes: 46 additions & 0 deletions
46
examples/AnalyticsReactNativeExample/plugins/ExampleWaitingPlugin.tsx
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| import {WaitingPlugin, PluginType} from '@segment/analytics-react-native'; | ||
|
|
||
| import type { | ||
| SegmentAPISettings, | ||
| SegmentEvent, | ||
| UpdateType, | ||
| } from '@segment/analytics-react-native'; | ||
|
|
||
| /** | ||
| * Example WaitingPlugin that demonstrates how to pause event processing | ||
| * until an async operation completes. | ||
| * | ||
| * Use cases: | ||
| * - Waiting for IDFA/advertising ID permissions | ||
| * - Initializing native SDKs or modules | ||
| * - Loading required configuration from remote sources | ||
| * | ||
| * The plugin automatically pauses event processing when added to the client. | ||
| * Call resume() when your async operation completes to start processing events. | ||
| */ | ||
| export class ExampleWaitingPlugin extends WaitingPlugin { | ||
| type = PluginType.enrichment; | ||
| tracked = false; | ||
|
|
||
| /** | ||
| * Called when settings are updated from Segment. | ||
| * For initial settings, we simulate an async operation and then resume. | ||
| */ | ||
| update(_settings: SegmentAPISettings, type: UpdateType) { | ||
| if (type === UpdateType.initial) { | ||
| // Simulate async work (e.g., requesting permissions, loading data) | ||
| setTimeout(() => { | ||
| // Resume event processing once async work is complete | ||
| this.resume(); | ||
| }, 3000); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Called for track events | ||
| */ | ||
| track(event: SegmentEvent) { | ||
| this.tracked = true; | ||
| return event; | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -72,6 +72,7 @@ import { | |
| translateHTTPError, | ||
| } from './errors'; | ||
| import { QueueFlushingPlugin } from './plugins/QueueFlushingPlugin'; | ||
| import { WaitingPlugin } from './plugin'; | ||
|
|
||
| type OnPluginAddedCallback = (plugin: Plugin) => void; | ||
|
|
||
|
|
@@ -120,6 +121,10 @@ export class SegmentClient { | |
| * Access or subscribe to client enabled | ||
| */ | ||
| readonly enabled: Watchable<boolean> & Settable<boolean>; | ||
| /** | ||
| * Access or subscribe to running state (controls event processing) | ||
| */ | ||
| readonly running: Watchable<boolean> & Settable<boolean>; | ||
| /** | ||
| * Access or subscribe to client context | ||
| */ | ||
|
|
@@ -258,6 +263,12 @@ export class SegmentClient { | |
| onChange: this.store.enabled.onChange, | ||
| }; | ||
|
|
||
| this.running = { | ||
| get: this.store.running.get, | ||
| set: this.store.running.set, | ||
| onChange: this.store.running.onChange, | ||
| }; | ||
|
|
||
| // add segment destination plugin unless | ||
| // asked not to via configuration. | ||
| if (this.config.autoAddSegmentDestination === true) { | ||
|
|
@@ -295,7 +306,6 @@ export class SegmentClient { | |
| if ((await this.store.isReady.get(true)) === false) { | ||
| await this.storageReady(); | ||
| } | ||
|
|
||
| // Get new settings from segment | ||
| // It's important to run this before checkInstalledVersion and trackDeeplinks to give time for destination plugins | ||
| // which make use of the settings object to initialize | ||
|
|
@@ -309,7 +319,8 @@ export class SegmentClient { | |
| ]); | ||
| await this.onReady(); | ||
| this.isReady.value = true; | ||
|
|
||
| // Set running to true to start event processing | ||
| await this.store.running.set(true); | ||
| // Process all pending events | ||
| await this.processPendingEvents(); | ||
| // Trigger manual flush | ||
|
|
@@ -465,7 +476,6 @@ export class SegmentClient { | |
| settings | ||
| ); | ||
| } | ||
|
|
||
| if (!this.isReady.value) { | ||
| this.pluginsToAdd.push(plugin); | ||
| } else { | ||
|
|
@@ -476,6 +486,11 @@ export class SegmentClient { | |
| private addPlugin(plugin: Plugin) { | ||
| plugin.configure(this); | ||
| this.timeline.add(plugin); | ||
| //check for waiting plugin here | ||
| if (plugin instanceof WaitingPlugin) { | ||
| this.pauseEventProcessingForPlugin(plugin); | ||
| } | ||
|
|
||
| this.triggerOnPluginLoaded(plugin); | ||
| } | ||
|
|
||
|
|
@@ -494,6 +509,11 @@ export class SegmentClient { | |
| if (this.enabled.get() === false) { | ||
| return; | ||
| } | ||
| if (!this.running.get()) { | ||
| // If not running, queue the event for later processing | ||
| await this.store.pendingEvents.add(event); | ||
| return event; | ||
| } | ||
| if (this.isReady.value) { | ||
| return this.startTimelineProcessing(event); | ||
| } else { | ||
|
|
@@ -512,7 +532,7 @@ export class SegmentClient { | |
| ): Promise<SegmentEvent | undefined> { | ||
| const event = await this.applyContextData(incomingEvent); | ||
| this.flushPolicyExecuter.notify(event); | ||
| return this.timeline.process(event); | ||
| return await this.timeline.process(event); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. await is redundant here |
||
| } | ||
|
|
||
| private async trackDeepLinks() { | ||
|
|
@@ -1027,4 +1047,82 @@ export class SegmentClient { | |
|
|
||
| return totalEventsCount; | ||
| } | ||
| private resumeTimeoutId?: ReturnType<typeof setTimeout>; | ||
abueide marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| private waitingPlugins = new Set<WaitingPlugin>(); | ||
|
|
||
| /** | ||
| * Pause event processing for a specific WaitingPlugin. | ||
| * Events will be buffered until all waiting plugins resume. | ||
| * | ||
| * @param plugin - The WaitingPlugin requesting the pause | ||
| * @internal This is called automatically when a WaitingPlugin is added | ||
| */ | ||
| pauseEventProcessingForPlugin(plugin?: WaitingPlugin) { | ||
| if (plugin) { | ||
| this.waitingPlugins.add(plugin); | ||
| } | ||
| this.pauseEventProcessing(); | ||
| } | ||
|
|
||
| /** | ||
| * Resume event processing for a specific WaitingPlugin. | ||
| * If all waiting plugins have resumed, buffered events will be processed. | ||
| * | ||
| * @param plugin - The WaitingPlugin that has completed its async work | ||
| * @internal This is called automatically when a WaitingPlugin calls resume() | ||
| */ | ||
| async resumeEventProcessingForPlugin(plugin?: WaitingPlugin) { | ||
| if (plugin) { | ||
| this.waitingPlugins.delete(plugin); | ||
| } | ||
| if (this.waitingPlugins.size > 0) { | ||
| return; // still blocked by other waiting plugins | ||
| } | ||
|
|
||
| await this.resumeEventProcessing(); | ||
| } | ||
|
|
||
| /** | ||
| * Pause event processing globally. | ||
| * New events will be buffered in memory until resumeEventProcessing() is called. | ||
| * Automatically resumes after the specified timeout to prevent permanent blocking. | ||
| * | ||
| * @param timeout - Milliseconds to wait before auto-resuming (default: 30000) | ||
| */ | ||
| pauseEventProcessing(timeout = 30000) { | ||
| // IMPORTANT: ignore repeated pauses | ||
| const running = this.store.running.get(); | ||
| if (!running) { | ||
| return; | ||
| } | ||
|
|
||
| // Fire-and-forget: state is updated synchronously in-memory, persistence happens async | ||
| void this.store.running.set(false); | ||
|
|
||
| // Only set timeout if not already set (prevents multiple waiting plugins from overwriting) | ||
| if (!this.resumeTimeoutId) { | ||
| this.resumeTimeoutId = setTimeout(async () => { | ||
| await this.resumeEventProcessing(); | ||
| }, timeout); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Resume event processing and process all buffered events. | ||
| * This is called automatically by WaitingPlugins when they complete, | ||
| * or after the timeout expires. | ||
| */ | ||
| async resumeEventProcessing() { | ||
| const running = this.store.running.get(); | ||
| if (running) { | ||
| return; | ||
| } | ||
|
|
||
| if (this.resumeTimeoutId) { | ||
| clearTimeout(this.resumeTimeoutId); | ||
| this.resumeTimeoutId = undefined; | ||
| } | ||
| await this.store.running.set(true); | ||
| await this.processPendingEvents(); | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,6 +73,13 @@ export class QueueFlushingPlugin extends UtilityPlugin { | |
| * Calls the onFlush callback with the events in the queue | ||
| */ | ||
| async flush() { | ||
| // Check if event processing is running | ||
| const running = this.analytics?.running.get(); | ||
| if (running === false) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is a change in behavior. pipeline should still be able to flush. |
||
| this.analytics?.logger.info('Event processing is paused, skipping flush'); | ||
| return; | ||
| } | ||
|
|
||
| // Wait for the queue to be restored | ||
| try { | ||
| await this.isRestored; | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this seems to be redundant with the if block in line 520