diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleCounter.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleCounter.java index 3ad72a0689c..fa63a720d9f 100644 --- a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleCounter.java +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleCounter.java @@ -2,8 +2,6 @@ import static datadog.opentelemetry.shim.metrics.OtelInstrumentBuilder.ofDoubles; import static datadog.opentelemetry.shim.metrics.OtelInstrumentType.COUNTER; -import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_INSTRUMENT_NAME; -import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_METER; import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage; import datadog.trace.relocate.api.RatelimitedLogger; @@ -74,23 +72,18 @@ public DoubleCounterBuilder setUnit(String unit) { @Override public DoubleCounter build() { return new OtelDoubleCounter( - meter.registerStorage(builder.descriptor(), OtelMetricStorage::newDoubleSumStorage)); + meter.registerStorage(builder, OtelMetricStorage::newDoubleSumStorage)); } @Override - public ObservableDoubleCounter buildWithCallback( - Consumer callback) { - // FIXME: implement callback - return NOOP_METER - .counterBuilder(NOOP_INSTRUMENT_NAME) - .ofDoubles() - .buildWithCallback(callback); + public ObservableDoubleMeasurement buildObserver() { + return meter.registerObservableStorage(builder, OtelMetricStorage::newDoubleSumStorage); } @Override - public ObservableDoubleMeasurement buildObserver() { - // FIXME: implement observer - return NOOP_METER.counterBuilder(NOOP_INSTRUMENT_NAME).ofDoubles().buildObserver(); + public ObservableDoubleCounter buildWithCallback( + Consumer callback) { + return meter.registerObservableCallback(callback, buildObserver()); } } } diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleGauge.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleGauge.java index e680d69d611..1399e75cd20 100644 --- a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleGauge.java +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleGauge.java @@ -2,8 +2,6 @@ import static datadog.opentelemetry.shim.metrics.OtelInstrumentBuilder.ofDoubles; import static datadog.opentelemetry.shim.metrics.OtelInstrumentType.GAUGE; -import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_INSTRUMENT_NAME; -import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_METER; import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage; import io.opentelemetry.api.common.Attributes; @@ -66,19 +64,17 @@ public LongGaugeBuilder ofLongs() { @Override public DoubleGauge build() { return new OtelDoubleGauge( - meter.registerStorage(builder.descriptor(), OtelMetricStorage::newDoubleValueStorage)); + meter.registerStorage(builder, OtelMetricStorage::newDoubleValueStorage)); } @Override - public ObservableDoubleGauge buildWithCallback(Consumer callback) { - // FIXME: implement callback - return NOOP_METER.gaugeBuilder(NOOP_INSTRUMENT_NAME).buildWithCallback(callback); + public ObservableDoubleMeasurement buildObserver() { + return meter.registerObservableStorage(builder, OtelMetricStorage::newDoubleValueStorage); } @Override - public ObservableDoubleMeasurement buildObserver() { - // FIXME: implement observer - return NOOP_METER.gaugeBuilder(NOOP_INSTRUMENT_NAME).buildObserver(); + public ObservableDoubleGauge buildWithCallback(Consumer callback) { + return meter.registerObservableCallback(callback, buildObserver()); } } } diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleHistogram.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleHistogram.java index f3b54923c75..6f849f2b2ba 100644 --- a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleHistogram.java +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleHistogram.java @@ -102,8 +102,7 @@ public LongHistogramBuilder ofLongs() { public DoubleHistogram build() { return new OtelDoubleHistogram( meter.registerStorage( - builder.descriptor(), - descriptor -> newHistogramStorage(descriptor, bucketBoundaries))); + builder, descriptor -> newHistogramStorage(descriptor, bucketBoundaries))); } static List validateBoundaries(List boundaries) { diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleUpDownCounter.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleUpDownCounter.java index 6e0d2159ee0..41af785540c 100644 --- a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleUpDownCounter.java +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelDoubleUpDownCounter.java @@ -2,8 +2,6 @@ import static datadog.opentelemetry.shim.metrics.OtelInstrumentBuilder.ofDoubles; import static datadog.opentelemetry.shim.metrics.OtelInstrumentType.UP_DOWN_COUNTER; -import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_INSTRUMENT_NAME; -import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_METER; import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage; import io.opentelemetry.api.common.Attributes; @@ -60,23 +58,18 @@ public DoubleUpDownCounterBuilder setUnit(String unit) { @Override public DoubleUpDownCounter build() { return new OtelDoubleUpDownCounter( - meter.registerStorage(builder.descriptor(), OtelMetricStorage::newDoubleSumStorage)); + meter.registerStorage(builder, OtelMetricStorage::newDoubleSumStorage)); } @Override - public ObservableDoubleUpDownCounter buildWithCallback( - Consumer callback) { - // FIXME: implement callback - return NOOP_METER - .upDownCounterBuilder(NOOP_INSTRUMENT_NAME) - .ofDoubles() - .buildWithCallback(callback); + public ObservableDoubleMeasurement buildObserver() { + return meter.registerObservableStorage(builder, OtelMetricStorage::newDoubleSumStorage); } @Override - public ObservableDoubleMeasurement buildObserver() { - // FIXME: implement observer - return NOOP_METER.upDownCounterBuilder(NOOP_INSTRUMENT_NAME).ofDoubles().buildObserver(); + public ObservableDoubleUpDownCounter buildWithCallback( + Consumer callback) { + return meter.registerObservableCallback(callback, buildObserver()); } } } diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelInstrumentBuilder.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelInstrumentBuilder.java index 4ee187fa654..14f7cfec256 100644 --- a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelInstrumentBuilder.java +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelInstrumentBuilder.java @@ -75,4 +75,27 @@ OtelInstrumentDescriptor descriptor() { return new OtelInstrumentDescriptor( instrumentName, instrumentType, longValues, description, unit); } + + OtelInstrumentDescriptor observableDescriptor() { + return new OtelInstrumentDescriptor( + instrumentName, observableType(instrumentType), longValues, description, unit); + } + + /** + * Maps the given {@link OtelInstrumentType} to its observable equivalent. + * + * @throws IllegalArgumentException if the type has no observable equivalent + */ + private OtelInstrumentType observableType(OtelInstrumentType instrumentType) { + switch (instrumentType) { + case COUNTER: + return OtelInstrumentType.OBSERVABLE_COUNTER; + case UP_DOWN_COUNTER: + return OtelInstrumentType.OBSERVABLE_UP_DOWN_COUNTER; + case GAUGE: + return OtelInstrumentType.OBSERVABLE_GAUGE; + default: + throw new IllegalArgumentException(instrumentType + " has no observable equivalent"); + } + } } diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongCounter.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongCounter.java index 8c6d77fcfa6..b9ea0468e38 100644 --- a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongCounter.java +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongCounter.java @@ -2,8 +2,6 @@ import static datadog.opentelemetry.shim.metrics.OtelInstrumentBuilder.ofLongs; import static datadog.opentelemetry.shim.metrics.OtelInstrumentType.COUNTER; -import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_INSTRUMENT_NAME; -import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_METER; import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage; import datadog.trace.relocate.api.RatelimitedLogger; @@ -80,19 +78,17 @@ public DoubleCounterBuilder ofDoubles() { @Override public LongCounter build() { return new OtelLongCounter( - meter.registerStorage(builder.descriptor(), OtelMetricStorage::newLongSumStorage)); + meter.registerStorage(builder, OtelMetricStorage::newLongSumStorage)); } @Override - public ObservableLongCounter buildWithCallback(Consumer callback) { - // FIXME: implement callback - return NOOP_METER.counterBuilder(NOOP_INSTRUMENT_NAME).buildWithCallback(callback); + public ObservableLongMeasurement buildObserver() { + return meter.registerObservableStorage(builder, OtelMetricStorage::newLongSumStorage); } @Override - public ObservableLongMeasurement buildObserver() { - // FIXME: implement observer - return NOOP_METER.counterBuilder(NOOP_INSTRUMENT_NAME).buildObserver(); + public ObservableLongCounter buildWithCallback(Consumer callback) { + return meter.registerObservableCallback(callback, buildObserver()); } } } diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongGauge.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongGauge.java index 2b0057a92fd..01aefde2db8 100644 --- a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongGauge.java +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongGauge.java @@ -2,8 +2,6 @@ import static datadog.opentelemetry.shim.metrics.OtelInstrumentBuilder.ofLongs; import static datadog.opentelemetry.shim.metrics.OtelInstrumentType.GAUGE; -import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_INSTRUMENT_NAME; -import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_METER; import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage; import io.opentelemetry.api.common.Attributes; @@ -60,19 +58,17 @@ public LongGaugeBuilder setUnit(String unit) { @Override public LongGauge build() { return new OtelLongGauge( - meter.registerStorage(builder.descriptor(), OtelMetricStorage::newLongValueStorage)); + meter.registerStorage(builder, OtelMetricStorage::newLongValueStorage)); } @Override - public ObservableLongGauge buildWithCallback(Consumer callback) { - // FIXME: implement callback - return NOOP_METER.gaugeBuilder(NOOP_INSTRUMENT_NAME).ofLongs().buildWithCallback(callback); + public ObservableLongMeasurement buildObserver() { + return meter.registerObservableStorage(builder, OtelMetricStorage::newLongValueStorage); } @Override - public ObservableLongMeasurement buildObserver() { - // FIXME: implement observer - return NOOP_METER.gaugeBuilder(NOOP_INSTRUMENT_NAME).ofLongs().buildObserver(); + public ObservableLongGauge buildWithCallback(Consumer callback) { + return meter.registerObservableCallback(callback, buildObserver()); } } } diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongHistogram.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongHistogram.java index 730134ba4d4..ddc52d2dbf2 100644 --- a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongHistogram.java +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongHistogram.java @@ -91,8 +91,7 @@ public LongHistogramBuilder setExplicitBucketBoundariesAdvice(List bucketB public LongHistogram build() { return new OtelLongHistogram( meter.registerStorage( - builder.descriptor(), - descriptor -> newHistogramStorage(descriptor, bucketBoundaries))); + builder, descriptor -> newHistogramStorage(descriptor, bucketBoundaries))); } } } diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongUpDownCounter.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongUpDownCounter.java index 52b07b0392c..0fe94c76bc2 100644 --- a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongUpDownCounter.java +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelLongUpDownCounter.java @@ -2,8 +2,6 @@ import static datadog.opentelemetry.shim.metrics.OtelInstrumentBuilder.ofLongs; import static datadog.opentelemetry.shim.metrics.OtelInstrumentType.UP_DOWN_COUNTER; -import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_INSTRUMENT_NAME; -import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_METER; import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage; import io.opentelemetry.api.common.Attributes; @@ -66,20 +64,18 @@ public DoubleUpDownCounterBuilder ofDoubles() { @Override public LongUpDownCounter build() { return new OtelLongUpDownCounter( - meter.registerStorage(builder.descriptor(), OtelMetricStorage::newLongSumStorage)); + meter.registerStorage(builder, OtelMetricStorage::newLongSumStorage)); } @Override - public ObservableLongUpDownCounter buildWithCallback( - Consumer callback) { - // FIXME: implement callback - return NOOP_METER.upDownCounterBuilder(NOOP_INSTRUMENT_NAME).buildWithCallback(callback); + public ObservableLongMeasurement buildObserver() { + return meter.registerObservableStorage(builder, OtelMetricStorage::newLongSumStorage); } @Override - public ObservableLongMeasurement buildObserver() { - // FIXME: implement observer - return NOOP_METER.upDownCounterBuilder(NOOP_INSTRUMENT_NAME).buildObserver(); + public ObservableLongUpDownCounter buildWithCallback( + Consumer callback) { + return meter.registerObservableCallback(callback, buildObserver()); } } } diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelMeter.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelMeter.java index 6213d9084f4..a90637ceaaf 100644 --- a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelMeter.java +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelMeter.java @@ -1,5 +1,9 @@ package datadog.opentelemetry.shim.metrics; +import static java.util.Collections.singletonList; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Stream.concat; + import datadog.opentelemetry.shim.OtelInstrumentationScope; import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage; import datadog.opentelemetry.shim.metrics.export.OtelMeterVisitor; @@ -11,10 +15,14 @@ import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.api.metrics.ObservableMeasurement; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; import java.util.function.Function; import java.util.regex.Pattern; +import java.util.stream.Stream; import javax.annotation.Nullable; import javax.annotation.ParametersAreNonnullByDefault; import org.slf4j.Logger; @@ -35,6 +43,8 @@ final class OtelMeter implements Meter { private final Map storage = new ConcurrentHashMap<>(); + private final List observables = new ArrayList<>(); + OtelMeter(OtelInstrumentationScope instrumentationScope) { this.instrumentationScope = instrumentationScope; } @@ -76,8 +86,12 @@ public BatchCallback batchCallback( Runnable callback, ObservableMeasurement observableMeasurement, ObservableMeasurement... additionalMeasurements) { - // FIXME: implement callback - return NOOP_METER.batchCallback(callback, observableMeasurement, additionalMeasurements); + return registerObservableCallback( + callback, + concat(Stream.of(observableMeasurement), Stream.of(additionalMeasurements)) + .filter(OtelObservableMeasurement.class::isInstance) + .map(OtelObservableMeasurement.class::cast) + .collect(toList())); } @Override @@ -86,12 +100,44 @@ public String toString() { } OtelMetricStorage registerStorage( - OtelInstrumentDescriptor descriptor, + OtelInstrumentBuilder builder, Function storageFactory) { - return storage.computeIfAbsent(descriptor, storageFactory); + return storage.computeIfAbsent(builder.descriptor(), storageFactory); + } + + OtelObservableMeasurement registerObservableStorage( + OtelInstrumentBuilder builder, + Function storageFactory) { + return new OtelObservableMeasurement( + storage.computeIfAbsent(builder.observableDescriptor(), storageFactory)); + } + + OtelObservableCallback registerObservableCallback(Consumer callback, M measurement) { + return registerObservableCallback( + () -> callback.accept(measurement), singletonList((OtelObservableMeasurement) measurement)); + } + + OtelObservableCallback registerObservableCallback( + Runnable callback, List measurements) { + OtelObservableCallback observable = new OtelObservableCallback(this, callback, measurements); + synchronized (observables) { + observables.add(observable); + } + return observable; + } + + boolean unregisterObservableCallback(OtelObservableCallback observable) { + synchronized (observables) { + return observables.remove(observable); + } } void collect(OtelMeterVisitor visitor) { + List observablesCopy; + synchronized (observables) { + observablesCopy = new ArrayList<>(observables); + } + observablesCopy.forEach(OtelObservableCallback::observeMeasurements); storage.forEach((descriptor, storage) -> storage.collect(visitor.visitInstrument(descriptor))); } diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelMeterProvider.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelMeterProvider.java index c133eb9368d..2943f19914e 100644 --- a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelMeterProvider.java +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelMeterProvider.java @@ -1,7 +1,7 @@ package datadog.opentelemetry.shim.metrics; import datadog.opentelemetry.shim.OtelInstrumentationScope; -import datadog.opentelemetry.shim.metrics.export.OtelMetricVisitor; +import datadog.opentelemetry.shim.metrics.export.OtelMetricsVisitor; import datadog.trace.util.Strings; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.MeterBuilder; @@ -33,7 +33,7 @@ public MeterBuilder meterBuilder(String instrumentationScopeName) { return new OtelMeterBuilder(this, instrumentationScopeName); } - public void collect(OtelMetricVisitor visitor) { + public void collectMetrics(OtelMetricsVisitor visitor) { meters.forEach((scope, meter) -> meter.collect(visitor.visitMeter(scope))); } diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelObservableCallback.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelObservableCallback.java new file mode 100644 index 00000000000..e51edde0b4e --- /dev/null +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelObservableCallback.java @@ -0,0 +1,57 @@ +package datadog.opentelemetry.shim.metrics; + +import datadog.trace.relocate.api.RatelimitedLogger; +import io.opentelemetry.api.metrics.BatchCallback; +import io.opentelemetry.api.metrics.ObservableDoubleCounter; +import io.opentelemetry.api.metrics.ObservableDoubleGauge; +import io.opentelemetry.api.metrics.ObservableDoubleUpDownCounter; +import io.opentelemetry.api.metrics.ObservableLongCounter; +import io.opentelemetry.api.metrics.ObservableLongGauge; +import io.opentelemetry.api.metrics.ObservableLongUpDownCounter; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class OtelObservableCallback + implements ObservableDoubleCounter, + ObservableLongCounter, + ObservableDoubleGauge, + ObservableLongGauge, + ObservableDoubleUpDownCounter, + ObservableLongUpDownCounter, + BatchCallback { + + private static final Logger LOGGER = LoggerFactory.getLogger(OtelObservableCallback.class); + private static final RatelimitedLogger RATELIMITED_LOGGER = + new RatelimitedLogger(LOGGER, 5, TimeUnit.MINUTES); + + private final OtelMeter meter; + private final Runnable callback; + private final List measurements; + + OtelObservableCallback( + OtelMeter meter, Runnable callback, List measurements) { + this.meter = meter; + this.callback = callback; + this.measurements = measurements; + } + + void observeMeasurements() { + measurements.forEach(OtelObservableMeasurement::activate); + try { + callback.run(); + } catch (Throwable e) { + RATELIMITED_LOGGER.warn("An exception occurred invoking callback for {}.", measurements, e); + } finally { + measurements.forEach(OtelObservableMeasurement::passivate); + } + } + + @Override + public void close() { + if (!meter.unregisterObservableCallback(this)) { + RATELIMITED_LOGGER.warn("Callback for {} has called close() multiple times.", measurements); + } + } +} diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelObservableMeasurement.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelObservableMeasurement.java new file mode 100644 index 00000000000..3143acb9039 --- /dev/null +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/OtelObservableMeasurement.java @@ -0,0 +1,71 @@ +package datadog.opentelemetry.shim.metrics; + +import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage; +import datadog.trace.relocate.api.RatelimitedLogger; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import java.util.concurrent.TimeUnit; +import javax.annotation.ParametersAreNonnullByDefault; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ParametersAreNonnullByDefault +final class OtelObservableMeasurement + implements ObservableDoubleMeasurement, ObservableLongMeasurement { + + private static final Logger LOGGER = LoggerFactory.getLogger(OtelObservableMeasurement.class); + private static final RatelimitedLogger RATELIMITED_LOGGER = + new RatelimitedLogger(LOGGER, 5, TimeUnit.MINUTES); + + private final OtelMetricStorage storage; + private volatile boolean active; + + OtelObservableMeasurement(OtelMetricStorage storage) { + this.storage = storage; + } + + void activate() { + this.active = true; + } + + void passivate() { + this.active = false; + } + + @Override + public void record(double value) { + record(value, Attributes.empty()); + } + + @Override + public void record(double value, Attributes attributes) { + if (active) { + storage.recordDouble(value, attributes); + } else { + logNotActive(); + } + } + + @Override + public void record(long value) { + record(value, Attributes.empty()); + } + + @Override + public void record(long value, Attributes attributes) { + if (active) { + storage.recordLong(value, attributes); + } else { + logNotActive(); + } + } + + private void logNotActive() { + if (LOGGER.isDebugEnabled()) { + RATELIMITED_LOGGER.warn( + "Measurement recorded for instrument {} outside callback registered to instrument. Dropping measurement.", + storage.getInstrumentName()); + } + } +} diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/data/OtelMetricStorage.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/data/OtelMetricStorage.java index 4875a6a3b9b..58a63309c34 100644 --- a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/data/OtelMetricStorage.java +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/data/OtelMetricStorage.java @@ -112,6 +112,13 @@ public void recordLong(long value, Attributes attributes) { } public void recordDouble(double value, Attributes attributes) { + if (Double.isNaN(value)) { + LOGGER.debug( + "Instrument {} has recorded measurement Not-a-Number (NaN) value with attributes {}. Dropping measurement.", + getInstrumentName(), + attributes); + return; + } if (resetOnCollect) { Recording recording = acquireRecordingForWrite(); try { diff --git a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/export/OtelMetricVisitor.java b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/export/OtelMetricsVisitor.java similarity index 86% rename from dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/export/OtelMetricVisitor.java rename to dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/export/OtelMetricsVisitor.java index ac2a5ac2e16..5f8245a6180 100644 --- a/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/export/OtelMetricVisitor.java +++ b/dd-java-agent/agent-otel/otel-shim/src/main/java/datadog/opentelemetry/shim/metrics/export/OtelMetricsVisitor.java @@ -2,7 +2,7 @@ import datadog.opentelemetry.shim.OtelInstrumentationScope; -public interface OtelMetricVisitor { +public interface OtelMetricsVisitor { /** Visits a meter created by the OpenTelemetry API. */ OtelMeterVisitor visitMeter(OtelInstrumentationScope scope); } diff --git a/dd-java-agent/instrumentation/opentelemetry/opentelemetry-1.47/src/main/java/datadog/trace/instrumentation/opentelemetry147/OpenTelemetryMetricsInstrumentation.java b/dd-java-agent/instrumentation/opentelemetry/opentelemetry-1.47/src/main/java/datadog/trace/instrumentation/opentelemetry147/OpenTelemetryMetricsInstrumentation.java index 61d959cc249..8ac7928278c 100644 --- a/dd-java-agent/instrumentation/opentelemetry/opentelemetry-1.47/src/main/java/datadog/trace/instrumentation/opentelemetry147/OpenTelemetryMetricsInstrumentation.java +++ b/dd-java-agent/instrumentation/opentelemetry/opentelemetry-1.47/src/main/java/datadog/trace/instrumentation/opentelemetry147/OpenTelemetryMetricsInstrumentation.java @@ -83,7 +83,7 @@ public String[] helperClassNames() { "datadog.opentelemetry.shim.metrics.data.OtelMetricStorage$Recording", "datadog.opentelemetry.shim.metrics.export.OtelInstrumentVisitor", "datadog.opentelemetry.shim.metrics.export.OtelMeterVisitor", - "datadog.opentelemetry.shim.metrics.export.OtelMetricVisitor", + "datadog.opentelemetry.shim.metrics.export.OtelMetricsVisitor", "datadog.opentelemetry.shim.metrics.OtelDoubleCounter", "datadog.opentelemetry.shim.metrics.OtelDoubleCounter$Builder", "datadog.opentelemetry.shim.metrics.OtelDoubleGauge", @@ -100,6 +100,8 @@ public String[] helperClassNames() { "datadog.opentelemetry.shim.metrics.OtelLongHistogram$Builder", "datadog.opentelemetry.shim.metrics.OtelLongUpDownCounter", "datadog.opentelemetry.shim.metrics.OtelLongUpDownCounter$Builder", + "datadog.opentelemetry.shim.metrics.OtelObservableCallback", + "datadog.opentelemetry.shim.metrics.OtelObservableMeasurement", }; } diff --git a/dd-java-agent/instrumentation/opentelemetry/opentelemetry-1.47/src/test/groovy/opentelemetry147/metrics/MetricsTest.groovy b/dd-java-agent/instrumentation/opentelemetry/opentelemetry-1.47/src/test/groovy/opentelemetry147/metrics/MetricsTest.groovy new file mode 100644 index 00000000000..2c77e18d802 --- /dev/null +++ b/dd-java-agent/instrumentation/opentelemetry/opentelemetry-1.47/src/test/groovy/opentelemetry147/metrics/MetricsTest.groovy @@ -0,0 +1,464 @@ +package opentelemetry147.metrics + +import static io.opentelemetry.api.common.AttributeKey.stringKey + +import datadog.opentelemetry.shim.OtelInstrumentationScope +import datadog.opentelemetry.shim.metrics.OtelInstrumentDescriptor +import datadog.opentelemetry.shim.metrics.OtelMeterProvider +import datadog.opentelemetry.shim.metrics.data.OtelDoublePoint +import datadog.opentelemetry.shim.metrics.data.OtelHistogramPoint +import datadog.opentelemetry.shim.metrics.data.OtelLongPoint +import datadog.opentelemetry.shim.metrics.data.OtelPoint +import datadog.opentelemetry.shim.metrics.export.OtelInstrumentVisitor +import datadog.opentelemetry.shim.metrics.export.OtelMeterVisitor +import datadog.opentelemetry.shim.metrics.export.OtelMetricsVisitor +import datadog.trace.agent.test.InstrumentationSpecification +import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.common.Attributes +import spock.lang.Shared +import spock.lang.Subject + +class MetricsTest extends InstrumentationSpecification { + + @Override + void configurePreAgent() { + super.configurePreAgent() + + injectSysConfig("dd.integration.opentelemetry-metrics.enabled", "true") + } + + @Shared + Attributes someAttributes = Attributes.of(stringKey("some"), "thing") + + @Subject + def meterProvider = GlobalOpenTelemetry.get().meterProvider as OtelMeterProvider + + @Subject + def meter = meterProvider.get('test') + + def meterReader = new MeterReader() + + def points = [:] + + def "test long counter"() { + setup: + def counter = meter + .counterBuilder("long-counter") + .build() + + when: + counter.add(1) + counter.add(2, someAttributes) + meterProvider.collectMetrics(meterReader) + + then: + points['test:long-counter'] == 1 + points['test:long-counter@[some:thing]'] == 2 + } + + def "test double counter"() { + setup: + def counter = meter + .counterBuilder("double-counter") + .ofDoubles() + .build() + + when: + counter.add(1.2) + counter.add(3.4, someAttributes) + meterProvider.collectMetrics(meterReader) + + then: + points['test:double-counter'] == 1.2 + points['test:double-counter@[some:thing]'] == 3.4 + } + + def "test long up-down counter"() { + setup: + def counter = meter + .upDownCounterBuilder("long-up-down-counter") + .build() + + when: + counter.add(1) + counter.add(2, someAttributes) + meterProvider.collectMetrics(meterReader) + + then: + points['test:long-up-down-counter'] == 1 + points['test:long-up-down-counter@[some:thing]'] == 2 + } + + def "test double up-down counter"() { + setup: + def counter = meter + .upDownCounterBuilder("double-up-down-counter") + .ofDoubles() + .build() + + when: + counter.add(1.2) + counter.add(3.4, someAttributes) + meterProvider.collectMetrics(meterReader) + + then: + points['test:double-up-down-counter'] == 1.2 + points['test:double-up-down-counter@[some:thing]'] == 3.4 + } + + def "test long gauge"() { + setup: + def counter = meter + .gaugeBuilder("long-gauge") + .ofLongs() + .build() + + when: + counter.set(1) + counter.set(2, someAttributes) + meterProvider.collectMetrics(meterReader) + + then: + points['test:long-gauge'] == 1 + points['test:long-gauge@[some:thing]'] == 2 + } + + def "test double gauge"() { + setup: + def counter = meter + .gaugeBuilder("double-gauge") + .build() + + when: + counter.set(1.2) + counter.set(3.4, someAttributes) + meterProvider.collectMetrics(meterReader) + + then: + points['test:double-gauge'] == 1.2 + points['test:double-gauge@[some:thing]'] == 3.4 + } + + def "test long histogram"() { + setup: + def histogram = meter + .histogramBuilder("long-histogram") + .ofLongs() + .build() + + when: + histogram.record(1) + histogram.record(24) + histogram.record(101, someAttributes) + meterProvider.collectMetrics(meterReader) + + then: + points['test:long-histogram'] == [2.0, [0.0, 5.0, 10.0, 25.0], [0.0, 1.0, 0.0, 1.0], 25.0] + points['test:long-histogram@[some:thing]'] == [1.0, [100.0, 250.0], [0.0, 1.0], 101.0] + } + + def "test double histogram"() { + setup: + def histogram = meter + .histogramBuilder("double-histogram") + .build() + + when: + histogram.record(1.2) + histogram.record(24.5) + histogram.record(101.2, someAttributes) + meterProvider.collectMetrics(meterReader) + + then: + points['test:double-histogram'] == [2.0, [0.0, 5.0, 10.0, 25.0], [0.0, 1.0, 0.0, 1.0], 25.7] + points['test:double-histogram@[some:thing]'] == [1.0, [100.0, 250.0], [0.0, 1.0], 101.2] + } + + def "test observable long counter"() { + setup: + def observable = meter + .counterBuilder("observable-long-counter") + .buildWithCallback {m -> + m.record(1) + m.record(2, someAttributes) + } + + when: + meterProvider.collectMetrics(meterReader) + + then: + points['test:observable-long-counter'] == 1 + points['test:observable-long-counter@[some:thing]'] == 2 + + cleanup: + observable.close() + } + + def "test observable double counter"() { + setup: + def observable = meter + .counterBuilder("observable-double-counter") + .ofDoubles() + .buildWithCallback {m -> + m.record(1.2) + m.record(3.4, someAttributes) + } + + when: + meterProvider.collectMetrics(meterReader) + + then: + points['test:observable-double-counter'] == 1.2 + points['test:observable-double-counter@[some:thing]'] == 3.4 + + cleanup: + observable.close() + } + + def "test observable long up-down counter"() { + setup: + def observable = meter + .upDownCounterBuilder("observable-long-up-down-counter") + .buildWithCallback {m -> + m.record(1) + m.record(2, someAttributes) + } + + when: + meterProvider.collectMetrics(meterReader) + + then: + points['test:observable-long-up-down-counter'] == 1 + points['test:observable-long-up-down-counter@[some:thing]'] == 2 + + cleanup: + observable.close() + } + + def "test observable double up-down counter"() { + setup: + def observable = meter + .upDownCounterBuilder("observable-double-up-down-counter") + .ofDoubles() + .buildWithCallback {m -> + m.record(1.2) + m.record(3.4, someAttributes) + } + + when: + meterProvider.collectMetrics(meterReader) + + then: + points['test:observable-double-up-down-counter'] == 1.2 + points['test:observable-double-up-down-counter@[some:thing]'] == 3.4 + + cleanup: + observable.close() + } + + def "test observable long gauge"() { + setup: + def observable = meter + .gaugeBuilder("observable-long-gauge") + .ofLongs() + .buildWithCallback {m -> + m.record(1) + m.record(2, someAttributes) + } + + when: + meterProvider.collectMetrics(meterReader) + + then: + points['test:observable-long-gauge'] == 1 + points['test:observable-long-gauge@[some:thing]'] == 2 + + cleanup: + observable.close() + } + + def "test observable double gauge"() { + setup: + def observable = meter + .gaugeBuilder("observable-double-gauge") + .buildWithCallback {m -> + m.record(1.2) + m.record(3.4, someAttributes) + } + + when: + meterProvider.collectMetrics(meterReader) + + then: + points['test:observable-double-gauge'] == 1.2 + points['test:observable-double-gauge@[some:thing]'] == 3.4 + + cleanup: + observable.close() + } + + def "test batch callback"() { + setup: + def longCounterObserver = meter + .counterBuilder("long-counter-observer") + .buildObserver() + def doubleCounterObserver = meter + .counterBuilder("double-counter-observer") + .ofDoubles() + .buildObserver() + def longUpDownCounterObserver = meter + .upDownCounterBuilder("long-up-down-counter-observer") + .buildObserver() + def doubleUpDownCounterObserver = meter + .upDownCounterBuilder("double-up-down-counter-observer") + .ofDoubles() + .buildObserver() + def longGaugeObserver = meter + .gaugeBuilder("long-gauge-observer") + .ofLongs() + .buildObserver() + def doubleGaugeObserver = meter + .gaugeBuilder("double-gauge-observer") + .buildObserver() + def batchCallback = meter + .batchCallback(() -> { + longCounterObserver.record(1) + longCounterObserver.record(10, someAttributes) + doubleCounterObserver.record(2.3) + doubleCounterObserver.record(20.3, someAttributes) + longUpDownCounterObserver.record(4) + longUpDownCounterObserver.record(40, someAttributes) + doubleUpDownCounterObserver.record(5.6) + doubleUpDownCounterObserver.record(50.6, someAttributes) + longGaugeObserver.record(7) + longGaugeObserver.record(70, someAttributes) + doubleGaugeObserver.record(8.9) + doubleGaugeObserver.record(80.9, someAttributes) + }, + longCounterObserver, + doubleCounterObserver, + longUpDownCounterObserver, + doubleUpDownCounterObserver, + longGaugeObserver, + doubleGaugeObserver) + + // this callback will have no effect because it doesn't declare any measurements + def noopCallback = meter + .batchCallback(() -> { + longCounterObserver.record(1000) + longCounterObserver.record(1000, someAttributes) + doubleCounterObserver.record(1000) + doubleCounterObserver.record(1000, someAttributes) + longUpDownCounterObserver.record(1000) + longUpDownCounterObserver.record(1000, someAttributes) + doubleUpDownCounterObserver.record(1000) + doubleUpDownCounterObserver.record(1000, someAttributes) + longGaugeObserver.record(1000) + longGaugeObserver.record(1000, someAttributes) + doubleGaugeObserver.record(1000) + doubleGaugeObserver.record(1000, someAttributes) + }, null) + + when: + meterProvider.collectMetrics(meterReader) + + then: + points['test:long-counter-observer'] == 1 + points['test:long-counter-observer@[some:thing]'] == 10 + points['test:double-counter-observer'] == 2.3 + points['test:double-counter-observer@[some:thing]'] == 20.3 + points['test:long-up-down-counter-observer'] == 4 + points['test:long-up-down-counter-observer@[some:thing]'] == 40 + points['test:double-up-down-counter-observer'] == 5.6 + points['test:double-up-down-counter-observer@[some:thing]'] == 50.6 + points['test:long-gauge-observer'] == 7 + points['test:long-gauge-observer@[some:thing]'] == 70 + points['test:double-gauge-observer'] == 8.9 + points['test:double-gauge-observer@[some:thing]'] == 80.9 + + when: + points.clear() + // this should invoke batchCallback again + meterProvider.collectMetrics(meterReader) + + then: + // delta mode: counters show values added during last collect + points['test:long-counter-observer'] == 1 + points['test:long-counter-observer@[some:thing]'] == 10 + points['test:double-counter-observer'] == 2.3 + points['test:double-counter-observer@[some:thing]'] == 20.3 + // up-down counters stay cumulative: they show the running total + points['test:long-up-down-counter-observer'] == 8 + points['test:long-up-down-counter-observer@[some:thing]'] == 80 + points['test:double-up-down-counter-observer'] == 11.2 + points['test:double-up-down-counter-observer@[some:thing]'] == 101.2 + // gauges also stay cumulative: they only show latest value + points['test:long-gauge-observer'] == 7 + points['test:long-gauge-observer@[some:thing]'] == 70 + points['test:double-gauge-observer'] == 8.9 + points['test:double-gauge-observer@[some:thing]'] == 80.9 + + when: + batchCallback.close() + points.clear() + // batchCallback will not be invoked as it it closed + meterProvider.collectMetrics(meterReader) + + then: + // delta mode: no values were added as batchCallback is closed + points['test:long-counter-observer'] == null + points['test:long-counter-observer@[some:thing]'] == null + points['test:double-counter-observer'] == null + points['test:double-counter-observer@[some:thing]'] == null + // up-down counters stay cumulative: they show the running total + points['test:long-up-down-counter-observer'] == 8 + points['test:long-up-down-counter-observer@[some:thing]'] == 80 + points['test:double-up-down-counter-observer'] == 11.2 + points['test:double-up-down-counter-observer@[some:thing]'] == 101.2 + // gauges also stay cumulative: they only show latest value + points['test:long-gauge-observer'] == 7 + points['test:long-gauge-observer@[some:thing]'] == 70 + points['test:double-gauge-observer'] == 8.9 + points['test:double-gauge-observer@[some:thing]'] == 80.9 + + cleanup: + noopCallback.close() + } + + class MeterReader implements OtelMetricsVisitor, OtelMeterVisitor, OtelInstrumentVisitor { + def scopeName + def instrumentName + + @Override + OtelMeterVisitor visitMeter(OtelInstrumentationScope scope) { + scopeName = scope.name + return this + } + + @Override + OtelInstrumentVisitor visitInstrument(OtelInstrumentDescriptor descriptor) { + instrumentName = descriptor.name + return this + } + + @Override + void visitPoint(Attributes attributes, OtelPoint point) { + def key = scopeName + ':' + instrumentName + if (!attributes.isEmpty()) { + key = key + '@' + attributes.asMap() + } + switch (point.class) { + case OtelLongPoint: + points.put(key, (point as OtelLongPoint).value) + break + case OtelDoublePoint: + points.put(key, (point as OtelDoublePoint).value) + break + case OtelHistogramPoint: + OtelHistogramPoint h = point as OtelHistogramPoint + points.put(key, [h.count, h.bucketBoundaries, h.bucketCounts, h.sum]) + break + } + } + } +}