diff --git a/dd-java-agent/instrumentation/spark/spark-common/build.gradle b/dd-java-agent/instrumentation/spark/spark-common/build.gradle
index 84dd5cca6a5..f19ebd38b4a 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/build.gradle
+++ b/dd-java-agent/instrumentation/spark/spark-common/build.gradle
@@ -10,6 +10,7 @@ configurations.configureEach {
dependencies {
compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
+ compileOnly group: 'org.apache.spark', name: 'spark-launcher_2.12', version: '2.4.0'
testFixturesImplementation group: 'com.datadoghq', name: 'sketches-java', version: '0.8.2'
testFixturesImplementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.14.0'
@@ -21,7 +22,12 @@ dependencies {
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-yarn_2.12', version: '2.4.0'
+ testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-launcher_2.12', version: '2.4.0'
testFixturesCompileOnly(libs.bundles.groovy)
testFixturesCompileOnly(libs.bundles.spock)
+
+ testImplementation project(':dd-java-agent:instrumentation-testing')
+ testImplementation group: 'org.apache.spark', name: 'spark-launcher_2.12', version: '2.4.0'
}
+
diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java
index 80c71a4f64b..ea2c1080a06 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java
@@ -106,6 +106,8 @@ public static void exit(@Advice.Thrown Throwable throwable) {
if (AbstractDatadogSparkListener.listener != null) {
AbstractDatadogSparkListener.listener.finishApplication(
System.currentTimeMillis(), throwable, 0, null);
+ } else {
+ SparkLauncherListener.finishSpanWithThrowable(throwable);
}
}
}
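The new else branch covers the case where the Datadog Spark listener was never installed — e.g. the application failed before a SparkContext existed. It only has an effect when spark-submit runs in the launcher's JVM (such as with Spark's InProcessLauncher); finishSpanWithThrowable is a no-op when no launcher span is open, so the branch is harmless elsewhere. A condensed sketch of the two exit paths, mirroring the advice above (illustrative comments only):

    // On SparkSubmit exit with a throwable:
    //   Datadog Spark listener installed -> listener.finishApplication(now, throwable, 0, null)
    //   listener never installed         -> SparkLauncherListener.finishSpanWithThrowable(throwable)
    //                                       (no-op if no spark.launcher.launch span is open)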
diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkConfAllowList.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkConfAllowList.java
index 08ed700047a..23c13688306 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkConfAllowList.java
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkConfAllowList.java
@@ -17,6 +17,12 @@
* @see <a href="https://spark.apache.org/docs/latest/configuration.html">Spark Configuration</a>
*/
class SparkConfAllowList {
+ // Using values from
+ // https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/internal/config/package.scala#L1150-L1158
+ static final String DEFAULT_REDACTION_REGEX = "(?i)secret|password|token|access.key|api.key";
+
+ private static final Pattern DEFAULT_REDACTION_PATTERN = Pattern.compile(DEFAULT_REDACTION_REGEX);
+
/**
* Job-specific parameters that can be used to control job execution or provide metadata about the
* job being executed
@@ -80,11 +86,17 @@ public static boolean canCaptureJobParameter(String parameterName) {
return allowedJobParams.contains(parameterName);
}
+ /** Redact a value if the key or value matches the default redaction pattern. */
+ public static String redactValue(String key, String value) {
+ if (DEFAULT_REDACTION_PATTERN.matcher(key).find()
+ || DEFAULT_REDACTION_PATTERN.matcher(value).find()) {
+ return "[redacted]";
+ }
+ return value;
+ }
+
public static List<Tuple2<String, String>> getRedactedSparkConf(SparkConf conf) {
- // Using values from
- // https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/internal/config/package.scala#L1150-L1158
- String redactionPattern =
- conf.get("spark.redaction.regex", "(?i)secret|password|token|access.key|api.key");
+ String redactionPattern = conf.get("spark.redaction.regex", DEFAULT_REDACTION_REGEX);
List<Tuple2<String, String>> redacted = new ArrayList<>();
Pattern pattern = Pattern.compile(redactionPattern);
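For reference, the extracted redactValue helper behaves as follows — a minimal sketch assuming same-package access (the class is package-private) and the default regex above:

    // Key matches "access.key" (the dot matches any character) -> masked
    String v1 = SparkConfAllowList.redactValue("spark.hadoop.fs.s3a.access.key", "AKIA123"); // "[redacted]"

    // Value matches "secret", even though the key itself is harmless -> masked
    String v2 = SparkConfAllowList.redactValue("spark.app.name", "my-secret-app"); // "[redacted]"

    // Neither key nor value matches -> returned unchanged
    String v3 = SparkConfAllowList.redactValue("spark.master", "yarn"); // "yarn"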
diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java
new file mode 100644
index 00000000000..3e1b2ffd93c
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java
@@ -0,0 +1,67 @@
+package datadog.trace.instrumentation.spark;
+
+import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
+import static net.bytebuddy.matcher.ElementMatchers.isMethod;
+
+import com.google.auto.service.AutoService;
+import datadog.trace.agent.tooling.Instrumenter;
+import datadog.trace.agent.tooling.InstrumenterModule;
+import datadog.trace.api.InstrumenterConfig;
+import net.bytebuddy.asm.Advice;
+import org.apache.spark.launcher.SparkAppHandle;
+
+@AutoService(InstrumenterModule.class)
+public class SparkLauncherInstrumentation extends InstrumenterModule.Tracing
+ implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
+
+ public SparkLauncherInstrumentation() {
+ super("spark-launcher");
+ }
+
+ @Override
+ protected boolean defaultEnabled() {
+ return InstrumenterConfig.get().isDataJobsEnabled();
+ }
+
+ @Override
+ public String instrumentedType() {
+ return "org.apache.spark.launcher.SparkLauncher";
+ }
+
+ @Override
+ public String[] helperClassNames() {
+ return new String[] {
+ packageName + ".SparkConfAllowList",
+ packageName + ".SparkLauncherListener",
+ };
+ }
+
+ @Override
+ public void methodAdvice(MethodTransformer transformer) {
+ transformer.applyAdvice(
+ isMethod()
+ .and(named("startApplication"))
+ .and(isDeclaredBy(named("org.apache.spark.launcher.SparkLauncher"))),
+ SparkLauncherInstrumentation.class.getName() + "$StartApplicationAdvice");
+ }
+
+ public static class StartApplicationAdvice {
+ @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
+ public static void exit(
+ @Advice.This Object launcher,
+ @Advice.Return SparkAppHandle handle,
+ @Advice.Thrown Throwable throwable) {
+ SparkLauncherListener.createLauncherSpan(launcher);
+
+ if (throwable != null) {
+ SparkLauncherListener.finishSpanWithThrowable(throwable);
+ return;
+ }
+
+ if (handle != null) {
+ handle.addListener(new SparkLauncherListener());
+ }
+ }
+ }
+}
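For context, startApplication() is the Spark launcher API's programmatic entry point, so the advice wraps calls of the following shape — a hedged sketch; the resource path and main class are placeholders:

    // The exit advice creates the spark.launcher.launch span from the launcher's
    // builder state, then registers SparkLauncherListener on the returned handle
    // so state transitions (SUBMITTED, RUNNING, FINISHED, ...) update the span.
    SparkAppHandle handle =
        new SparkLauncher()
            .setMaster("yarn")
            .setAppResource("/path/to/app.jar") // placeholder
            .setMainClass("com.example.Main") // placeholder
            .startApplication(); // the instrumented method (throws IOException)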
diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java
new file mode 100644
index 00000000000..1b784bb6bba
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java
@@ -0,0 +1,176 @@
+package datadog.trace.instrumentation.spark;
+
+import datadog.trace.api.DDTags;
+import datadog.trace.api.sampling.PrioritySampling;
+import datadog.trace.api.sampling.SamplingMechanism;
+import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
+import java.lang.reflect.Field;
+import java.util.Map;
+import org.apache.spark.launcher.SparkAppHandle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SparkLauncherListener implements SparkAppHandle.Listener {
+
+ private static final Logger log = LoggerFactory.getLogger(SparkLauncherListener.class);
+
+ static volatile AgentSpan launcherSpan;
+
+ private static volatile boolean shutdownHookRegistered = false;
+
+ public static synchronized void createLauncherSpan(Object launcher) {
+ if (launcherSpan != null) {
+ return;
+ }
+
+ AgentTracer.TracerAPI tracer = AgentTracer.get();
+ AgentSpan span =
+ tracer
+ .buildSpan("spark.launcher.launch")
+ .withSpanType("spark")
+ .withResourceName("SparkLauncher.startApplication")
+ .start();
+ span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS);
+ setLauncherConfigTags(span, launcher);
+ launcherSpan = span;
+
+ if (!shutdownHookRegistered) {
+ shutdownHookRegistered = true;
+ Runtime.getRuntime()
+ .addShutdownHook(
+ new Thread(
+ () -> {
+ synchronized (SparkLauncherListener.class) {
+ AgentSpan s = launcherSpan;
+ if (s != null) {
+ log.info("Finishing spark.launcher span from shutdown hook");
+ s.finish();
+ launcherSpan = null;
+ }
+ }
+ }));
+ }
+ }
+
+ public static synchronized void finishSpan(boolean isError, String errorMessage) {
+ AgentSpan span = launcherSpan;
+ if (span == null) {
+ return;
+ }
+ if (isError) {
+ span.setError(true);
+ span.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed");
+ span.setTag(DDTags.ERROR_MSG, errorMessage);
+ }
+ span.finish();
+ launcherSpan = null;
+ }
+
+ public static synchronized void finishSpanWithThrowable(Throwable throwable) {
+ AgentSpan span = launcherSpan;
+ if (span == null) {
+ return;
+ }
+ if (throwable != null) {
+ span.addThrowable(throwable);
+ }
+ span.finish();
+ launcherSpan = null;
+ }
+
+ @Override
+ public void stateChanged(SparkAppHandle handle) {
+ SparkAppHandle.State state = handle.getState();
+ AgentSpan span = launcherSpan;
+ if (span != null) {
+ span.setTag("spark.launcher.app_state", state.toString());
+
+ String appId = handle.getAppId();
+ if (appId != null) {
+ span.setTag("spark.app_id", appId);
+ span.setTag("app_id", appId);
+ }
+
+ if (state.isFinal()) {
+ if (state == SparkAppHandle.State.FAILED
+ || state == SparkAppHandle.State.KILLED
+ || state == SparkAppHandle.State.LOST) {
+ // Set error tags but don't finish yet — RunMainAdvice may add the throwable
+ // with the full stack trace. The span will be finished by RunMainAdvice or
+ // the shutdown hook.
+ span.setError(true);
+ span.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed");
+ span.setTag(DDTags.ERROR_MSG, "Application " + state);
+ } else {
+ finishSpan(false, null);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void infoChanged(SparkAppHandle handle) {
+ AgentSpan span = launcherSpan;
+ if (span != null) {
+ String appId = handle.getAppId();
+ if (appId != null) {
+ span.setTag("spark.app_id", appId);
+ span.setTag("app_id", appId);
+ }
+ }
+ }
+
+ private static void setLauncherConfigTags(AgentSpan span, Object launcher) {
+ try {
+ Field builderField = launcher.getClass().getSuperclass().getDeclaredField("builder");
+ builderField.setAccessible(true);
+ Object builder = builderField.get(launcher);
+ if (builder == null) {
+ return;
+ }
+
+ Class<?> builderClass = builder.getClass();
+ Class<?> abstractBuilderClass = builderClass.getSuperclass();
+
+ setStringFieldAsTag(span, builder, abstractBuilderClass, "master", "master");
+ setStringFieldAsTag(span, builder, abstractBuilderClass, "deployMode", "deploy_mode");
+ setStringFieldAsTag(span, builder, abstractBuilderClass, "appName", "application_name");
+ setStringFieldAsTag(span, builder, abstractBuilderClass, "mainClass", "main_class");
+ setStringFieldAsTag(span, builder, abstractBuilderClass, "appResource", "app_resource");
+
+ try {
+ Field confField = abstractBuilderClass.getDeclaredField("conf");
+ confField.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ Map<String, String> conf = (Map<String, String>) confField.get(builder);
+ if (conf != null) {
+ for (Map.Entry<String, String> entry : conf.entrySet()) {
+ if (SparkConfAllowList.canCaptureJobParameter(entry.getKey())) {
+ String value = SparkConfAllowList.redactValue(entry.getKey(), entry.getValue());
+ span.setTag("config." + entry.getKey().replace('.', '_'), value);
+ }
+ }
+ }
+ } catch (NoSuchFieldException e) {
+ log.debug("Could not find conf field on builder", e);
+ }
+ } catch (Exception e) {
+ log.debug("Failed to extract SparkLauncher configuration", e);
+ }
+ }
+
+ private static void setStringFieldAsTag(
+ AgentSpan span, Object obj, Class<?> clazz, String fieldName, String tagName) {
+ try {
+ Field field = clazz.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ Object value = field.get(obj);
+ if (value != null) {
+ span.setTag(tagName, value.toString());
+ }
+ } catch (Exception e) {
+ log.debug("Could not read field {} from builder", fieldName, e);
+ }
+ }
+}
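The reflection in setLauncherConfigTags assumes the internal layout of Spark's launcher classes: SparkLauncher extends AbstractLauncher, which holds a builder whose superclass (AbstractCommandBuilder) declares the string fields and the conf map read above. That layout has been stable across Spark 2.4–3.x, but it remains an assumption; a self-contained probe for verifying it against a given Spark version (LauncherLayoutProbe is a hypothetical helper, not part of the patch):

    import java.lang.reflect.Field;
    import org.apache.spark.launcher.SparkLauncher;

    public class LauncherLayoutProbe {
      public static void main(String[] args) throws Exception {
        SparkLauncher launcher = new SparkLauncher();
        // SparkLauncher -> AbstractLauncher.builder
        Field builderField = launcher.getClass().getSuperclass().getDeclaredField("builder");
        builderField.setAccessible(true);
        // SparkSubmitCommandBuilder -> AbstractCommandBuilder
        Class<?> abstractBuilder = builderField.get(launcher).getClass().getSuperclass();
        for (String name :
            new String[] {"master", "deployMode", "appName", "mainClass", "appResource", "conf"}) {
          // Throws NoSuchFieldException if the layout changed in this Spark version
          System.out.println(abstractBuilder.getDeclaredField(name));
        }
      }
    }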
diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy
new file mode 100644
index 00000000000..f7a5b5a9601
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy
@@ -0,0 +1,203 @@
+package datadog.trace.instrumentation.spark
+
+import datadog.trace.agent.test.InstrumentationSpecification
+import datadog.trace.api.sampling.PrioritySampling
+import datadog.trace.api.sampling.SamplingMechanism
+import datadog.trace.bootstrap.instrumentation.api.AgentTracer
+import org.apache.spark.launcher.SparkAppHandle
+import org.apache.spark.launcher.SparkLauncher
+
+class SparkLauncherTest extends InstrumentationSpecification {
+
+ def "StartApplicationAdvice extracts config tags from SparkLauncher"() {
+ setup:
+ SparkLauncherListener.launcherSpan = null
+
+ when:
+ def launcher = new SparkLauncher()
+ .setMaster("yarn")
+ .setDeployMode("cluster")
+ .setAppName("test-app")
+ .setMainClass("com.example.Main")
+ .setAppResource("/path/to/app.jar")
+ .setConf("spark.executor.memory", "4g")
+ .setConf("spark.executor.instances", "10")
+ SparkLauncherListener.createLauncherSpan(launcher)
+ SparkLauncherListener.finishSpan(false, null)
+
+ then:
+ assertTraces(1) {
+ trace(1) {
+ span {
+ operationName "spark.launcher.launch"
+ spanType "spark"
+ resourceName "SparkLauncher.startApplication"
+ errored false
+ assert span.tags["master"] == "yarn"
+ assert span.tags["deploy_mode"] == "cluster"
+ assert span.tags["application_name"] == "test-app"
+ assert span.tags["main_class"] == "com.example.Main"
+ assert span.tags["app_resource"] == "/path/to/app.jar"
+ assert span.tags["config.spark_executor_memory"] == "4g"
+ assert span.tags["config.spark_executor_instances"] == "10"
+ }
+ }
+ }
+ }
+
+ def "StartApplicationAdvice redacts sensitive conf values"() {
+ setup:
+ SparkLauncherListener.launcherSpan = null
+
+ when:
+ def launcher = new SparkLauncher()
+ // spark.app.name is allowlisted; its value contains "secret" so should be redacted
+ .setConf("spark.app.name", "my-secret-app")
+ // spark.master is allowlisted; its value is harmless so should pass through
+ .setConf("spark.master", "yarn")
+ SparkLauncherListener.createLauncherSpan(launcher)
+ SparkLauncherListener.finishSpan(false, null)
+
+ then:
+ assertTraces(1) {
+ trace(1) {
+ span {
+ operationName "spark.launcher.launch"
+ spanType "spark"
+ resourceName "SparkLauncher.startApplication"
+ errored false
+ assert span.tags["config.spark_app_name"] == "[redacted]"
+ assert span.tags["config.spark_master"] == "yarn"
+ }
+ }
+ }
+ }
+
+ def "finishSpanWithThrowable finishes span with error"() {
+ setup:
+ SparkLauncherListener.launcherSpan = null
+
+ when:
+ def launcher = new SparkLauncher().setAppName("test-app")
+ SparkLauncherListener.createLauncherSpan(launcher)
+ SparkLauncherListener.finishSpanWithThrowable(new RuntimeException("startApplication failed"))
+
+ then:
+ SparkLauncherListener.launcherSpan == null
+ assertTraces(1) {
+ trace(1) {
+ span {
+ operationName "spark.launcher.launch"
+ spanType "spark"
+ errored true
+ }
+ }
+ }
+ }
+
+ def "SparkLauncherListener finishes span on final state FINISHED"() {
+ setup:
+ SparkLauncherListener.launcherSpan = null
+ def tracer = AgentTracer.get()
+ SparkLauncherListener.launcherSpan = tracer
+ .buildSpan("spark.launcher.launch")
+ .withSpanType("spark")
+ .withResourceName("SparkLauncher.startApplication")
+ .start()
+ SparkLauncherListener.launcherSpan.setSamplingPriority(
+ PrioritySampling.USER_KEEP,
+ SamplingMechanism.DATA_JOBS)
+ def listener = new SparkLauncherListener()
+ def handle = Mock(SparkAppHandle)
+
+ when:
+ handle.getState() >> SparkAppHandle.State.FINISHED
+ handle.getAppId() >> "app-123"
+ listener.stateChanged(handle)
+
+ then:
+ SparkLauncherListener.launcherSpan == null
+ assertTraces(1) {
+ trace(1) {
+ span {
+ operationName "spark.launcher.launch"
+ spanType "spark"
+ errored false
+ assert span.tags["spark.app_id"] == "app-123"
+ assert span.tags["spark.launcher.app_state"] == "FINISHED"
+ }
+ }
+ }
+ }
+
+ def "SparkLauncherListener sets error tags on FAILED state but does not finish span"() {
+ setup:
+ SparkLauncherListener.launcherSpan = null
+ def tracer = AgentTracer.get()
+ SparkLauncherListener.launcherSpan = tracer
+ .buildSpan("spark.launcher.launch")
+ .withSpanType("spark")
+ .withResourceName("SparkLauncher.startApplication")
+ .start()
+ SparkLauncherListener.launcherSpan.setSamplingPriority(
+ PrioritySampling.USER_KEEP,
+ SamplingMechanism.DATA_JOBS)
+ def listener = new SparkLauncherListener()
+ def handle = Mock(SparkAppHandle)
+
+ when:
+ handle.getState() >> SparkAppHandle.State.FAILED
+ handle.getAppId() >> "app-456"
+ listener.stateChanged(handle)
+
+ then:
+ // Span stays open so RunMainAdvice can add the throwable
+ SparkLauncherListener.launcherSpan != null
+ SparkLauncherListener.launcherSpan.isError()
+ SparkLauncherListener.launcherSpan.getTags()["error.type"] == "Spark Launcher Failed"
+ SparkLauncherListener.launcherSpan.getTags()["error.message"] == "Application FAILED"
+ SparkLauncherListener.launcherSpan.getTags()["spark.app_id"] == "app-456"
+
+ cleanup:
+ SparkLauncherListener.finishSpan(false, null)
+ }
+
+ def "finishSpanWithThrowable adds stack trace after FAILED state"() {
+ setup:
+ SparkLauncherListener.launcherSpan = null
+ def tracer = AgentTracer.get()
+ SparkLauncherListener.launcherSpan = tracer
+ .buildSpan("spark.launcher.launch")
+ .withSpanType("spark")
+ .withResourceName("SparkLauncher.startApplication")
+ .start()
+ SparkLauncherListener.launcherSpan.setSamplingPriority(
+ PrioritySampling.USER_KEEP,
+ SamplingMechanism.DATA_JOBS)
+ def listener = new SparkLauncherListener()
+ def handle = Mock(SparkAppHandle)
+
+ when:
+ // Simulate: listener sets error tags, then RunMainAdvice finishes with throwable
+ handle.getState() >> SparkAppHandle.State.FAILED
+ handle.getAppId() >> "app-456"
+ listener.stateChanged(handle)
+ SparkLauncherListener.finishSpanWithThrowable(new RuntimeException("job crashed"))
+
+ then:
+ SparkLauncherListener.launcherSpan == null
+ assertTraces(1) {
+ trace(1) {
+ span {
+ operationName "spark.launcher.launch"
+ spanType "spark"
+ errored true
+ assert span.tags["error.type"] == "java.lang.RuntimeException"
+ assert span.tags["error.message"] == "job crashed"
+ assert span.tags["error.stack"] != null
+ assert span.tags["spark.app_id"] == "app-456"
+ }
+ }
+ }
+ }
+}
diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java
index 4cd23089cd1..7b00f04b30c 100644
--- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java
+++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java
@@ -32,6 +32,7 @@ public String[] helperClassNames() {
packageName + ".RemoveEldestHashMap",
packageName + ".SparkAggregatedTaskMetrics",
packageName + ".SparkConfAllowList",
+ packageName + ".SparkLauncherListener",
packageName + ".SparkSQLUtils",
packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
packageName + ".SparkSQLUtils$AccumulatorWithStage",
diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java
index c9e534f6429..eda0436f464 100644
--- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java
+++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java
@@ -32,6 +32,7 @@ public String[] helperClassNames() {
packageName + ".RemoveEldestHashMap",
packageName + ".SparkAggregatedTaskMetrics",
packageName + ".SparkConfAllowList",
+ packageName + ".SparkLauncherListener",
packageName + ".SparkSQLUtils",
packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
packageName + ".SparkSQLUtils$AccumulatorWithStage",