Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import javax.annotation.Nullable;

/**
* The entrypoint for OpenTelemetry metrics functionality in gRPC.
Expand Down Expand Up @@ -97,7 +99,8 @@ private GrpcOpenTelemetry(Builder builder) {
this.resource = createMetricInstruments(meter, enableMetrics, disableDefault);
this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels);
this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule(
STOPWATCH_SUPPLIER, resource, optionalLabels, builder.plugins);
STOPWATCH_SUPPLIER, resource, optionalLabels, builder.plugins,
builder.targetAttributeFilter);
this.openTelemetryTracingModule = new OpenTelemetryTracingModule(openTelemetrySdk);
this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels);
}
Expand Down Expand Up @@ -141,6 +144,11 @@ Tracer getTracer() {
return this.openTelemetryTracingModule.getTracer();
}

@VisibleForTesting
Predicate<String> getTargetAttributeFilter() {
return this.openTelemetryMetricsModule.getTargetAttributeFilter();
}

/**
* Registers GrpcOpenTelemetry globally, applying its configuration to all subsequently created
* gRPC channels and servers.
Expand Down Expand Up @@ -359,6 +367,8 @@ public static class Builder {
private final Collection<String> optionalLabels = new ArrayList<>();
private final Map<String, Boolean> enableMetrics = new HashMap<>();
private boolean disableAll;
@Nullable
private Predicate<String> targetAttributeFilter;

private Builder() {}

Expand Down Expand Up @@ -421,6 +431,19 @@ Builder enableTracing(boolean enable) {
return this;
}

/**
* Sets an optional filter to control recording of the {@code grpc.target} metric attribute.
*
* <p>If the predicate returns {@code true}, the original target is recorded. Otherwise,
* the target is recorded as {@code "other"} to limit metric cardinality.
*
* <p>If unset, all targets are recorded as-is.
*/
public Builder targetAttributeFilter(@Nullable Predicate<String> filter) {
this.targetAttributeFilter = filter;
return this;
}

