Skip to content

Commit 342c9fc

Browse files
committed
fixed tests and removed the run number incrementor from the run engine trigger pipeline
1 parent 3ed008d commit 342c9fc

File tree

8 files changed

+128
-138
lines changed

8 files changed

+128
-138
lines changed

apps/webapp/app/runEngine/concerns/runNumbers.server.ts

Lines changed: 0 additions & 14 deletions
This file was deleted.

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 83 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ import { IdempotencyKeyConcern } from "../concerns/idempotencyKeys.server";
3434
import type {
3535
PayloadProcessor,
3636
QueueManager,
37-
RunNumberIncrementer,
3837
TraceEventConcern,
3938
TriggerRacepoints,
4039
TriggerRacepointSystem,
@@ -54,7 +53,6 @@ export class RunEngineTriggerTaskService {
5453
private readonly validator: TriggerTaskValidator;
5554
private readonly payloadProcessor: PayloadProcessor;
5655
private readonly idempotencyKeyConcern: IdempotencyKeyConcern;
57-
private readonly runNumberIncrementer: RunNumberIncrementer;
5856
private readonly prisma: PrismaClientOrTransaction;
5957
private readonly engine: RunEngine;
6058
private readonly tracer: Tracer;
@@ -69,7 +67,6 @@ export class RunEngineTriggerTaskService {
6967
validator: TriggerTaskValidator;
7068
payloadProcessor: PayloadProcessor;
7169
idempotencyKeyConcern: IdempotencyKeyConcern;
72-
runNumberIncrementer: RunNumberIncrementer;
7370
traceEventConcern: TraceEventConcern;
7471
tracer: Tracer;
7572
metadataMaximumSize: number;
@@ -81,7 +78,6 @@ export class RunEngineTriggerTaskService {
8178
this.validator = opts.validator;
8279
this.payloadProcessor = opts.payloadProcessor;
8380
this.idempotencyKeyConcern = opts.idempotencyKeyConcern;
84-
this.runNumberIncrementer = opts.runNumberIncrementer;
8581
this.tracer = opts.tracer;
8682
this.traceEventConcern = opts.traceEventConcern;
8783
this.metadataMaximumSize = opts.metadataMaximumSize;
@@ -271,97 +267,91 @@ export class RunEngineTriggerTaskService {
271267
triggerRequest,
272268
parentRun?.taskEventStore,
273269
async (event, store) => {
274-
const result = await this.runNumberIncrementer.incrementRunNumber(
275-
triggerRequest,
276-
async (num) => {
277-
event.setAttribute("queueName", queueName);
278-
span.setAttribute("queueName", queueName);
279-
event.setAttribute("runId", runFriendlyId);
280-
span.setAttribute("runId", runFriendlyId);
281-
282-
const payloadPacket = await this.payloadProcessor.process(triggerRequest);
283-
284-
const taskRun = await this.engine.trigger(
285-
{
286-
number: num,
287-
friendlyId: runFriendlyId,
288-
environment: environment,
289-
idempotencyKey,
290-
idempotencyKeyExpiresAt: idempotencyKey ? idempotencyKeyExpiresAt : undefined,
291-
taskIdentifier: taskId,
292-
payload: payloadPacket.data ?? "",
293-
payloadType: payloadPacket.dataType,
294-
context: body.context,
295-
traceContext: this.#propagateExternalTraceContext(
296-
event.traceContext,
297-
parentRun?.traceContext,
298-
event.traceparent?.spanId
299-
),
300-
traceId: event.traceId,
301-
spanId: event.spanId,
302-
parentSpanId:
303-
options.parentAsLinkType === "replay" ? undefined : event.traceparent?.spanId,
304-
replayedFromTaskRunFriendlyId: options.replayedFromTaskRunFriendlyId,
305-
lockedToVersionId: lockedToBackgroundWorker?.id,
306-
taskVersion: lockedToBackgroundWorker?.version,
307-
sdkVersion: lockedToBackgroundWorker?.sdkVersion,
308-
cliVersion: lockedToBackgroundWorker?.cliVersion,
309-
concurrencyKey: body.options?.concurrencyKey,
310-
queue: queueName,
311-
lockedQueueId,
312-
workerQueue,
313-
isTest: body.options?.test ?? false,
314-
delayUntil,
315-
queuedAt: delayUntil ? undefined : new Date(),
316-
maxAttempts: body.options?.maxAttempts,
317-
taskEventStore: store,
318-
ttl,
319-
tags,
320-
oneTimeUseToken: options.oneTimeUseToken,
321-
parentTaskRunId: parentRun?.id,
322-
rootTaskRunId: parentRun?.rootTaskRunId ?? parentRun?.id,
323-
batch: options?.batchId
324-
? {
325-
id: options.batchId,
326-
index: options.batchIndex ?? 0,
327-
}
328-
: undefined,
329-
resumeParentOnCompletion: body.options?.resumeParentOnCompletion,
330-
depth,
331-
metadata: metadataPacket?.data,
332-
metadataType: metadataPacket?.dataType,
333-
seedMetadata: metadataPacket?.data,
334-
seedMetadataType: metadataPacket?.dataType,
335-
maxDurationInSeconds: body.options?.maxDuration
336-
? clampMaxDuration(body.options.maxDuration)
337-
: undefined,
338-
machine: body.options?.machine,
339-
priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined,
340-
queueTimestamp:
341-
options.queueTimestamp ??
342-
(parentRun && body.options?.resumeParentOnCompletion
343-
? parentRun.queueTimestamp ?? undefined
344-
: undefined),
345-
scheduleId: options.scheduleId,
346-
scheduleInstanceId: options.scheduleInstanceId,
347-
createdAt: options.overrideCreatedAt,
348-
bulkActionId: body.options?.bulkActionId,
349-
planType,
350-
realtimeStreamsVersion: options.realtimeStreamsVersion,
351-
},
352-
this.prisma
353-
);
354-
355-
const error = taskRun.error ? TaskRunError.parse(taskRun.error) : undefined;
356-
357-
if (error) {
358-
event.failWithError(error);
359-
}
360-
361-
return { run: taskRun, error, isCached: false };
362-
}
270+
event.setAttribute("queueName", queueName);
271+
span.setAttribute("queueName", queueName);
272+
event.setAttribute("runId", runFriendlyId);
273+
span.setAttribute("runId", runFriendlyId);
274+
275+
const payloadPacket = await this.payloadProcessor.process(triggerRequest);
276+
277+
const taskRun = await this.engine.trigger(
278+
{
279+
friendlyId: runFriendlyId,
280+
environment: environment,
281+
idempotencyKey,
282+
idempotencyKeyExpiresAt: idempotencyKey ? idempotencyKeyExpiresAt : undefined,
283+
taskIdentifier: taskId,
284+
payload: payloadPacket.data ?? "",
285+
payloadType: payloadPacket.dataType,
286+
context: body.context,
287+
traceContext: this.#propagateExternalTraceContext(
288+
event.traceContext,
289+
parentRun?.traceContext,
290+
event.traceparent?.spanId
291+
),
292+
traceId: event.traceId,
293+
spanId: event.spanId,
294+
parentSpanId:
295+
options.parentAsLinkType === "replay" ? undefined : event.traceparent?.spanId,
296+
replayedFromTaskRunFriendlyId: options.replayedFromTaskRunFriendlyId,
297+
lockedToVersionId: lockedToBackgroundWorker?.id,
298+
taskVersion: lockedToBackgroundWorker?.version,
299+
sdkVersion: lockedToBackgroundWorker?.sdkVersion,
300+
cliVersion: lockedToBackgroundWorker?.cliVersion,
301+
concurrencyKey: body.options?.concurrencyKey,
302+
queue: queueName,
303+
lockedQueueId,
304+
workerQueue,
305+
isTest: body.options?.test ?? false,
306+
delayUntil,
307+
queuedAt: delayUntil ? undefined : new Date(),
308+
maxAttempts: body.options?.maxAttempts,
309+
taskEventStore: store,
310+
ttl,
311+
tags,
312+
oneTimeUseToken: options.oneTimeUseToken,
313+
parentTaskRunId: parentRun?.id,
314+
rootTaskRunId: parentRun?.rootTaskRunId ?? parentRun?.id,
315+
batch: options?.batchId
316+
? {
317+
id: options.batchId,
318+
index: options.batchIndex ?? 0,
319+
}
320+
: undefined,
321+
resumeParentOnCompletion: body.options?.resumeParentOnCompletion,
322+
depth,
323+
metadata: metadataPacket?.data,
324+
metadataType: metadataPacket?.dataType,
325+
seedMetadata: metadataPacket?.data,
326+
seedMetadataType: metadataPacket?.dataType,
327+
maxDurationInSeconds: body.options?.maxDuration
328+
? clampMaxDuration(body.options.maxDuration)
329+
: undefined,
330+
machine: body.options?.machine,
331+
priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined,
332+
queueTimestamp:
333+
options.queueTimestamp ??
334+
(parentRun && body.options?.resumeParentOnCompletion
335+
? parentRun.queueTimestamp ?? undefined
336+
: undefined),
337+
scheduleId: options.scheduleId,
338+
scheduleInstanceId: options.scheduleInstanceId,
339+
createdAt: options.overrideCreatedAt,
340+
bulkActionId: body.options?.bulkActionId,
341+
planType,
342+
realtimeStreamsVersion: options.realtimeStreamsVersion,
343+
},
344+
this.prisma
363345
);
364346

347+
const error = taskRun.error ? TaskRunError.parse(taskRun.error) : undefined;
348+
349+
if (error) {
350+
event.failWithError(error);
351+
}
352+
353+
const result = { run: taskRun, error, isCached: false };
354+
365355
if (result?.error) {
366356
throw new ServiceValidationError(
367357
taskRunErrorToString(taskRunErrorEnhancer(result.error))

apps/webapp/app/runEngine/types.ts

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,6 @@ export interface PayloadProcessor {
7676
process(request: TriggerTaskRequest): Promise<IOPacket>;
7777
}
7878

79-
export interface RunNumberIncrementer {
80-
incrementRunNumber<T>(
81-
request: TriggerTaskRequest,
82-
callback: (num: number) => Promise<T>
83-
): Promise<T | undefined>;
84-
}
85-
8679
export interface TagValidationParams {
8780
tags?: string[] | string;
8881
}

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import { env } from "~/env.server";
44
import { IdempotencyKeyConcern } from "~/runEngine/concerns/idempotencyKeys.server";
55
import { DefaultPayloadProcessor } from "~/runEngine/concerns/payloads.server";
66
import { DefaultQueueManager } from "~/runEngine/concerns/queues.server";
7-
import { DefaultRunNumberIncrementer } from "~/runEngine/concerns/runNumbers.server";
87
import { DefaultTraceEventsConcern } from "~/runEngine/concerns/traceEvents.server";
98
import { RunEngineTriggerTaskService } from "~/runEngine/services/triggerTask.server";
109
import { DefaultTriggerTaskValidator } from "~/runEngine/validators/triggerTaskValidator";
@@ -106,7 +105,6 @@ export class TriggerTaskService extends WithRunEngine {
106105
this._engine,
107106
traceEventConcern
108107
),
109-
runNumberIncrementer: new DefaultRunNumberIncrementer(),
110108
traceEventConcern,
111109
tracer: tracer,
112110
metadataMaximumSize: env.TASK_RUN_METADATA_MAXIMUM_SIZE,

internal-packages/run-engine/src/engine/index.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,6 @@ export class RunEngine {
384384
async trigger(
385385
{
386386
friendlyId,
387-
number,
388387
environment,
389388
idempotencyKey,
390389
idempotencyKeyExpiresAt,
@@ -457,7 +456,6 @@ export class RunEngine {
457456
id: taskRunId,
458457
engine: "V2",
459458
status,
460-
number,
461459
friendlyId,
462460
runtimeEnvironmentId: environment.id,
463461
environmentType: environment.type,
@@ -1318,6 +1316,9 @@ export class RunEngine {
13181316

13191317
// This is just a failsafe
13201318
await this.runLockRedis.quit();
1319+
1320+
// Close the batch queue and its Redis connections
1321+
await this.batchQueue.close();
13211322
} catch (error) {
13221323
// And should always throw
13231324
}

internal-packages/run-engine/src/engine/systems/batchSystem.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,11 @@ export class BatchSystem {
6565

6666
// Check if all runs are created (or accounted for with failures)
6767
// v2 batches use successfulRunCount + failedRunCount, v1 uses processingJobsCount
68-
const isV2Batch = batch.batchVersion === "runengine:v2";
69-
68+
const isNewBatch = batch.batchVersion === "runengine:v2";
69+
7070
let processedRunCount: number;
71-
if (isV2Batch) {
72-
// For v2 batches, we need to count both successful and failed runs
71+
if (isNewBatch) {
72+
// For v2/v3 batches, we need to count both successful and failed runs
7373
const successfulCount = batch.successfulRunCount ?? 0;
7474
const failedCount = batch.failedRunCount ?? 0;
7575
processedRunCount = successfulCount + failedCount;
@@ -82,7 +82,7 @@ export class BatchSystem {
8282
batchId,
8383
processedRunCount,
8484
runCount: batch.runCount,
85-
isV2Batch,
85+
isNewBatch,
8686
});
8787
return;
8888
}

0 commit comments

Comments
 (0)