Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import static datadog.opentelemetry.shim.metrics.OtelInstrumentBuilder.ofDoubles;
import static datadog.opentelemetry.shim.metrics.OtelInstrumentType.COUNTER;
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_INSTRUMENT_NAME;
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_METER;

import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage;
import datadog.trace.relocate.api.RatelimitedLogger;
Expand Down Expand Up @@ -74,23 +72,20 @@ public DoubleCounterBuilder setUnit(String unit) {
@Override
public DoubleCounter build() {
return new OtelDoubleCounter(
meter.registerStorage(builder.descriptor(), OtelMetricStorage::newDoubleSumStorage));
meter.registerStorage(builder, OtelMetricStorage::newDoubleSumStorage));
}

@Override
public ObservableDoubleCounter buildWithCallback(
Consumer<ObservableDoubleMeasurement> callback) {
// FIXME: implement callback
return NOOP_METER
.counterBuilder(NOOP_INSTRUMENT_NAME)
.ofDoubles()
.buildWithCallback(callback);
public ObservableDoubleMeasurement buildObserver() {
return meter.registerObservableStorage(builder, OtelMetricStorage::newDoubleSumStorage);
}

@Override
public ObservableDoubleMeasurement buildObserver() {
// FIXME: implement observer
return NOOP_METER.counterBuilder(NOOP_INSTRUMENT_NAME).ofDoubles().buildObserver();
public ObservableDoubleCounter buildWithCallback(
Consumer<ObservableDoubleMeasurement> callback) {
ObservableDoubleMeasurement measurement = buildObserver();
Runnable runnable = () -> callback.accept(measurement);
return meter.registerObservableCallback(runnable, (OtelObservableMeasurement) measurement);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import static datadog.opentelemetry.shim.metrics.OtelInstrumentBuilder.ofDoubles;
import static datadog.opentelemetry.shim.metrics.OtelInstrumentType.GAUGE;
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_INSTRUMENT_NAME;
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_METER;

import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage;
import io.opentelemetry.api.common.Attributes;
Expand Down Expand Up @@ -66,19 +64,19 @@ public LongGaugeBuilder ofLongs() {
@Override
public DoubleGauge build() {
return new OtelDoubleGauge(
meter.registerStorage(builder.descriptor(), OtelMetricStorage::newDoubleValueStorage));
meter.registerStorage(builder, OtelMetricStorage::newDoubleValueStorage));
}

@Override
public ObservableDoubleGauge buildWithCallback(Consumer<ObservableDoubleMeasurement> callback) {
// FIXME: implement callback
return NOOP_METER.gaugeBuilder(NOOP_INSTRUMENT_NAME).buildWithCallback(callback);
public ObservableDoubleMeasurement buildObserver() {
return meter.registerObservableStorage(builder, OtelMetricStorage::newDoubleValueStorage);
}

@Override
public ObservableDoubleMeasurement buildObserver() {
// FIXME: implement observer
return NOOP_METER.gaugeBuilder(NOOP_INSTRUMENT_NAME).buildObserver();
public ObservableDoubleGauge buildWithCallback(Consumer<ObservableDoubleMeasurement> callback) {
ObservableDoubleMeasurement measurement = buildObserver();
Runnable runnable = () -> callback.accept(measurement);
return meter.registerObservableCallback(runnable, (OtelObservableMeasurement) measurement);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ public LongHistogramBuilder ofLongs() {
public DoubleHistogram build() {
return new OtelDoubleHistogram(
meter.registerStorage(
builder.descriptor(),
descriptor -> newHistogramStorage(descriptor, bucketBoundaries)));
builder, descriptor -> newHistogramStorage(descriptor, bucketBoundaries)));
}

static List<Double> validateBoundaries(List<Double> boundaries) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import static datadog.opentelemetry.shim.metrics.OtelInstrumentBuilder.ofDoubles;
import static datadog.opentelemetry.shim.metrics.OtelInstrumentType.UP_DOWN_COUNTER;
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_INSTRUMENT_NAME;
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_METER;

import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage;
import io.opentelemetry.api.common.Attributes;
Expand Down Expand Up @@ -60,23 +58,20 @@ public DoubleUpDownCounterBuilder setUnit(String unit) {
@Override
public DoubleUpDownCounter build() {
return new OtelDoubleUpDownCounter(
meter.registerStorage(builder.descriptor(), OtelMetricStorage::newDoubleSumStorage));
meter.registerStorage(builder, OtelMetricStorage::newDoubleSumStorage));
}

@Override
public ObservableDoubleUpDownCounter buildWithCallback(
Consumer<ObservableDoubleMeasurement> callback) {
// FIXME: implement callback
return NOOP_METER
.upDownCounterBuilder(NOOP_INSTRUMENT_NAME)
.ofDoubles()
.buildWithCallback(callback);
public ObservableDoubleMeasurement buildObserver() {
return meter.registerObservableStorage(builder, OtelMetricStorage::newDoubleSumStorage);
}

@Override
public ObservableDoubleMeasurement buildObserver() {
// FIXME: implement observer
return NOOP_METER.upDownCounterBuilder(NOOP_INSTRUMENT_NAME).ofDoubles().buildObserver();
public ObservableDoubleUpDownCounter buildWithCallback(
Consumer<ObservableDoubleMeasurement> callback) {
ObservableDoubleMeasurement measurement = buildObserver();
Runnable runnable = () -> callback.accept(measurement);
return meter.registerObservableCallback(runnable, (OtelObservableMeasurement) measurement);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,26 @@ OtelInstrumentDescriptor descriptor() {
return new OtelInstrumentDescriptor(
instrumentName, instrumentType, longValues, description, unit);
}

OtelInstrumentDescriptor observableDescriptor() {
return new OtelInstrumentDescriptor(
instrumentName, observableType(instrumentType), longValues, description, unit);
}

/**
* Maps the given {@link OtelInstrumentType to its observable equivalent.
* @throws IllegalArgumentException if the type has no observable equivalent
*/
private OtelInstrumentType observableType(OtelInstrumentType instrumentType) {
switch (instrumentType) {
case COUNTER:
return OtelInstrumentType.OBSERVABLE_COUNTER;
case UP_DOWN_COUNTER:
return OtelInstrumentType.OBSERVABLE_UP_DOWN_COUNTER;
case GAUGE:
return OtelInstrumentType.OBSERVABLE_GAUGE;
default:
throw new IllegalArgumentException(instrumentType + " has no observable equivalent");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import static datadog.opentelemetry.shim.metrics.OtelInstrumentBuilder.ofLongs;
import static datadog.opentelemetry.shim.metrics.OtelInstrumentType.COUNTER;
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_INSTRUMENT_NAME;
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_METER;

import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage;
import datadog.trace.relocate.api.RatelimitedLogger;
Expand Down Expand Up @@ -80,19 +78,19 @@ public DoubleCounterBuilder ofDoubles() {
@Override
public LongCounter build() {
return new OtelLongCounter(
meter.registerStorage(builder.descriptor(), OtelMetricStorage::newLongSumStorage));
meter.registerStorage(builder, OtelMetricStorage::newLongSumStorage));
}

@Override
public ObservableLongCounter buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
// FIXME: implement callback
return NOOP_METER.counterBuilder(NOOP_INSTRUMENT_NAME).buildWithCallback(callback);
public ObservableLongMeasurement buildObserver() {
return meter.registerObservableStorage(builder, OtelMetricStorage::newLongSumStorage);
}

@Override
public ObservableLongMeasurement buildObserver() {
// FIXME: implement observer
return NOOP_METER.counterBuilder(NOOP_INSTRUMENT_NAME).buildObserver();
public ObservableLongCounter buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
ObservableLongMeasurement measurement = buildObserver();
Runnable runnable = () -> callback.accept(measurement);
return meter.registerObservableCallback(runnable, (OtelObservableMeasurement) measurement);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import static datadog.opentelemetry.shim.metrics.OtelInstrumentBuilder.ofLongs;
import static datadog.opentelemetry.shim.metrics.OtelInstrumentType.GAUGE;
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_INSTRUMENT_NAME;
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_METER;

import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage;
import io.opentelemetry.api.common.Attributes;
Expand Down Expand Up @@ -60,19 +58,19 @@ public LongGaugeBuilder setUnit(String unit) {
@Override
public LongGauge build() {
return new OtelLongGauge(
meter.registerStorage(builder.descriptor(), OtelMetricStorage::newLongValueStorage));
meter.registerStorage(builder, OtelMetricStorage::newLongValueStorage));
}

@Override
public ObservableLongGauge buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
// FIXME: implement callback
return NOOP_METER.gaugeBuilder(NOOP_INSTRUMENT_NAME).ofLongs().buildWithCallback(callback);
public ObservableLongMeasurement buildObserver() {
return meter.registerObservableStorage(builder, OtelMetricStorage::newLongValueStorage);
}

@Override
public ObservableLongMeasurement buildObserver() {
// FIXME: implement observer
return NOOP_METER.gaugeBuilder(NOOP_INSTRUMENT_NAME).ofLongs().buildObserver();
public ObservableLongGauge buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
ObservableLongMeasurement measurement = buildObserver();
Runnable runnable = () -> callback.accept(measurement);
return meter.registerObservableCallback(runnable, (OtelObservableMeasurement) measurement);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ public LongHistogramBuilder setExplicitBucketBoundariesAdvice(List<Long> bucketB
public LongHistogram build() {
return new OtelLongHistogram(
meter.registerStorage(
builder.descriptor(),
descriptor -> newHistogramStorage(descriptor, bucketBoundaries)));
builder, descriptor -> newHistogramStorage(descriptor, bucketBoundaries)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import static datadog.opentelemetry.shim.metrics.OtelInstrumentBuilder.ofLongs;
import static datadog.opentelemetry.shim.metrics.OtelInstrumentType.UP_DOWN_COUNTER;
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_INSTRUMENT_NAME;
import static datadog.opentelemetry.shim.metrics.OtelMeter.NOOP_METER;

import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage;
import io.opentelemetry.api.common.Attributes;
Expand Down Expand Up @@ -66,20 +64,20 @@ public DoubleUpDownCounterBuilder ofDoubles() {
@Override
public LongUpDownCounter build() {
return new OtelLongUpDownCounter(
meter.registerStorage(builder.descriptor(), OtelMetricStorage::newLongSumStorage));
meter.registerStorage(builder, OtelMetricStorage::newLongSumStorage));
}

@Override
public ObservableLongUpDownCounter buildWithCallback(
Consumer<ObservableLongMeasurement> callback) {
// FIXME: implement callback
return NOOP_METER.upDownCounterBuilder(NOOP_INSTRUMENT_NAME).buildWithCallback(callback);
public ObservableLongMeasurement buildObserver() {
return meter.registerObservableStorage(builder, OtelMetricStorage::newLongSumStorage);
}

@Override
public ObservableLongMeasurement buildObserver() {
// FIXME: implement observer
return NOOP_METER.upDownCounterBuilder(NOOP_INSTRUMENT_NAME).buildObserver();
public ObservableLongUpDownCounter buildWithCallback(
Consumer<ObservableLongMeasurement> callback) {
ObservableLongMeasurement measurement = buildObserver();
Runnable runnable = () -> callback.accept(measurement);
return meter.registerObservableCallback(runnable, (OtelObservableMeasurement) measurement);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package datadog.opentelemetry.shim.metrics;

import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Stream.concat;

import datadog.opentelemetry.shim.OtelInstrumentationScope;
import datadog.opentelemetry.shim.metrics.data.OtelMetricStorage;
import datadog.opentelemetry.shim.metrics.export.OtelMeterVisitor;
Expand All @@ -11,10 +15,13 @@
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.api.metrics.ObservableMeasurement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.slf4j.Logger;
Expand All @@ -35,6 +42,8 @@ final class OtelMeter implements Meter {
private final Map<OtelInstrumentDescriptor, OtelMetricStorage> storage =
new ConcurrentHashMap<>();

private final List<OtelObservableCallback> observables = new ArrayList<>();

OtelMeter(OtelInstrumentationScope instrumentationScope) {
this.instrumentationScope = instrumentationScope;
}
Expand Down Expand Up @@ -76,8 +85,12 @@ public BatchCallback batchCallback(
Runnable callback,
ObservableMeasurement observableMeasurement,
ObservableMeasurement... additionalMeasurements) {
// FIXME: implement callback
return NOOP_METER.batchCallback(callback, observableMeasurement, additionalMeasurements);
return registerObservableCallback(
callback,
concat(Stream.of(observableMeasurement), Stream.of(additionalMeasurements))
.filter(OtelObservableMeasurement.class::isInstance)
.map(OtelObservableMeasurement.class::cast)
.collect(toList()));
}

@Override
Expand All @@ -86,12 +99,44 @@ public String toString() {
}

OtelMetricStorage registerStorage(
OtelInstrumentDescriptor descriptor,
OtelInstrumentBuilder builder,
Function<OtelInstrumentDescriptor, OtelMetricStorage> storageFactory) {
return storage.computeIfAbsent(descriptor, storageFactory);
return storage.computeIfAbsent(builder.descriptor(), storageFactory);
}

OtelObservableMeasurement registerObservableStorage(
OtelInstrumentBuilder builder,
Function<OtelInstrumentDescriptor, OtelMetricStorage> storageFactory) {
return new OtelObservableMeasurement(
storage.computeIfAbsent(builder.observableDescriptor(), storageFactory));
}

OtelObservableCallback registerObservableCallback(
Runnable callback, OtelObservableMeasurement measurement) {
return registerObservableCallback(callback, singletonList(measurement));
}

OtelObservableCallback registerObservableCallback(
Runnable callback, List<OtelObservableMeasurement> measurements) {
OtelObservableCallback observable = new OtelObservableCallback(this, callback, measurements);
synchronized (observables) {
observables.add(observable);
}
return observable;
}

boolean unregisterObservableCallback(OtelObservableCallback observable) {
synchronized (observables) {
return observables.remove(observable);
}
}

void collect(OtelMeterVisitor visitor) {
List<OtelObservableCallback> observablesCopy;
synchronized (observables) {
observablesCopy = new ArrayList<>(observables);
}
observablesCopy.forEach(OtelObservableCallback::observeMeasurements);
storage.forEach((descriptor, storage) -> storage.collect(visitor.visitInstrument(descriptor)));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package datadog.opentelemetry.shim.metrics;

import datadog.opentelemetry.shim.OtelInstrumentationScope;
import datadog.opentelemetry.shim.metrics.export.OtelMetricVisitor;
import datadog.opentelemetry.shim.metrics.export.OtelMetricsVisitor;
import datadog.trace.util.Strings;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterBuilder;
Expand Down Expand Up @@ -33,7 +33,7 @@ public MeterBuilder meterBuilder(String instrumentationScopeName) {
return new OtelMeterBuilder(this, instrumentationScopeName);
}

public void collect(OtelMetricVisitor visitor) {
public void collectMetrics(OtelMetricsVisitor visitor) {
meters.forEach((scope, meter) -> meter.collect(visitor.visitMeter(scope)));
}

Expand Down
Loading