Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions apps/sim/executor/dag/construction/edges.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ export class EdgeConstructor {
for (const connection of workflow.connections) {
let { source, target } = connection
const originalSource = source
const originalTarget = target
let sourceHandle = this.generateSourceHandle(
source,
target,
Expand Down Expand Up @@ -257,14 +258,14 @@ export class EdgeConstructor {
target = sentinelStartId
}

if (loopSentinelStartId) {
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
}

if (this.edgeCrossesLoopBoundary(source, target, blocksInLoops, dag)) {
continue
}

if (loopSentinelStartId && !blocksInLoops.has(originalTarget)) {
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
}

if (!this.isEdgeReachable(source, target, reachableBlocks, dag)) {
continue
}
Expand Down
19 changes: 10 additions & 9 deletions apps/sim/executor/execution/edge-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,16 @@ export class EdgeManager {
}
}

// Check if any deactivation targets that previously received an activated edge are now ready
for (const { target } of edgesToDeactivate) {
if (
!readyNodes.includes(target) &&
!activatedTargets.includes(target) &&
this.nodesWithActivatedEdge.has(target) &&
this.isTargetReady(target)
) {
readyNodes.push(target)
if (output.selectedRoute !== EDGE.LOOP_EXIT && output.selectedRoute !== EDGE.PARALLEL_EXIT) {
for (const { target } of edgesToDeactivate) {
if (
!readyNodes.includes(target) &&
!activatedTargets.includes(target) &&
this.nodesWithActivatedEdge.has(target) &&
this.isTargetReady(target)
) {
readyNodes.push(target)
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions apps/sim/executor/execution/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,12 @@ export class ExecutionEngine {
logger.info('Processing outgoing edges', {
nodeId,
outgoingEdgesCount: node.outgoingEdges.size,
outgoingEdges: Array.from(node.outgoingEdges.entries()).map(([id, e]) => ({
id,
target: e.target,
sourceHandle: e.sourceHandle,
})),
output,
readyNodesCount: readyNodes.length,
readyNodes,
})
Expand Down
2 changes: 2 additions & 0 deletions apps/sim/executor/execution/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ export interface ParallelScope {
items?: any[]
/** Error message if parallel validation failed (e.g., exceeded max branches) */
validationError?: string
/** Whether the parallel has an empty distribution and should be skipped */
isEmpty?: boolean
}

export class ExecutionState implements BlockStateController {
Expand Down
6 changes: 4 additions & 2 deletions apps/sim/executor/orchestrators/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,10 @@ export class LoopOrchestrator {
return true
}

// forEach: skip if items array is empty
if (scope.loopType === 'forEach') {
if (!scope.items || scope.items.length === 0) {
logger.info('ForEach loop has empty items, skipping loop body', { loopId })
logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
return false
}
return true
Expand All @@ -399,6 +399,8 @@ export class LoopOrchestrator {
if (scope.loopType === 'for') {
if (scope.maxIterations === 0) {
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
// Set empty output for the loop
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
return false
}
return true
Expand Down
13 changes: 12 additions & 1 deletion apps/sim/executor/orchestrators/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ export class NodeExecutionOrchestrator {
if (loopId) {
const shouldExecute = await this.loopOrchestrator.evaluateInitialCondition(ctx, loopId)
if (!shouldExecute) {
logger.info('While loop initial condition false, skipping loop body', { loopId })
logger.info('Loop initial condition false, skipping loop body', { loopId })
return {
sentinelStart: true,
shouldExit: true,
Expand Down Expand Up @@ -158,6 +158,17 @@ export class NodeExecutionOrchestrator {
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
}
}

const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId)
if (scope?.isEmpty) {
logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
return {
sentinelStart: true,
shouldExit: true,
selectedRoute: EDGE.PARALLEL_EXIT,
}
}

return { sentinelStart: true }
}

Expand Down
40 changes: 36 additions & 4 deletions apps/sim/executor/orchestrators/parallel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,13 @@ export class ParallelOrchestrator {

let items: any[] | undefined
let branchCount: number
let isEmpty = false

try {
const resolved = this.resolveBranchCount(ctx, parallelConfig)
const resolved = this.resolveBranchCount(ctx, parallelConfig, parallelId)
branchCount = resolved.branchCount
items = resolved.items
isEmpty = resolved.isEmpty ?? false
} catch (error) {
const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}`
logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution })
Expand All @@ -91,6 +93,34 @@ export class ParallelOrchestrator {
throw new Error(branchError)
}

// Handle empty distribution - skip parallel body
if (isEmpty || branchCount === 0) {
const scope: ParallelScope = {
parallelId,
totalBranches: 0,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 0,
items: [],
isEmpty: true,
}

if (!ctx.parallelExecutions) {
ctx.parallelExecutions = new Map()
}
ctx.parallelExecutions.set(parallelId, scope)

// Set empty output for the parallel
this.state.setBlockOutput(parallelId, { results: [] })

logger.info('Parallel scope initialized with empty distribution, skipping body', {
parallelId,
branchCount: 0,
})

return scope
}

const { entryNodes } = this.expander.expandParallel(this.dag, parallelId, branchCount, items)

const scope: ParallelScope = {
Expand Down Expand Up @@ -127,15 +157,17 @@ export class ParallelOrchestrator {

private resolveBranchCount(
ctx: ExecutionContext,
config: SerializedParallel
): { branchCount: number; items?: any[] } {
config: SerializedParallel,
parallelId: string
): { branchCount: number; items?: any[]; isEmpty?: boolean } {
if (config.parallelType === 'count') {
return { branchCount: config.count ?? 1 }
}

const items = this.resolveDistributionItems(ctx, config)
if (items.length === 0) {
return { branchCount: config.count ?? 1 }
logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
return { branchCount: 0, items: [], isEmpty: true }
}

return { branchCount: items.length, items }
Expand Down