diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 5b4e1501826..b51b8c787a6 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -40,6 +40,7 @@ import org.apache.spark.ExceptionFailure; import org.apache.spark.SparkConf; import org.apache.spark.TaskFailedReason; +import org.apache.spark.executor.TaskMetrics; import org.apache.spark.scheduler.AccumulableInfo; import org.apache.spark.scheduler.JobFailed; import org.apache.spark.scheduler.SparkListener; @@ -64,6 +65,7 @@ import org.apache.spark.sql.streaming.StateOperatorProgress; import org.apache.spark.sql.streaming.StreamingQueryListener; import org.apache.spark.sql.streaming.StreamingQueryProgress; +import org.apache.spark.util.AccumulatorV2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Tuple2; @@ -127,8 +129,10 @@ public abstract class AbstractDatadogSparkListener extends SparkListener { private final HashMap liveExecutors = new HashMap<>(); // There is no easy way to know if an accumulator is not useful anymore (meaning it is not part of - // an active SQL query) - // so capping the size of the collection storing them + // an active SQL query) so capping the size of the collection storing them + // TODO (CY): Is this potentially the reason why some Spark Plans aren't showing up consistently? + // If we know we don't need the accumulator values, can we drop all associated data and just map + // stage ID -> accumulator ID? Put this behind some FF private final Map accumulators = new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE); @@ -229,6 +233,12 @@ public void setupOpenLineage(DDTraceId traceId) { /** Parent Ids of a Stage. Provide an implementation based on a specific scala version */ protected abstract int[] getStageParentIds(StageInfo info); + /** + * All External Accumulators associated with a given task. 
Provide an implementation based on a + * specific scala version + */ + protected abstract List getExternalAccumulators(TaskMetrics metrics); + @Override public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) { this.applicationStart = applicationStart; @@ -670,7 +680,7 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl SparkPlanInfo sqlPlan = sqlPlans.get(sqlExecutionId); if (sqlPlan != null) { - SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulators, stageId); + SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulators, stageMetric, stageId); } span.finish(completionTimeMs * 1000); @@ -684,7 +694,9 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) { SparkAggregatedTaskMetrics stageMetric = stageMetrics.get(stageSpanKey); if (stageMetric != null) { - stageMetric.addTaskMetrics(taskEnd); + // Not happy that we have to extract external accumulators here, but needed as we're dealing + // with Seq which varies across Scala versions + stageMetric.addTaskMetrics(taskEnd, getExternalAccumulators(taskEnd.taskMetrics())); } if (taskEnd.taskMetrics() != null) { diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java index 757f20f75f5..5b0167a1da1 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java @@ -1,13 +1,20 @@ package datadog.trace.instrumentation.spark; +import com.fasterxml.jackson.core.JsonGenerator; import datadog.metrics.api.Histogram; import datadog.trace.api.Config; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.spark.TaskFailedReason; import org.apache.spark.executor.TaskMetrics; import org.apache.spark.scheduler.SparkListenerTaskEnd; +import org.apache.spark.sql.execution.metric.SQLMetricInfo; +import org.apache.spark.util.AccumulatorV2; class SparkAggregatedTaskMetrics { private static final double HISTOGRAM_RELATIVE_ACCURACY = 1 / 32.0; @@ -59,13 +66,17 @@ class SparkAggregatedTaskMetrics { private Histogram shuffleWriteBytesHistogram; private Histogram diskBytesSpilledHistogram; + // Used for Spark SQL Plan metrics ONLY, don't put in regular span for now + private Map externalAccumulableHistograms; + public SparkAggregatedTaskMetrics() {} public SparkAggregatedTaskMetrics(long availableExecutorTime) { this.previousAvailableExecutorTime = availableExecutorTime; } - public void addTaskMetrics(SparkListenerTaskEnd taskEnd) { + public void addTaskMetrics( + SparkListenerTaskEnd taskEnd, List externalAccumulators) { taskCompletedCount += 1; if (taskEnd.taskInfo().attemptNumber() > 0) { @@ -127,6 +138,31 @@ public void addTaskMetrics(SparkListenerTaskEnd taskEnd) { shuffleWriteBytesHistogram, taskMetrics.shuffleWriteMetrics().bytesWritten()); diskBytesSpilledHistogram = lazyHistogramAccept(diskBytesSpilledHistogram, taskMetrics.diskBytesSpilled()); + + // TODO (CY): Should we also look at TaskInfo accumulable update values as a backup? Is that + // only needed for SHS? 
+ if (externalAccumulators != null && !externalAccumulators.isEmpty()) { + if (externalAccumulableHistograms == null) { + externalAccumulableHistograms = new HashMap<>(externalAccumulators.size()); + } + + externalAccumulators.forEach( + acc -> { + Histogram hist = externalAccumulableHistograms.get(acc.id()); + if (hist == null) { + hist = + Histogram.newHistogram(HISTOGRAM_RELATIVE_ACCURACY, HISTOGRAM_MAX_NUM_BINS); + } + + try { + // As of spark 3.5, all SQL metrics are Long, safeguard if it changes in new + // versions + hist.accept((Long) acc.value()); + externalAccumulableHistograms.put(acc.id(), hist); + } catch (ClassCastException ignored) { + } + }); + } } } } @@ -276,6 +312,21 @@ private Histogram lazyHistogramAccept(Histogram hist, double value) { return hist; } + // Used to put external accum metrics to JSON for Spark SQL plans + public void externalAccumToJson(JsonGenerator generator, SQLMetricInfo info) throws IOException { + if (externalAccumulableHistograms != null) { + Histogram hist = externalAccumulableHistograms.get(info.accumulatorId()); + String name = info.name(); + + if (name != null && hist != null) { + generator.writeStartObject(); + generator.writeStringField(name, histogramToBase64(hist)); + generator.writeStringField("type", info.metricType()); + generator.writeEndObject(); + } + } + } + public static long computeTaskRunTime(TaskMetrics metrics) { return metrics.executorDeserializeTime() + metrics.executorRunTime() diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java index 33718a4b0dc..7eff461a00a 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java @@ -24,6 +24,7 @@ public static void addSQLPlanToStageSpan( AgentSpan span, SparkPlanInfo plan, Map accumulators, + SparkAggregatedTaskMetrics stageMetric, int stageId) { Set parentStageIds = new HashSet<>(); SparkPlanInfoForStage planForStage = @@ -32,7 +33,7 @@ public static void addSQLPlanToStageSpan( span.setTag("_dd.spark.sql_parent_stage_ids", parentStageIds.toString()); if (planForStage != null) { - String json = planForStage.toJson(accumulators); + String json = planForStage.toJson(stageMetric); span.setTag("_dd.spark.sql_plan", json); } } @@ -143,7 +144,7 @@ public SparkPlanInfoForStage(SparkPlanInfo plan, List chi this.children = children; } - public String toJson(Map accumulators) { + public String toJson(SparkAggregatedTaskMetrics stageMetric) { // Using the jackson JSON lib used by spark // https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12/3.5.0 ObjectMapper mapper = @@ -151,7 +152,7 @@ public String toJson(Map accumulators) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (JsonGenerator generator = mapper.getFactory().createGenerator(baos)) { - this.toJson(generator, accumulators, mapper); + this.toJson(generator, mapper, stageMetric); } catch (IOException e) { return null; } @@ -160,7 +161,7 @@ public String toJson(Map accumulators) { } private void toJson( - JsonGenerator generator, Map accumulators, ObjectMapper mapper) + JsonGenerator generator, ObjectMapper mapper, SparkAggregatedTaskMetrics stageMetric) throws IOException { generator.writeStartObject(); 
generator.writeStringField("node", plan.nodeName()); @@ -199,11 +200,7 @@ private void toJson( generator.writeFieldName("metrics"); generator.writeStartArray(); for (SQLMetricInfo metric : metrics) { - long accumulatorId = metric.accumulatorId(); - AccumulatorWithStage acc = accumulators.get(accumulatorId); - if (acc != null) { - acc.toJson(generator, metric); - } + stageMetric.externalAccumToJson(generator, metric); } generator.writeEndArray(); } @@ -213,7 +210,7 @@ private void toJson( generator.writeFieldName("children"); generator.writeStartArray(); for (SparkPlanInfoForStage child : children) { - child.toJson(generator, accumulators, mapper); + child.toJson(generator, mapper, stageMetric); } generator.writeEndArray(); } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy index a34941d1be0..569e8c440e3 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy @@ -159,12 +159,12 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { // Each metric is a dict { "metric_name": "metric_value", "type": "metric_type" } expectedMetric.each { key, expectedValue -> - assert actualMetric.containsKey(key): prefix + "metric key \"$key\" not found in $actualMetric" + assert actualMetric.containsKey(key): prefix + "metric key \"$key\" not found in $actualMetric. \n\tactual: $actual.metrics, \n\texpected: $expected.metrics" // Some metric values are duration that will varies between runs // In the case, setting the expected value to "any" skips the assertion def actualValue = actualMetric[key] - assert expectedValue == "any" || actualValue == expectedValue: prefix + "value of metric key \"$key\" does not match \"$expectedValue\", got $actualValue" + assert expectedValue == "any" || actualValue == expectedValue: prefix + "value of metric key \"$key\" does not match \"$expectedValue\", got $actualValue. 
\n\tactual: $actual.metrics, \n\texpected: $expected.metrics" } } } @@ -296,9 +296,17 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "number of output rows": "any", "type": "sum" }, + { + "avg hash probe (min, med, max)": "any", + "type": "average" + }, { "peak memory total (min, med, max)": "any", "type": "size" + }, + { + "spill size total (min, med, max)": "any", + "type": "size" } ], "children": [ @@ -317,7 +325,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "number of output rows": 3, + "number of output rows": "CgkJCCGEEEII8T8SZBJgAAAAAAAA8D8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAPA/GAAaAA==", "type": "sum" } ] @@ -367,12 +375,16 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "type": "average" }, { - "number of output rows": 2, + "number of output rows": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YFhoAIQAAAAAAAPA/", "type": "sum" }, { "peak memory total (min, med, max)": "any", "type": "size" + }, + { + "spill size total (min, med, max)": "any", + "type": "size" } ], "children": [ @@ -572,6 +584,18 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { { "number of output rows": "any", "type": "sum" + }, + { + "spill size total (min, med, max)": "any", + "type": "size" + }, + { + "avg hash probe (min, med, max)": "any", + "type": "average" + }, + { + "peak memory total (min, med, max)": "any", + "type": "size" } ], "children": [ @@ -628,6 +652,10 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "_dd.unparsed" : "any" }, "metrics": [ + { + "spill size total (min, med, max)": "any", + "type": "size" + }, { "peak memory total (min, med, max)": "any", "type": "size" @@ -676,9 +704,17 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "_dd.unparsed" : "any" }, "metrics": [ + { + "spill size total (min, med, max)": "any", + "type": "size" + }, { "peak memory total (min, med, max)": "any", "type": "size" + }, + { + "sort time total (min, med, max)": "any", + "type": "timing" } ], "children": [ @@ -731,7 +767,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "number of output rows": 1, + "number of output rows": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YABoA", "type": "sum" } ], diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy index a80eb6ab1cf..bcd49144239 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy @@ -56,7 +56,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "size" }, { - "shuffle records written": 3, + "shuffle records written": "CgkJCCGEEEII8T8SZBJgAAAAAAAA8D8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAPA/GAAaAA==", "type": "sum" }, { @@ -93,16 +93,28 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "number of output rows": 3, + 
"number of sort fallback tasks": "any", "type": "sum" }, { - "peak memory": "any", - "type": "size" + "avg hash probe bucket list iters": "any", + "type": "average" }, { "time in aggregation build": "any", "type": "timing" + }, + { + "spill size": "any", + "type": "size" + }, + { + "number of output rows": "CgkJCCGEEEII8T8SZBJgAAAAAAAA8D8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAPA/GAAaAA==", + "type": "sum" + }, + { + "peak memory": "any", + "type": "size" } ], "children": [ @@ -116,7 +128,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "number of output rows": 3, + "number of output rows": "CgkJCCGEEEII8T8SZBJgAAAAAAAA8D8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAPA/GAAaAA==", "type": "sum" } ] @@ -156,21 +168,29 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "resultExpressions" : [ "string_col#0", "avg(double_col#1)#4 AS avg(double_col)#5" ] }, "metrics": [ + { + "number of sort fallback tasks": "any", + "type": "sum" + }, { "avg hash probe bucket list iters": "any", "type": "average" }, { - "number of output rows": 2, - "type": "sum" + "time in aggregation build": "any", + "type": "timing" }, { - "peak memory": "any", + "spill size": "any", "type": "size" }, { - "time in aggregation build": "any", - "type": "timing" + "number of output rows": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YFhoA", + "type": "sum" + }, + { + "peak memory": "any", + "type": "size" } ], "children": [ @@ -214,7 +234,11 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "data size": "any", + "records read": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YIhoA", + "type": "sum" + }, + { + "remote bytes read to disk": "any", "type": "size" }, { @@ -222,28 +246,40 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "timing" }, { - "local blocks read": "any", - "type": "sum" + "shuffle bytes written": "any", + "type": "size" + }, + { + "data size": "any", + "type": "size" }, { "local bytes read": "any", "type": "size" }, { - "records read": 3, - "type": "sum" + "shuffle write time": "any", + "type": "nsTiming" }, { - "shuffle bytes written": "any", + "remote bytes read": "any", "type": "size" }, { - "shuffle records written": 3, + "local blocks read": "any", "type": "sum" }, { - "shuffle write time": "any", - "type": "nsTiming" + "remote blocks read": "any", + "type": "sum" + }, + { + "shuffle records written": "any", + "type": "sum" + }, + { + "number of partitions": "any", + "type": "sum" } ] } @@ -283,7 +319,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "size" }, { - "shuffle records written": 2, + "shuffle records written": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YFhoA", "type": "sum" }, { @@ -319,10 +355,22 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "resultExpressions" : [ "string_col#0", "avg(double_col#1)#4 AS avg(double_col)#5" ] }, "metrics": [ + { + "number of sort fallback tasks": "any", + "type": "sum" + }, { "avg hash probe bucket list iters": "any", "type": "average" }, + { + "time in aggregation build": "any", + "type": "timing" + }, + { + "spill size": "any", + "type": "size" + }, { "number of output rows": "any", "type": "sum" @@ -330,10 +378,6 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { { "peak memory": 
"any", "type": "size" - }, - { - "time in aggregation build": "any", - "type": "timing" } ], "children": [ @@ -377,7 +421,11 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "data size": "any", + "records read": "any", + "type": "sum" + }, + { + "remote bytes read to disk": "any", "type": "size" }, { @@ -385,28 +433,40 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "timing" }, { - "local blocks read": "any", - "type": "sum" + "shuffle bytes written": "any", + "type": "size" + }, + { + "data size": "any", + "type": "size" }, { "local bytes read": "any", "type": "size" }, { - "records read": "any", - "type": "sum" + "shuffle write time": "any", + "type": "nsTiming" }, { - "shuffle bytes written": "any", + "remote bytes read": "any", "type": "size" }, + { + "local blocks read": "any", + "type": "sum" + }, + { + "remote blocks read": "any", + "type": "sum" + }, { "shuffle records written": "any", "type": "sum" }, { - "shuffle write time": "any", - "type": "nsTiming" + "number of partitions": "any", + "type": "sum" } ] } @@ -509,7 +569,11 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "data size": "any", + "records read": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YFhoA", + "type": "sum" + }, + { + "remote bytes read to disk": "any", "type": "size" }, { @@ -517,28 +581,40 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "timing" }, { - "local blocks read": "any", - "type": "sum" + "shuffle bytes written": "any", + "type": "size" + }, + { + "data size": "any", + "type": "size" }, { "local bytes read": "any", "type": "size" }, { - "records read": 2, - "type": "sum" + "shuffle write time": "any", + "type": "nsTiming" }, { - "shuffle bytes written": "any", + "remote bytes read": "any", "type": "size" }, { - "shuffle records written": 2, + "local blocks read": "any", "type": "sum" }, { - "shuffle write time": "any", - "type": "nsTiming" + "remote blocks read": "any", + "type": "sum" + }, + { + "shuffle records written": "CgkJCCGEEEII8T8SABoAIQAAAAAAAPA/", + "type": "sum" + }, + { + "number of partitions": "any", + "type": "sum" } ] } @@ -774,7 +850,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "size" }, { - "shuffle records written": 1, + "shuffle records written": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YABoA", "type": "sum" }, { @@ -811,12 +887,28 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "number of output rows": 1, + "number of sort fallback tasks": "any", "type": "sum" }, + { + "avg hash probe bucket list iters": "any", + "type": "average" + }, { "time in aggregation build": "any", "type": "timing" + }, + { + "spill size": "any", + "type": "size" + }, + { + "number of output rows": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YABoA", + "type": "sum" + }, + { + "peak memory": "any", + "type": "size" } ], "children": [ @@ -928,7 +1020,11 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "data size": "any", + "records read": "any", + "type": "sum" + }, + { + "remote bytes read to disk": "any", "type": "size" }, { @@ -936,28 +1032,40 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "timing" }, { - "local blocks read": "any", - "type": "sum" + "shuffle bytes written": "any", + "type": "size" + }, + { + "data size": "any", + "type": "size" }, { "local bytes read": 
"any", "type": "size" }, { - "records read": "any", - "type": "sum" + "shuffle write time": "any", + "type": "nsTiming" }, { - "shuffle bytes written": "any", + "remote bytes read": "any", "type": "size" }, + { + "local blocks read": "any", + "type": "sum" + }, + { + "remote blocks read": "any", + "type": "sum" + }, { "shuffle records written": "any", "type": "sum" }, { - "shuffle write time": "any", - "type": "nsTiming" + "number of partitions": "any", + "type": "sum" } ] } @@ -1054,7 +1162,11 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "data size": "any", + "records read": "any", + "type": "sum" + }, + { + "remote bytes read to disk": "any", "type": "size" }, { @@ -1062,28 +1174,40 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "timing" }, { - "local blocks read": "any", - "type": "sum" + "shuffle bytes written": "any", + "type": "size" + }, + { + "data size": "any", + "type": "size" }, { "local bytes read": "any", "type": "size" }, { - "records read": "any", - "type": "sum" + "shuffle write time": "any", + "type": "nsTiming" }, { - "shuffle bytes written": "any", + "remote bytes read": "any", "type": "size" }, + { + "local blocks read": "any", + "type": "sum" + }, + { + "remote blocks read": "any", + "type": "sum" + }, { "shuffle records written": "any", "type": "sum" }, { - "shuffle write time": "any", - "type": "nsTiming" + "number of partitions": "any", + "type": "sum" } ] } @@ -1138,13 +1262,13 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "resultExpressions" : [ "count(1)#42L AS count#43L" ] }, "metrics": [ - { - "number of output rows": "any", - "type": "sum" - }, { "time in aggregation build": "any", "type": "timing" + }, + { + "number of output rows": "any", + "type": "sum" } ], "children": [ @@ -1173,7 +1297,11 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "data size": "any", + "records read": "any", + "type": "sum" + }, + { + "remote bytes read to disk": "any", "type": "size" }, { @@ -1181,28 +1309,40 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "timing" }, { - "local blocks read": "any", - "type": "sum" + "shuffle bytes written": "any", + "type": "size" + }, + { + "data size": "any", + "type": "size" }, { "local bytes read": "any", "type": "size" }, { - "records read": "any", - "type": "sum" + "shuffle write time": "any", + "type": "nsTiming" }, { - "shuffle bytes written": "any", + "remote bytes read": "any", "type": "size" }, + { + "local blocks read": "any", + "type": "sum" + }, + { + "remote blocks read": "any", + "type": "sum" + }, { "shuffle records written": "any", "type": "sum" }, { - "shuffle write time": "any", - "type": "nsTiming" + "number of partitions": "any", + "type": "sum" } ] } diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java index fdae211077e..fdd2a2fa076 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java @@ -1,14 +1,20 @@ package datadog.trace.instrumentation.spark; +import 
datadog.trace.util.MethodHandles; +import java.lang.invoke.MethodHandle; import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.spark.SparkConf; +import org.apache.spark.executor.TaskMetrics; import org.apache.spark.scheduler.SparkListenerJobStart; import org.apache.spark.scheduler.StageInfo; import org.apache.spark.sql.execution.SparkPlanInfo; import org.apache.spark.sql.execution.metric.SQLMetricInfo; +import org.apache.spark.util.AccumulatorV2; +import scala.Function1; import scala.collection.JavaConverters; +import scala.collection.mutable.ArrayBuffer; /** * DatadogSparkListener compiled for Scala 2.12 @@ -17,6 +23,13 @@ * compiled with the specific scala version */ public class DatadogSpark212Listener extends AbstractDatadogSparkListener { + private static final MethodHandles methodLoader = + new MethodHandles(ClassLoader.getSystemClassLoader()); + private static final MethodHandle externalAccums = + methodLoader.method(TaskMetrics.class, "externalAccums"); + private static final MethodHandle withExternalAccums = + methodLoader.method(TaskMetrics.class, "withExternalAccums", new Class[] {}); + public DatadogSpark212Listener(SparkConf sparkConf, String appId, String sparkVersion) { super(sparkConf, appId, sparkVersion); } @@ -62,4 +75,27 @@ protected int[] getStageParentIds(StageInfo info) { return parentIds; } + + @Override + protected List getExternalAccumulators(TaskMetrics metrics) { + if (metrics == null) { + return null; + } + + Function1 lambda = + (Function1, List>) + accumulators -> JavaConverters.seqAsJavaList(accumulators); + List res = methodLoader.invoke(withExternalAccums, metrics, lambda); + if (res != null) { + return res; + } + + // withExternalAccums didn't work, try the legacy method + ArrayBuffer accumulators = methodLoader.invoke(externalAccums, metrics); + if (accumulators != null && !accumulators.isEmpty()) { + return JavaConverters.seqAsJavaList(accumulators); + } + + return null; + } } diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java index 115cdcbb9b0..8d1e3fa4747 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java @@ -1,13 +1,19 @@ package datadog.trace.instrumentation.spark; +import datadog.trace.util.MethodHandles; +import java.lang.invoke.MethodHandle; import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.spark.SparkConf; +import org.apache.spark.executor.TaskMetrics; import org.apache.spark.scheduler.SparkListenerJobStart; import org.apache.spark.scheduler.StageInfo; import org.apache.spark.sql.execution.SparkPlanInfo; import org.apache.spark.sql.execution.metric.SQLMetricInfo; +import org.apache.spark.util.AccumulatorV2; +import scala.Function1; +import scala.collection.mutable.ArrayBuffer; import scala.jdk.javaapi.CollectionConverters; /** @@ -17,6 +23,13 @@ * compiled with the specific scala version */ public class DatadogSpark213Listener extends AbstractDatadogSparkListener { + private static final MethodHandles methodLoader = + new MethodHandles(ClassLoader.getSystemClassLoader()); + private static final MethodHandle externalAccums = + 
methodLoader.method(TaskMetrics.class, "externalAccums"); + private static final MethodHandle withExternalAccums = + methodLoader.method(TaskMetrics.class, "withExternalAccums", new Class[] {}); + public DatadogSpark213Listener(SparkConf sparkConf, String appId, String sparkVersion) { super(sparkConf, appId, sparkVersion); } @@ -62,4 +75,27 @@ protected int[] getStageParentIds(StageInfo info) { return parentIds; } + + @Override + protected List getExternalAccumulators(TaskMetrics metrics) { + if (metrics == null) { + return null; + } + + Function1 lambda = + (Function1, List>) + accumulators -> CollectionConverters.asJava(accumulators); + List res = methodLoader.invoke(withExternalAccums, metrics, lambda); + if (res != null) { + return res; + } + + // withExternalAccums didn't work, try the legacy method + ArrayBuffer accumulators = methodLoader.invoke(externalAccums, metrics); + if (accumulators != null && !accumulators.isEmpty()) { + return CollectionConverters.asJava(accumulators); + } + + return null; + } }
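Note on the test expectation changes above: with this diff, SQL plan metrics emitted in the _dd.spark.sql_plan tag are no longer raw accumulator totals (for example "number of output rows": 3) but base64-encoded histograms aggregated per stage from each task's external accumulator updates, written by SparkAggregatedTaskMetrics.externalAccumToJson. Below is a minimal sketch of the resulting entry shape, using only the Jackson API the instrumentation already depends on; the SqlMetricEntrySketch class and metricEntry helper are illustrative names and are not part of the PR.

// Illustrative sketch (not in this diff): each metric entry is written as
// { "<metric name>": "<base64 histogram>", "type": "<metric type>" }.
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

class SqlMetricEntrySketch {
  static String metricEntry(String name, String metricType, String base64Histogram)
      throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (JsonGenerator generator = new JsonFactory().createGenerator(baos)) {
      generator.writeStartObject();
      // The value is the serialized histogram of per-task accumulator values, not a long total.
      generator.writeStringField(name, base64Histogram);
      generator.writeStringField("type", metricType);
      generator.writeEndObject();
    }
    return baos.toString("UTF-8");
  }
}

This is why expectations such as "number of output rows" and "shuffle records written" in AbstractSpark24SqlTest and AbstractSpark32SqlTest changed from small integers to opaque base64 strings, while duration-like metrics keep the "any" wildcard.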