/**
* Returns a new {@link GrpcOpenTelemetry} built with the configuration of this {@link
* Builder}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand All @@ -68,6 +69,10 @@
* tracer. It's the tracer that reports per-attempt stats, and the factory that reports the stats
* of the overall RPC, such as RETRIES_PER_CALL, to OpenTelemetry.
*
* <p>This module optionally applies a target attribute filter to limit the cardinality of
* the {@code grpc.target} attribute in client-side metrics by mapping disallowed targets
* to a stable placeholder value.
*
* <p>On the server-side, there is only one ServerStream per each ServerCall, and ServerStream
* starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call, and
* it's the tracer that reports the summary to OpenTelemetry.
Expand Down Expand Up @@ -95,15 +100,30 @@ final class OpenTelemetryMetricsModule {
private final boolean localityEnabled;
private final boolean backendServiceEnabled;
private final ImmutableList<OpenTelemetryPlugin> plugins;
@Nullable
private final Predicate<String> targetAttributeFilter;

OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
OpenTelemetryMetricsResource resource,
Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins) {
this(stopwatchSupplier, resource, optionalLabels, plugins, null);
}

OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
OpenTelemetryMetricsResource resource,
Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins,
@Nullable Predicate<String> targetAttributeFilter) {
this.resource = checkNotNull(resource, "resource");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
this.plugins = ImmutableList.copyOf(plugins);
this.targetAttributeFilter = targetAttributeFilter;
}

@VisibleForTesting
Predicate<String> getTargetAttributeFilter() {
return targetAttributeFilter;
}

/**
Expand All @@ -124,7 +144,15 @@ ClientInterceptor getClientInterceptor(String target) {
pluginBuilder.add(plugin);
}
}
return new MetricsClientInterceptor(target, pluginBuilder.build());
String filteredTarget = recordTarget(target);
return new MetricsClientInterceptor(filteredTarget, pluginBuilder.build());
}

String recordTarget(String target) {
if (targetAttributeFilter == null) {
return target;
}
return targetAttributeFilter.test(target) ? target : "other";
}

static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import java.util.Arrays;
import java.util.function.Predicate;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -130,6 +131,17 @@ public void builderDefaults() {
);
}

@Test
public void builderTargetAttributeFilter() {
Predicate<String> filter = t -> t.contains("allowed.com");
GrpcOpenTelemetry module = GrpcOpenTelemetry.newBuilder()
.targetAttributeFilter(filter)
.build();

assertThat(module.getTargetAttributeFilter())
.isSameInstanceAs(filter);
}

@Test
public void enableDisableMetrics() {
GrpcOpenTelemetry.Builder builder = GrpcOpenTelemetry.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -1667,12 +1668,149 @@ public void serverBaggagePropagationToMetrics() {
assertEquals("67", capturedBaggage.getEntryValue("user-id"));
}

@Test
public void targetAttributeFilter_notSet_usesOriginalTarget() {
// Test that when no filter is set, the original target is used
String target = "dns:///example.com";
OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter,
enabledMetricsMap, disableDefaultMetrics);
OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource);

Channel interceptedChannel =
ClientInterceptors.intercept(
grpcServerRule.getChannel(), module.getClientInterceptor(target));

ClientCall<String, String> call = interceptedChannel.newCall(method, CALL_OPTIONS);

// Make the call
Metadata headers = new Metadata();
call.start(mockClientCallListener, headers);

// End the call
call.halfClose();
call.request(1);

io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
TARGET_KEY, target,
METHOD_KEY, method.getFullMethodName());

assertThat(openTelemetryTesting.getMetrics())
.anySatisfy(
metric ->
assertThat(metric)
.hasInstrumentationScope(InstrumentationScopeInfo.create(
OpenTelemetryConstants.INSTRUMENTATION_SCOPE))
.hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
.hasUnit("{attempt}")
.hasLongSumSatisfying(
longSum ->
longSum
.hasPointsSatisfying(
point ->
point
.hasAttributes(attributes))));
}

@Test
public void targetAttributeFilter_allowsTarget_usesOriginalTarget() {
// Test that when filter allows the target, the original target is used
String target = "dns:///example.com";
Predicate<String> targetFilter = t -> t.contains("example.com");
OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter,
enabledMetricsMap, disableDefaultMetrics);
OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource, targetFilter);

Channel interceptedChannel =
ClientInterceptors.intercept(
grpcServerRule.getChannel(), module.getClientInterceptor(target));

ClientCall<String, String> call = interceptedChannel.newCall(method, CALL_OPTIONS);

// Make the call
Metadata headers = new Metadata();
call.start(mockClientCallListener, headers);

// End the call
call.halfClose();
call.request(1);

io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
TARGET_KEY, target,
METHOD_KEY, method.getFullMethodName());

assertThat(openTelemetryTesting.getMetrics())
.anySatisfy(
metric ->
assertThat(metric)
.hasInstrumentationScope(InstrumentationScopeInfo.create(
OpenTelemetryConstants.INSTRUMENTATION_SCOPE))
.hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
.hasUnit("{attempt}")
.hasLongSumSatisfying(
longSum ->
longSum
.hasPointsSatisfying(
point ->
point
.hasAttributes(attributes))));
}

@Test
public void targetAttributeFilter_rejectsTarget_mapsToOther() {
// Test that when filter rejects the target, it is mapped to "other"
String target = "dns:///example.com";
Predicate<String> targetFilter = t -> t.contains("allowed.com");
OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter,
enabledMetricsMap, disableDefaultMetrics);
OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource, targetFilter);

Channel interceptedChannel =
ClientInterceptors.intercept(
grpcServerRule.getChannel(), module.getClientInterceptor(target));

ClientCall<String, String> call = interceptedChannel.newCall(method, CALL_OPTIONS);

// Make the call
Metadata headers = new Metadata();
call.start(mockClientCallListener, headers);

// End the call
call.halfClose();
call.request(1);

io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
TARGET_KEY, "other",
METHOD_KEY, method.getFullMethodName());

assertThat(openTelemetryTesting.getMetrics())
.anySatisfy(
metric ->
assertThat(metric)
.hasInstrumentationScope(InstrumentationScopeInfo.create(
OpenTelemetryConstants.INSTRUMENTATION_SCOPE))
.hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
.hasUnit("{attempt}")
.hasLongSumSatisfying(
longSum ->
longSum
.hasPointsSatisfying(
point ->
point
.hasAttributes(attributes))));
}

private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule(
OpenTelemetryMetricsResource resource) {
return new OpenTelemetryMetricsModule(
fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList());
}

private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule(
OpenTelemetryMetricsResource resource, Predicate<String> filter) {
return new OpenTelemetryMetricsModule(
fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), filter);
}

static class CallInfo<ReqT, RespT> extends ServerCallInfo<ReqT, RespT> {
private final MethodDescriptor<ReqT, RespT> methodDescriptor;
private final Attributes attributes;
Expand Down