diff --git a/dd-java-agent/instrumentation/spark/spark-common/build.gradle b/dd-java-agent/instrumentation/spark/spark-common/build.gradle
index 84dd5cca6a5..f19ebd38b4a 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/build.gradle
+++ b/dd-java-agent/instrumentation/spark/spark-common/build.gradle
@@ -10,6 +10,7 @@ configurations.configureEach {
 dependencies {
   compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
   compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
+  compileOnly group: 'org.apache.spark', name: 'spark-launcher_2.12', version: '2.4.0'

   testFixturesImplementation group: 'com.datadoghq', name: 'sketches-java', version: '0.8.2'
   testFixturesImplementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.14.0'
@@ -21,7 +22,12 @@ dependencies {
   testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
   testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
   testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-yarn_2.12', version: '2.4.0'
+  testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-launcher_2.12', version: '2.4.0'
   testFixturesCompileOnly(libs.bundles.groovy)
   testFixturesCompileOnly(libs.bundles.spock)
+
+  testImplementation project(':dd-java-agent:instrumentation-testing')
+  testImplementation group: 'org.apache.spark', name: 'spark-launcher_2.12', version: '2.4.0'
 }
+
diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java
index 80c71a4f64b..ea2c1080a06 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java
@@ -106,6 +106,8 @@ public static void exit(@Advice.Thrown Throwable throwable) {
       if (AbstractDatadogSparkListener.listener != null) {
         AbstractDatadogSparkListener.listener.finishApplication(
             System.currentTimeMillis(), throwable, 0, null);
+      } else {
+        SparkLauncherListener.finishSpanWithThrowable(throwable);
       }
     }
   }
diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkConfAllowList.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkConfAllowList.java
index 08ed700047a..23c13688306 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkConfAllowList.java
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkConfAllowList.java
@@ -17,6 +17,12 @@
  * @see <a href="https://spark.apache.org/docs/latest/configuration.html">Spark Configuration</a>
  */
 class SparkConfAllowList {
+  // Using values from
+  // https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/internal/config/package.scala#L1150-L1158
+  static final String DEFAULT_REDACTION_REGEX = "(?i)secret|password|token|access.key|api.key";
+
+  private static final Pattern DEFAULT_REDACTION_PATTERN = Pattern.compile(DEFAULT_REDACTION_REGEX);
+
   /**
    * Job-specific parameters that can be used to control job execution or provide metadata about the
    * job being executed
@@ -80,11 +86,17 @@ public static boolean canCaptureJobParameter(String parameterName) {
     return allowedJobParams.contains(parameterName);
   }

+  /** Redact a value if the key or value matches the default redaction pattern. */
+  public static String redactValue(String key, String value) {
+    if (DEFAULT_REDACTION_PATTERN.matcher(key).find()
+        || DEFAULT_REDACTION_PATTERN.matcher(value).find()) {
+      return "[redacted]";
+    }
+    return value;
+  }
+
   public static List<Tuple2<String, String>> getRedactedSparkConf(SparkConf conf) {
-    // Using values from
-    // https://github.com/apache/spark/blob/v3.5.1/core/src/main/scala/org/apache/spark/internal/config/package.scala#L1150-L1158
-    String redactionPattern =
-        conf.get("spark.redaction.regex", "(?i)secret|password|token|access.key|api.key");
+    String redactionPattern = conf.get("spark.redaction.regex", DEFAULT_REDACTION_REGEX);

     List<Tuple2<String, String>> redacted = new ArrayList<>();
     Pattern pattern = Pattern.compile(redactionPattern);
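Reviewer note: a minimal sketch of the new redactValue helper's behavior, mirroring the assertions in the Groovy test added below. The wrapper class is hypothetical (it sits in the same package because SparkConfAllowList is package-private); the redactValue calls themselves are the real API added above.

package datadog.trace.instrumentation.spark;

// Sketch only: redactValue matches case-insensitively on either the key or the value.
class RedactValueDemo {
  public static void main(String[] args) {
    // key matches "access.key" -> redacted
    System.out.println(SparkConfAllowList.redactValue("spark.hadoop.fs.s3a.access.key", "AKIA123"));
    // value contains "secret" -> redacted even though the key is harmless
    System.out.println(SparkConfAllowList.redactValue("spark.app.name", "my-secret-app"));
    // neither side matches -> value passes through unchanged, prints "yarn"
    System.out.println(SparkConfAllowList.redactValue("spark.master", "yarn"));
  }
}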
diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java
new file mode 100644
index 00000000000..3e1b2ffd93c
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java
@@ -0,0 +1,67 @@
+package datadog.trace.instrumentation.spark;
+
+import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
+import static net.bytebuddy.matcher.ElementMatchers.isMethod;
+
+import com.google.auto.service.AutoService;
+import datadog.trace.agent.tooling.Instrumenter;
+import datadog.trace.agent.tooling.InstrumenterModule;
+import datadog.trace.api.InstrumenterConfig;
+import net.bytebuddy.asm.Advice;
+import org.apache.spark.launcher.SparkAppHandle;
+
+@AutoService(InstrumenterModule.class)
+public class SparkLauncherInstrumentation extends InstrumenterModule.Tracing
+    implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
+
+  public SparkLauncherInstrumentation() {
+    super("spark-launcher");
+  }
+
+  @Override
+  protected boolean defaultEnabled() {
+    return InstrumenterConfig.get().isDataJobsEnabled();
+  }
+
+  @Override
+  public String instrumentedType() {
+    return "org.apache.spark.launcher.SparkLauncher";
+  }
+
+  @Override
+  public String[] helperClassNames() {
+    return new String[] {
+      packageName + ".SparkConfAllowList",
+      packageName + ".SparkLauncherListener",
+    };
+  }
+
+  @Override
+  public void methodAdvice(MethodTransformer transformer) {
+    transformer.applyAdvice(
+        isMethod()
+            .and(named("startApplication"))
+            .and(isDeclaredBy(named("org.apache.spark.launcher.SparkLauncher"))),
+        SparkLauncherInstrumentation.class.getName() + "$StartApplicationAdvice");
+  }
+
+  public static class StartApplicationAdvice {
+    @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
+    public static void exit(
+        @Advice.This Object launcher,
+        @Advice.Return SparkAppHandle handle,
+        @Advice.Thrown Throwable throwable) {
+      SparkLauncherListener.createLauncherSpan(launcher);
+
+      if (throwable != null) {
+        SparkLauncherListener.finishSpanWithThrowable(throwable);
+        return;
+      }
+
+      if (handle != null) {
+        handle.addListener(new SparkLauncherListener());
+      }
+    }
+  }
+}
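For context, this is the call path the advice above hooks, using only the public SparkLauncher API (the jar path and main class below are hypothetical): the return from startApplication() is what triggers StartApplicationAdvice.exit, which creates the spark.launcher.launch span and attaches a SparkLauncherListener to the returned handle.

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

class LaunchExample {
  public static void main(String[] args) throws Exception {
    // Advice runs on exit of startApplication(): span created, listener attached.
    SparkAppHandle handle =
        new SparkLauncher()
            .setMaster("yarn")
            .setAppResource("/path/to/app.jar") // hypothetical jar
            .setMainClass("com.example.Main")   // hypothetical main class
            .startApplication();
  }
}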
diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java
new file mode 100644
index 00000000000..1b784bb6bba
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java
@@ -0,0 +1,177 @@
+package datadog.trace.instrumentation.spark;
+
+import datadog.trace.api.DDTags;
+import datadog.trace.api.sampling.PrioritySampling;
+import datadog.trace.api.sampling.SamplingMechanism;
+import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
+import java.lang.reflect.Field;
+import java.util.Map;
+import org.apache.spark.launcher.SparkAppHandle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SparkLauncherListener implements SparkAppHandle.Listener {
+
+  private static final Logger log = LoggerFactory.getLogger(SparkLauncherListener.class);
+
+  static volatile AgentSpan launcherSpan;
+
+  private static volatile boolean shutdownHookRegistered = false;
+
+  public static synchronized void createLauncherSpan(Object launcher) {
+    if (launcherSpan != null) {
+      return;
+    }
+
+    AgentTracer.TracerAPI tracer = AgentTracer.get();
+    AgentSpan span =
+        tracer
+            .buildSpan("spark.launcher.launch")
+            .withSpanType("spark")
+            .withResourceName("SparkLauncher.startApplication")
+            .start();
+    span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS);
+    setLauncherConfigTags(span, launcher);
+    launcherSpan = span;
+
+
+    if (!shutdownHookRegistered) {
+      shutdownHookRegistered = true;
+      Runtime.getRuntime()
+          .addShutdownHook(
+              new Thread(
+                  () -> {
+                    synchronized (SparkLauncherListener.class) {
+                      AgentSpan s = launcherSpan;
+                      if (s != null) {
+                        log.info("Finishing spark.launcher span from shutdown hook");
+                        s.finish();
+                        launcherSpan = null;
+                      }
+                    }
+                  }));
+    }
+  }
+
+  public static synchronized void finishSpan(boolean isError, String errorMessage) {
+    AgentSpan span = launcherSpan;
+    if (span == null) {
+      return;
+    }
+    if (isError) {
+      span.setError(true);
+      span.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed");
+      span.setTag(DDTags.ERROR_MSG, errorMessage);
+    }
+    span.finish();
+    launcherSpan = null;
+  }
+
+  public static synchronized void finishSpanWithThrowable(Throwable throwable) {
+    AgentSpan span = launcherSpan;
+    if (span == null) {
+      return;
+    }
+    if (throwable != null) {
+      span.addThrowable(throwable);
+    }
+    span.finish();
+    launcherSpan = null;
+  }
+
+  @Override
+  public void stateChanged(SparkAppHandle handle) {
+    SparkAppHandle.State state = handle.getState();
+    AgentSpan span = launcherSpan;
+    if (span != null) {
+      span.setTag("spark.launcher.app_state", state.toString());
+
+      String appId = handle.getAppId();
+      if (appId != null) {
+        span.setTag("spark.app_id", appId);
+        span.setTag("app_id", appId);
+      }
+
+      if (state.isFinal()) {
+        if (state == SparkAppHandle.State.FAILED
+            || state == SparkAppHandle.State.KILLED
+            || state == SparkAppHandle.State.LOST) {
+          // Set error tags but don't finish yet: RunMainAdvice may add the throwable
+          // with the full stack trace. The span will be finished by RunMainAdvice or
+          // the shutdown hook.
+          span.setError(true);
+          span.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed");
+          span.setTag(DDTags.ERROR_MSG, "Application " + state);
+        } else {
+          finishSpan(false, null);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void infoChanged(SparkAppHandle handle) {
+    AgentSpan span = launcherSpan;
+    if (span != null) {
+      String appId = handle.getAppId();
+      if (appId != null) {
+        span.setTag("spark.app_id", appId);
+        span.setTag("app_id", appId);
+      }
+    }
+  }
+
+  private static void setLauncherConfigTags(AgentSpan span, Object launcher) {
+    try {
+      Field builderField = launcher.getClass().getSuperclass().getDeclaredField("builder");
+      builderField.setAccessible(true);
+      Object builder = builderField.get(launcher);
+      if (builder == null) {
+        return;
+      }
+
+      Class<?> builderClass = builder.getClass();
+      Class<?> abstractBuilderClass = builderClass.getSuperclass();
+
+      setStringFieldAsTag(span, builder, abstractBuilderClass, "master", "master");
+      setStringFieldAsTag(span, builder, abstractBuilderClass, "deployMode", "deploy_mode");
+      setStringFieldAsTag(span, builder, abstractBuilderClass, "appName", "application_name");
+      setStringFieldAsTag(span, builder, abstractBuilderClass, "mainClass", "main_class");
+      setStringFieldAsTag(span, builder, abstractBuilderClass, "appResource", "app_resource");
+
+      try {
+        Field confField = abstractBuilderClass.getDeclaredField("conf");
+        confField.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        Map<String, String> conf = (Map<String, String>) confField.get(builder);
+        if (conf != null) {
+          for (Map.Entry<String, String> entry : conf.entrySet()) {
+            if (SparkConfAllowList.canCaptureJobParameter(entry.getKey())) {
+              String value = SparkConfAllowList.redactValue(entry.getKey(), entry.getValue());
+              span.setTag("config." + entry.getKey().replace('.', '_'), value);
+            }
+          }
+        }
+      } catch (NoSuchFieldException e) {
+        log.debug("Could not find conf field on builder", e);
+      }
+    } catch (Exception e) {
+      log.debug("Failed to extract SparkLauncher configuration", e);
+    }
+  }
+
+  private static void setStringFieldAsTag(
+      AgentSpan span, Object obj, Class<?> clazz, String fieldName, String tagName) {
+    try {
+      Field field = clazz.getDeclaredField(fieldName);
+      field.setAccessible(true);
+      Object value = field.get(obj);
+      if (value != null) {
+        span.setTag(tagName, value.toString());
+      }
+    } catch (Exception e) {
+      log.debug("Could not read field {} from builder", fieldName, e);
+    }
+  }
+}
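A note on the reflection in setLauncherConfigTags: as far as I can tell (Spark 2.3+), SparkLauncher extends AbstractLauncher, whose builder field is a SparkSubmitCommandBuilder; the master/deployMode/appName/mainClass/appResource fields and the conf map are declared on its superclass AbstractCommandBuilder. A standalone sketch of the same reflective read, under that assumption about Spark's internals:

import java.lang.reflect.Field;
import org.apache.spark.launcher.SparkLauncher;

class BuilderReflectionSketch {
  public static void main(String[] args) throws Exception {
    SparkLauncher launcher = new SparkLauncher().setAppName("demo");
    // launcher.getClass().getSuperclass() is AbstractLauncher, which declares "builder"
    Field builderField = launcher.getClass().getSuperclass().getDeclaredField("builder");
    builderField.setAccessible(true);
    Object builder = builderField.get(launcher);
    // "appName" lives on AbstractCommandBuilder, the builder's superclass
    Field appName = builder.getClass().getSuperclass().getDeclaredField("appName");
    appName.setAccessible(true);
    System.out.println(appName.get(builder)); // prints "demo"
  }
}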
diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy
new file mode 100644
index 00000000000..f7a5b5a9601
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy
@@ -0,0 +1,203 @@
+package datadog.trace.instrumentation.spark
+
+import datadog.trace.agent.test.InstrumentationSpecification
+import datadog.trace.api.sampling.PrioritySampling
+import datadog.trace.api.sampling.SamplingMechanism
+import datadog.trace.bootstrap.instrumentation.api.AgentTracer
+import org.apache.spark.launcher.SparkAppHandle
+import org.apache.spark.launcher.SparkLauncher
+
+class SparkLauncherTest extends InstrumentationSpecification {
+
+  def "StartApplicationAdvice extracts config tags from SparkLauncher"() {
+    setup:
+    SparkLauncherListener.launcherSpan = null
+
+    when:
+    def launcher = new SparkLauncher()
+      .setMaster("yarn")
+      .setDeployMode("cluster")
+      .setAppName("test-app")
+      .setMainClass("com.example.Main")
+      .setAppResource("/path/to/app.jar")
+      .setConf("spark.executor.memory", "4g")
+      .setConf("spark.executor.instances", "10")
+    SparkLauncherListener.createLauncherSpan(launcher)
+    SparkLauncherListener.finishSpan(false, null)
+
+    then:
+    assertTraces(1) {
+      trace(1) {
+        span {
+          operationName "spark.launcher.launch"
+          spanType "spark"
+          resourceName "SparkLauncher.startApplication"
+          errored false
+          assert span.tags["master"] == "yarn"
+          assert span.tags["deploy_mode"] == "cluster"
+          assert span.tags["application_name"] == "test-app"
+          assert span.tags["main_class"] == "com.example.Main"
+          assert span.tags["app_resource"] == "/path/to/app.jar"
+          assert span.tags["config.spark_executor_memory"] == "4g"
+          assert span.tags["config.spark_executor_instances"] == "10"
+        }
+      }
+    }
+  }
+
+  def "StartApplicationAdvice redacts sensitive conf values"() {
+    setup:
+    SparkLauncherListener.launcherSpan = null
+
+    when:
+    def launcher = new SparkLauncher()
+      // spark.app.name is allowlisted; its value contains "secret" so should be redacted
+      .setConf("spark.app.name", "my-secret-app")
+      // spark.master is allowlisted; its value is harmless so should pass through
+      .setConf("spark.master", "yarn")
+    SparkLauncherListener.createLauncherSpan(launcher)
+    SparkLauncherListener.finishSpan(false, null)
+
+    then:
+    assertTraces(1) {
+      trace(1) {
+        span {
+          operationName "spark.launcher.launch"
+          spanType "spark"
+          resourceName "SparkLauncher.startApplication"
+          errored false
+          assert span.tags["config.spark_app_name"] == "[redacted]"
+          assert span.tags["config.spark_master"] == "yarn"
+        }
+      }
+    }
+  }
+
+  def "finishSpanWithThrowable finishes span with error"() {
+    setup:
+    SparkLauncherListener.launcherSpan = null
+
+    when:
+    def launcher = new SparkLauncher().setAppName("test-app")
+    SparkLauncherListener.createLauncherSpan(launcher)
+    SparkLauncherListener.finishSpanWithThrowable(new RuntimeException("startApplication failed"))
+
+    then:
+    SparkLauncherListener.launcherSpan == null
+    assertTraces(1) {
+      trace(1) {
+        span {
+          operationName "spark.launcher.launch"
+          spanType "spark"
+          errored true
+        }
+      }
+    }
+  }
+
+  def "SparkLauncherListener finishes span on final state FINISHED"() {
+    setup:
+    SparkLauncherListener.launcherSpan = null
+    def tracer = AgentTracer.get()
+    SparkLauncherListener.launcherSpan = tracer
+      .buildSpan("spark.launcher.launch")
+      .withSpanType("spark")
+      .withResourceName("SparkLauncher.startApplication")
+      .start()
+    SparkLauncherListener.launcherSpan.setSamplingPriority(
+      PrioritySampling.USER_KEEP,
+      SamplingMechanism.DATA_JOBS)
+    def listener = new SparkLauncherListener()
+    def handle = Mock(SparkAppHandle)
+
+    when:
+    handle.getState() >> SparkAppHandle.State.FINISHED
+    handle.getAppId() >> "app-123"
+    listener.stateChanged(handle)
+
+    then:
+    SparkLauncherListener.launcherSpan == null
+    assertTraces(1) {
+      trace(1) {
+        span {
+          operationName "spark.launcher.launch"
+          spanType "spark"
+          errored false
+          assert span.tags["spark.app_id"] == "app-123"
+          assert span.tags["spark.launcher.app_state"] == "FINISHED"
+        }
+      }
+    }
+  }
+
+  def "SparkLauncherListener sets error tags on FAILED state but does not finish span"() {
+    setup:
+    SparkLauncherListener.launcherSpan = null
+    def tracer = AgentTracer.get()
+    SparkLauncherListener.launcherSpan = tracer
+      .buildSpan("spark.launcher.launch")
+      .withSpanType("spark")
+      .withResourceName("SparkLauncher.startApplication")
+      .start()
+    SparkLauncherListener.launcherSpan.setSamplingPriority(
+      PrioritySampling.USER_KEEP,
+      SamplingMechanism.DATA_JOBS)
+    def listener = new SparkLauncherListener()
+    def handle = Mock(SparkAppHandle)
+
+    when:
+    handle.getState() >> SparkAppHandle.State.FAILED
+    handle.getAppId() >> "app-456"
+    listener.stateChanged(handle)
+
+    then:
+    // Span stays open so RunMainAdvice can add the throwable
+    SparkLauncherListener.launcherSpan != null
+    SparkLauncherListener.launcherSpan.isError()
+    SparkLauncherListener.launcherSpan.getTags()["error.type"] == "Spark Launcher Failed"
+    SparkLauncherListener.launcherSpan.getTags()["error.msg"] == "Application FAILED"
+    SparkLauncherListener.launcherSpan.getTags()["spark.app_id"] == "app-456"
+
+    cleanup:
+    SparkLauncherListener.finishSpan(false, null)
+  }
+
+  def "finishSpanWithThrowable adds stack trace after FAILED state"() {
+    setup:
+    SparkLauncherListener.launcherSpan = null
+    def tracer = AgentTracer.get()
+    SparkLauncherListener.launcherSpan = tracer
+      .buildSpan("spark.launcher.launch")
+      .withSpanType("spark")
+      .withResourceName("SparkLauncher.startApplication")
+      .start()
+    SparkLauncherListener.launcherSpan.setSamplingPriority(
+      PrioritySampling.USER_KEEP,
+      SamplingMechanism.DATA_JOBS)
+    def listener = new SparkLauncherListener()
+    def handle = Mock(SparkAppHandle)
+
+    when:
+    // Simulate: listener sets error tags, then RunMainAdvice finishes with throwable
+    handle.getState() >> SparkAppHandle.State.FAILED
+    handle.getAppId() >> "app-456"
+    listener.stateChanged(handle)
+    SparkLauncherListener.finishSpanWithThrowable(new RuntimeException("job crashed"))
+
+    then:
+    SparkLauncherListener.launcherSpan == null
+    assertTraces(1) {
+      trace(1) {
+        span {
+          operationName "spark.launcher.launch"
+          spanType "spark"
+          errored true
+          assert span.tags["error.type"] == "java.lang.RuntimeException"
+          assert span.tags["error.msg"] == "job crashed"
+          assert span.tags["error.stack"] != null
+          assert span.tags["spark.app_id"] == "app-456"
+        }
+      }
+    }
+  }
+}
diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java
index 4cd23089cd1..7b00f04b30c 100644
--- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java
+++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java
@@ -32,6 +32,7 @@ public String[] helperClassNames() {
       packageName + ".RemoveEldestHashMap",
       packageName + ".SparkAggregatedTaskMetrics",
       packageName + ".SparkConfAllowList",
+      packageName + ".SparkLauncherListener",
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",
diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java
index c9e534f6429..eda0436f464 100644
--- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java
+++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java
@@ -32,6 +32,7 @@ public String[] helperClassNames() {
       packageName + ".RemoveEldestHashMap",
       packageName + ".SparkAggregatedTaskMetrics",
       packageName + ".SparkConfAllowList",
+      packageName + ".SparkLauncherListener",
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",
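Finally, on enablement: defaultEnabled() ties the new module to Data Jobs Monitoring, and the integration name registered in the constructor is "spark-launcher". A sketch of the toggles, assuming the standard dd-trace-java configuration conventions (flag spellings worth double-checking against the tracer docs):

// On by default only when Data Jobs Monitoring is enabled:
//   -Ddd.data.jobs.enabled=true   (env: DD_DATA_JOBS_ENABLED=true)
// Force just this integration on or off by its registered name:
//   -Ddd.integration.spark-launcher.enabled=true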