From 248b6bee9f21c1ab8a2095b37a6e280c551ee745 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Fri, 5 Dec 2025 11:43:09 -0800 Subject: [PATCH 1/3] Add custom label for per-RPC metrics TODO: Add tests --- api/src/main/java/io/grpc/Grpc.java | 7 ++++ .../OpenTelemetryMetricsModule.java | 35 ++++++++++++------- .../internal/OpenTelemetryConstants.java | 3 ++ .../OpenTelemetryMetricsModuleTest.java | 20 +++++------ .../java/io/grpc/rls/CachingRlsLbClient.java | 20 +++++++---- .../java/io/grpc/rls/RlsLoadBalancerTest.java | 6 ++-- 6 files changed, 60 insertions(+), 31 deletions(-) diff --git a/api/src/main/java/io/grpc/Grpc.java b/api/src/main/java/io/grpc/Grpc.java index baa9f5f0ab6..a45c613fd18 100644 --- a/api/src/main/java/io/grpc/Grpc.java +++ b/api/src/main/java/io/grpc/Grpc.java @@ -56,6 +56,13 @@ private Grpc() { public static final Attributes.Key TRANSPORT_ATTR_SSL_SESSION = Attributes.Key.create("io.grpc.Grpc.TRANSPORT_ATTR_SSL_SESSION"); + /** + * The value for the custom label of per-RPC metrics. Defaults to empty string when unset. Must + * not be set to {@code null}. + */ + public static final CallOptions.Key CALL_OPTION_CUSTOM_LABEL = + CallOptions.Key.createWithDefault("io.grpc.Grpc.CALL_OPTION_CUSTOM_LABEL", ""); + /** * Annotation for transport attributes. It follows the annotation semantics defined * by {@link Attributes}. diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index 3e5137e0034..eeac0f12835 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY; +import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.CUSTOM_LABEL_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY; @@ -39,6 +40,7 @@ import io.grpc.Deadline; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.Grpc; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerStreamTracer; @@ -94,6 +96,7 @@ final class OpenTelemetryMetricsModule { private final Supplier stopwatchSupplier; private final boolean localityEnabled; private final boolean backendServiceEnabled; + private final boolean customLabelEnabled; private final ImmutableList plugins; OpenTelemetryMetricsModule(Supplier stopwatchSupplier, @@ -103,6 +106,7 @@ final class OpenTelemetryMetricsModule { this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey()); this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey()); + this.customLabelEnabled = optionalLabels.contains(CUSTOM_LABEL_KEY.getKey()); this.plugins = ImmutableList.copyOf(plugins); } @@ -249,7 +253,7 @@ public void streamClosed(Status status) { statusCode = Code.DEADLINE_EXCEEDED; } } - attemptsState.attemptEnded(); + attemptsState.attemptEnded(info.getCallOptions()); recordFinishedAttempt(); } @@ -273,6 +277,10 @@ void recordFinishedAttempt() { } builder.put(BACKEND_SERVICE_KEY, savedBackendService); } + if (module.customLabelEnabled) { + builder.put( + CUSTOM_LABEL_KEY, info.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL)); + } for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) { plugin.addLabels(builder); } @@ -383,7 +391,7 @@ private ClientTracer newClientTracer(StreamInfo info) { } // Called whenever each attempt is ended. - void attemptEnded() { + void attemptEnded(CallOptions callOptions) { boolean shouldRecordFinishedCall = false; synchronized (lock) { if (--activeStreams == 0) { @@ -395,11 +403,11 @@ void attemptEnded() { } } if (shouldRecordFinishedCall) { - recordFinishedCall(); + recordFinishedCall(callOptions); } } - void callEnded(Status status) { + void callEnded(Status status, CallOptions callOptions) { callStopWatch.stop(); this.status = status; boolean shouldRecordFinishedCall = false; @@ -415,11 +423,11 @@ void callEnded(Status status) { } } if (shouldRecordFinishedCall) { - recordFinishedCall(); + recordFinishedCall(callOptions); } } - void recordFinishedCall() { + void recordFinishedCall(CallOptions callOptions) { Context otelContext = otelContextWithBaggage(); if (attemptsPerCall.get() == 0) { ClientTracer tracer = newClientTracer(null); @@ -430,11 +438,13 @@ void recordFinishedCall() { callLatencyNanos = callStopWatch.elapsed(TimeUnit.NANOSECONDS); // Base attributes - io.opentelemetry.api.common.Attributes baseAttributes = - io.opentelemetry.api.common.Attributes.of( - METHOD_KEY, fullMethodName, - TARGET_KEY, target - ); + AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() + .put(METHOD_KEY, fullMethodName) + .put(TARGET_KEY, target); + if (module.customLabelEnabled) { + builder.put(CUSTOM_LABEL_KEY, callOptions.getOption(Grpc.CALL_OPTION_CUSTOM_LABEL)); + } + io.opentelemetry.api.common.Attributes baseAttributes = builder.build(); // Duration if (module.resource.clientCallDurationCounter() != null) { @@ -660,6 +670,7 @@ public ClientCall interceptCall( callOptions = plugin.filterCallOptions(callOptions); } } + final CallOptions finalCallOptions = callOptions; // Only record method name as an attribute if isSampledToLocalTracing is set to true, // which is true for all generated methods. Otherwise, programatically // created methods result in high cardinality metrics. @@ -679,7 +690,7 @@ public void start(Listener responseListener, Metadata headers) { new SimpleForwardingClientCallListener(responseListener) { @Override public void onClose(Status status, Metadata trailers) { - tracerFactory.callEnded(status); + tracerFactory.callEnded(status, finalCallOptions); super.onClose(status, trailers); } }, diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java index 2c7123198c4..c09a1a2beca 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java @@ -38,6 +38,9 @@ public final class OpenTelemetryConstants { public static final AttributeKey BACKEND_SERVICE_KEY = AttributeKey.stringKey("grpc.lb.backend_service"); + public static final AttributeKey CUSTOM_LABEL_KEY = + AttributeKey.stringKey("grpc.client.call.custom"); + public static final AttributeKey DISCONNECT_ERROR_KEY = AttributeKey.stringKey("grpc.disconnect_error"); diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 58759294fca..3fdeaf08fbb 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -322,7 +322,7 @@ public void clientBasicMetrics() { tracer.inboundMessage(1); tracer.inboundWireSize(154); tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); io.opentelemetry.api.common.Attributes clientAttributes = io.opentelemetry.api.common.Attributes.of( @@ -453,7 +453,7 @@ public void clientBasicMetrics_withRetryMetricsEnabled_shouldRecordZeroOrBeAbsen fakeClock.forwardTime(100, TimeUnit.MILLISECONDS); tracer.outboundMessage(0); tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); io.opentelemetry.api.common.Attributes finalAttributes = io.opentelemetry.api.common.Attributes.of( @@ -827,7 +827,7 @@ public void recordAttemptMetrics() { fakeClock.forwardTime(24, MILLISECONDS); // RPC succeeded tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); io.opentelemetry.api.common.Attributes clientAttributes2 = io.opentelemetry.api.common.Attributes.of( @@ -995,7 +995,7 @@ public void recordAttemptMetrics_withRetryMetricsEnabled() { tracer.streamClosed(Status.OK); // RPC succeeded // --- The overall call ends --- - callAttemptsTracerFactory.callEnded(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); // Define attributes for assertions io.opentelemetry.api.common.Attributes finalAttributes @@ -1087,7 +1087,7 @@ public void recordAttemptMetrics_withHedgedCalls() { hedgeTracer2.streamClosed(Status.OK); // Second hedge succeeds // --- The overall call ends --- - callAttemptsTracerFactory.callEnded(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); // Define attributes for assertions io.opentelemetry.api.common.Attributes finalAttributes @@ -1141,7 +1141,7 @@ public void clientStreamNeverCreatedStillRecordMetrics() { method.getFullMethodName(), emptyList()); fakeClock.forwardTime(3000, MILLISECONDS); Status status = Status.DEADLINE_EXCEEDED.withDescription("5 seconds"); - callAttemptsTracerFactory.callEnded(status); + callAttemptsTracerFactory.callEnded(status, CallOptions.DEFAULT); io.opentelemetry.api.common.Attributes attemptStartedAttributes = io.opentelemetry.api.common.Attributes.of( @@ -1255,7 +1255,7 @@ public void clientLocalityMetrics_present() { tracer.addOptionalLabel("grpc.lb.locality", "the-moon"); tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon"); tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of( TARGET_KEY, target, @@ -1319,7 +1319,7 @@ public void clientLocalityMetrics_missing() { ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of( TARGET_KEY, target, @@ -1388,7 +1388,7 @@ public void clientBackendServiceMetrics_present() { tracer.addOptionalLabel("grpc.lb.backend_service", "the-moon"); tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon"); tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of( TARGET_KEY, target, @@ -1453,7 +1453,7 @@ public void clientBackendServiceMetrics_missing() { ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of( TARGET_KEY, target, diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index 83e9d482bc5..a2846fd04c8 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -33,6 +33,7 @@ import io.grpc.ChannelLogger; import io.grpc.ChannelLogger.ChannelLogLevel; import io.grpc.ConnectivityState; +import io.grpc.Grpc; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; @@ -141,20 +142,22 @@ final class CachingRlsLbClient { "grpc.lb.rls.default_target_picks", "EXPERIMENTAL. Number of LB picks sent to the default target", "{pick}", Arrays.asList("grpc.target", "grpc.lb.rls.server_target", - "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"), Collections.emptyList(), + "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"), + Arrays.asList("grpc.client.call.custom"), false); TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.target_picks", "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default " + "target is also returned by the RLS server, RPCs sent to that target from the cache " + "will be counted in this metric, not in grpc.rls.default_target_picks.", "{pick}", Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target", - "grpc.lb.pick_result"), Collections.emptyList(), + "grpc.lb.pick_result"), + Arrays.asList("grpc.client.call.custom"), false); FAILED_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.failed_picks", "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the " + "RLS channel being throttled", "{pick}", Arrays.asList("grpc.target", "grpc.lb.rls.server_target"), - Collections.emptyList(), false); + Arrays.asList("grpc.client.call.custom"), false); CACHE_ENTRIES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_entries", "EXPERIMENTAL. Number of entries in the RLS cache", "{entry}", Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"), @@ -1033,7 +1036,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { helper.getMetricRecorder().addLongCounter(TARGET_PICKS_COUNTER, 1, Arrays.asList(helper.getChannelTarget(), lookupService, childPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)), - Collections.emptyList()); + Arrays.asList(determineCustomLabel(args))); } return pickResult; } else if (response.hasError()) { @@ -1041,7 +1044,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { return useFallback(args); } helper.getMetricRecorder().addLongCounter(FAILED_PICKS_COUNTER, 1, - Arrays.asList(helper.getChannelTarget(), lookupService), Collections.emptyList()); + Arrays.asList(helper.getChannelTarget(), lookupService), + Arrays.asList(determineCustomLabel(args))); return PickResult.withError( convertRlsServerStatus(response.getStatus(), lbPolicyConfig.getRouteLookupConfig().lookupService())); @@ -1061,7 +1065,7 @@ private PickResult useFallback(PickSubchannelArgs args) { helper.getMetricRecorder().addLongCounter(DEFAULT_TARGET_PICKS_COUNTER, 1, Arrays.asList(helper.getChannelTarget(), lookupService, fallbackChildPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)), - Collections.emptyList()); + Arrays.asList(determineCustomLabel(args))); } return pickResult; } @@ -1076,6 +1080,10 @@ private String determineMetricsPickResult(PickResult pickResult) { } } + private String determineCustomLabel(PickSubchannelArgs args) { + return args.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL); + } + // GuardedBy CachingRlsLbClient.lock void close() { synchronized (lock) { // Lock is already held, but ErrorProne can't tell diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index 8d16d1bd74c..740c330e1e3 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -382,7 +382,7 @@ public void metricsWithRealChannel() throws Exception { eq(1L), eq(Arrays.asList("directaddress:///fake-bigtable.googleapis.com", "localhost:8972", "defaultTarget", "complete")), - eq(Arrays.asList())); + eq(Arrays.asList(""))); } @Test @@ -687,7 +687,7 @@ private void verifyLongCounterAdd(String name, int times, long value, verify(mockMetricRecorder, times(times)).addLongCounter( eqMetricInstrumentName(name), eq(value), eq(Lists.newArrayList(channelTarget, "localhost:8972", dataPlaneTargetLabel, pickResult)), - eq(Lists.newArrayList())); + eq(Lists.newArrayList(""))); } // This one is for verifying the failed_pick metric specifically. @@ -696,7 +696,7 @@ private void verifyFailedPicksCounterAdd(int times, long value) { verify(mockMetricRecorder, times(times)).addLongCounter( eqMetricInstrumentName("grpc.lb.rls.failed_picks"), eq(value), eq(Lists.newArrayList(channelTarget, "localhost:8972")), - eq(Lists.newArrayList())); + eq(Lists.newArrayList(""))); } @SuppressWarnings("TypeParameterUnusedInFormals") From 451b38ca59e3dfa4d3dd4be99758c95818847375 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Mon, 22 Dec 2025 16:59:17 -0800 Subject: [PATCH 2/3] Add tests, and fix a case The existing tests really liked to use inconsistent CallOptions in the same test. --- .../OpenTelemetryMetricsModule.java | 25 ++- .../OpenTelemetryMetricsModuleTest.java | 154 +++++++++++++++--- 2 files changed, 148 insertions(+), 31 deletions(-) diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index eeac0f12835..8f1b6169088 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -326,6 +326,7 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory CallAttemptsTracerFactory( OpenTelemetryMetricsModule module, String target, + CallOptions callOptions, String fullMethodName, List callPlugins) { this.module = checkNotNull(module, "module"); @@ -335,9 +336,14 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory this.attemptDelayStopwatch = module.stopwatchSupplier.get(); this.callStopWatch = module.stopwatchSupplier.get().start(); - io.opentelemetry.api.common.Attributes attribute = io.opentelemetry.api.common.Attributes.of( - METHOD_KEY, fullMethodName, - TARGET_KEY, target); + AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() + .put(METHOD_KEY, fullMethodName) + .put(TARGET_KEY, target); + if (module.customLabelEnabled) { + builder.put( + CUSTOM_LABEL_KEY, callOptions.getOption(Grpc.CALL_OPTION_CUSTOM_LABEL)); + } + io.opentelemetry.api.common.Attributes attribute = builder.build(); // Record here in case mewClientStreamTracer() would never be called. if (module.resource.clientAttemptCountCounter() != null) { @@ -361,9 +367,14 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metada // CallAttemptsTracerFactory constructor. attemptsPerCall will be non-zero after the first // attempt, as first attempt cannot be a transparent retry. if (attemptsPerCall.get() > 0) { - io.opentelemetry.api.common.Attributes attribute = - io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName, - TARGET_KEY, target); + AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() + .put(METHOD_KEY, fullMethodName) + .put(TARGET_KEY, target); + if (module.customLabelEnabled) { + builder.put( + CUSTOM_LABEL_KEY, info.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL)); + } + io.opentelemetry.api.common.Attributes attribute = builder.build(); if (module.resource.clientAttemptCountCounter() != null) { module.resource.clientAttemptCountCounter().add(1, attribute); } @@ -675,7 +686,7 @@ public ClientCall interceptCall( // which is true for all generated methods. Otherwise, programatically // created methods result in high cardinality metrics. final CallAttemptsTracerFactory tracerFactory = new CallAttemptsTracerFactory( - OpenTelemetryMetricsModule.this, target, + OpenTelemetryMetricsModule.this, target, callOptions, recordMethodName(method.getFullMethodName(), method.isSampledToLocalTracing()), callPlugins); ClientCall call = diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 3fdeaf08fbb..dd8fa769fb7 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -38,6 +38,7 @@ import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; import io.grpc.ClientStreamTracer; +import io.grpc.Grpc; import io.grpc.KnownLength; import io.grpc.Metadata; import io.grpc.MethodDescriptor; @@ -97,10 +98,9 @@ public class OpenTelemetryMetricsModuleTest { private static final CallOptions.Key CUSTOM_OPTION = CallOptions.Key.createWithDefault("option1", "default"); private static final CallOptions CALL_OPTIONS = - CallOptions.DEFAULT.withOption(CUSTOM_OPTION, "customvalue"); + CallOptions.DEFAULT.withOption(NAME_RESOLUTION_DELAYED, 10L); private static final ClientStreamTracer.StreamInfo STREAM_INFO = - ClientStreamTracer.StreamInfo.newBuilder() - .setCallOptions(CallOptions.DEFAULT.withOption(NAME_RESOLUTION_DELAYED, 10L)).build(); + ClientStreamTracer.StreamInfo.newBuilder().setCallOptions(CALL_OPTIONS).build(); private static final String CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME = "grpc.client.attempt.started"; private static final String CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME = "grpc.client.attempt.duration"; @@ -248,7 +248,8 @@ public ClientCall interceptCall( grpcServerRule.getChannel(), callOptionsCaptureInterceptor, module.getClientInterceptor("target:///")); ClientCall call; - call = interceptedChannel.newCall(method, CALL_OPTIONS); + call = interceptedChannel.newCall( + method, CallOptions.DEFAULT.withOption(CUSTOM_OPTION, "customvalue")); assertEquals("customvalue", capturedCallOptions.get().getOption(CUSTOM_OPTION)); assertEquals(1, capturedCallOptions.get().getStreamTracerFactories().size()); @@ -277,7 +278,8 @@ public void clientBasicMetrics() { enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = - new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); + new CallAttemptsTracerFactory( + module, target, CALL_OPTIONS, method.getFullMethodName(), emptyList()); Metadata headers = new Metadata(); ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, headers); @@ -322,7 +324,7 @@ public void clientBasicMetrics() { tracer.inboundMessage(1); tracer.inboundWireSize(154); tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); + callAttemptsTracerFactory.callEnded(Status.OK, CALL_OPTIONS); io.opentelemetry.api.common.Attributes clientAttributes = io.opentelemetry.api.common.Attributes.of( @@ -444,7 +446,8 @@ public void clientBasicMetrics_withRetryMetricsEnabled_shouldRecordZeroOrBeAbsen enabledMetrics, disableDefaultMetrics); OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = - new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); + new CallAttemptsTracerFactory( + module, target, CALL_OPTIONS, method.getFullMethodName(), emptyList()); ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); @@ -453,7 +456,7 @@ public void clientBasicMetrics_withRetryMetricsEnabled_shouldRecordZeroOrBeAbsen fakeClock.forwardTime(100, TimeUnit.MILLISECONDS); tracer.outboundMessage(0); tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); + callAttemptsTracerFactory.callEnded(Status.OK, CALL_OPTIONS); io.opentelemetry.api.common.Attributes finalAttributes = io.opentelemetry.api.common.Attributes.of( @@ -511,7 +514,7 @@ public void recordAttemptMetrics() { enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = - new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target, + new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target, CALL_OPTIONS, method.getFullMethodName(), emptyList()); ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); @@ -827,7 +830,7 @@ public void recordAttemptMetrics() { fakeClock.forwardTime(24, MILLISECONDS); // RPC succeeded tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); + callAttemptsTracerFactory.callEnded(Status.OK, CALL_OPTIONS); io.opentelemetry.api.common.Attributes clientAttributes2 = io.opentelemetry.api.common.Attributes.of( @@ -968,7 +971,7 @@ public void recordAttemptMetrics_withRetryMetricsEnabled() { enabledMetrics, disableDefaultMetrics); OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = - new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target, + new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target, CALL_OPTIONS, method.getFullMethodName(), emptyList()); ClientStreamTracer tracer = @@ -995,7 +998,7 @@ public void recordAttemptMetrics_withRetryMetricsEnabled() { tracer.streamClosed(Status.OK); // RPC succeeded // --- The overall call ends --- - callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); + callAttemptsTracerFactory.callEnded(Status.OK, CALL_OPTIONS); // Define attributes for assertions io.opentelemetry.api.common.Attributes finalAttributes @@ -1056,7 +1059,7 @@ public void recordAttemptMetrics_withHedgedCalls() { enabledMetrics, disableDefaultMetrics); OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = - new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target, + new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target, CALL_OPTIONS, method.getFullMethodName(), emptyList()); // Create a StreamInfo specifically for hedged attempts @@ -1087,7 +1090,7 @@ public void recordAttemptMetrics_withHedgedCalls() { hedgeTracer2.streamClosed(Status.OK); // Second hedge succeeds // --- The overall call ends --- - callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); + callAttemptsTracerFactory.callEnded(Status.OK, CALL_OPTIONS); // Define attributes for assertions io.opentelemetry.api.common.Attributes finalAttributes @@ -1137,11 +1140,11 @@ public void clientStreamNeverCreatedStillRecordMetrics() { enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = - new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target, + new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target, CALL_OPTIONS, method.getFullMethodName(), emptyList()); fakeClock.forwardTime(3000, MILLISECONDS); Status status = Status.DEADLINE_EXCEEDED.withDescription("5 seconds"); - callAttemptsTracerFactory.callEnded(status, CallOptions.DEFAULT); + callAttemptsTracerFactory.callEnded(status, CALL_OPTIONS); io.opentelemetry.api.common.Attributes attemptStartedAttributes = io.opentelemetry.api.common.Attributes.of( @@ -1246,7 +1249,8 @@ public void clientLocalityMetrics_present() { OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), emptyList()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = - new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); + new CallAttemptsTracerFactory( + module, target, CALL_OPTIONS, method.getFullMethodName(), emptyList()); ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); @@ -1255,7 +1259,7 @@ public void clientLocalityMetrics_present() { tracer.addOptionalLabel("grpc.lb.locality", "the-moon"); tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon"); tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); + callAttemptsTracerFactory.callEnded(Status.OK, CALL_OPTIONS); io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of( TARGET_KEY, target, @@ -1314,12 +1318,13 @@ public void clientLocalityMetrics_missing() { OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), emptyList()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = - new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); + new CallAttemptsTracerFactory( + module, target, CALL_OPTIONS, method.getFullMethodName(), emptyList()); ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); + callAttemptsTracerFactory.callEnded(Status.OK, CALL_OPTIONS); io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of( TARGET_KEY, target, @@ -1379,7 +1384,8 @@ public void clientBackendServiceMetrics_present() { fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"), emptyList()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = - new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); + new CallAttemptsTracerFactory( + module, target, CALL_OPTIONS, method.getFullMethodName(), emptyList()); ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); @@ -1388,7 +1394,7 @@ public void clientBackendServiceMetrics_present() { tracer.addOptionalLabel("grpc.lb.backend_service", "the-moon"); tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon"); tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); + callAttemptsTracerFactory.callEnded(Status.OK, CALL_OPTIONS); io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of( TARGET_KEY, target, @@ -1448,12 +1454,13 @@ public void clientBackendServiceMetrics_missing() { fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"), emptyList()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = - new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); + new CallAttemptsTracerFactory( + module, target, CALL_OPTIONS, method.getFullMethodName(), emptyList()); ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); tracer.streamClosed(Status.OK); - callAttemptsTracerFactory.callEnded(Status.OK, CallOptions.DEFAULT); + callAttemptsTracerFactory.callEnded(Status.OK, CALL_OPTIONS); io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of( TARGET_KEY, target, @@ -1504,6 +1511,100 @@ public void clientBackendServiceMetrics_missing() { point -> point.hasAttributes(clientAttributes)))); } + @Test + public void customLabel_present() { + Map enabledMetrics = ImmutableMap.of( + CLIENT_CALL_HEDGES, true, + CLIENT_CALL_RETRIES, true, + CLIENT_CALL_RETRY_DELAY, true, + CLIENT_CALL_TRANSPARENT_RETRIES, true + ); + String target = "target:///"; + String customValue = "some-random-value"; + CallOptions callOptions = + STREAM_INFO.getCallOptions().withOption(Grpc.CALL_OPTION_CUSTOM_LABEL, customValue); + OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, + enabledMetrics, disableDefaultMetrics); + String customLabel = "grpc.client.call.custom"; + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( + fakeClock.getStopwatchSupplier(), resource, Arrays.asList(customLabel), + emptyList()); + OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = + new CallAttemptsTracerFactory( + module, target, callOptions, method.getFullMethodName(), emptyList()); + + ClientStreamTracer.StreamInfo streamInfo = + STREAM_INFO.toBuilder().setCallOptions(callOptions).build(); + ClientStreamTracer tracer = + callAttemptsTracerFactory.newClientStreamTracer(streamInfo, new Metadata()); + tracer.streamClosed(Status.UNAVAILABLE); + + tracer = callAttemptsTracerFactory.newClientStreamTracer(streamInfo, new Metadata()); + tracer.streamClosed(Status.UNAVAILABLE); + + tracer = callAttemptsTracerFactory.newClientStreamTracer( + streamInfo.toBuilder().setIsTransparentRetry(true).build(), new Metadata()); + tracer.streamClosed(Status.UNAVAILABLE); + + tracer = callAttemptsTracerFactory.newClientStreamTracer( + streamInfo.toBuilder().setIsHedging(true).build(), new Metadata()); + tracer.streamClosed(Status.OK); + callAttemptsTracerFactory.callEnded(Status.OK, callOptions); + + AttributeKey attributeKey = AttributeKey.stringKey(customLabel); + + assertThat(sortByName(openTelemetryTesting.getMetrics())) + .satisfiesExactly( + metric -> assertThat(metric) + .hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttribute(attributeKey, customValue), + point -> point.hasAttribute(attributeKey, customValue))), + metric -> assertThat(metric) + .hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttribute(attributeKey, customValue), + point -> point.hasAttribute(attributeKey, customValue))), + metric -> assertThat(metric) + .hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttribute(attributeKey, customValue), + point -> point.hasAttribute(attributeKey, customValue))), + metric -> assertThat(metric) + .hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME) + .hasLongSumSatisfying( + longSum -> longSum.hasPointsSatisfying( + point -> point.hasAttribute(attributeKey, customValue))), + metric -> assertThat(metric) + .hasName(CLIENT_CALL_DURATION) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttribute(attributeKey, customValue))), + metric -> assertThat(metric) + .hasName(CLIENT_CALL_HEDGES) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttribute(attributeKey, customValue))), + metric -> assertThat(metric) + .hasName(CLIENT_CALL_RETRIES) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttribute(attributeKey, customValue))), + metric -> assertThat(metric) + .hasName(CLIENT_CALL_RETRY_DELAY) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttribute(attributeKey, customValue))), + metric -> assertThat(metric) + .hasName(CLIENT_CALL_TRANSPARENT_RETRIES) + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttribute(attributeKey, customValue)))); + } + @Test public void serverBasicMetrics() { OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, @@ -1753,6 +1854,11 @@ public void serverBaggagePropagation_EndToEnd() throws Exception { assertEquals("red_pill_or_blue_pill", capturedBaggage.getEntryValue("choice")); } + private static List sortByName(List metrics) { + metrics.sort((m1, m2) -> m1.getName().compareTo(m2.getName())); + return metrics; + } + /** * A simple service implementation for the E2E test. */ From aa30a7fe48696f18564d6b3c8a277e4391506a0e Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Tue, 23 Dec 2025 15:27:56 -0800 Subject: [PATCH 3/3] Add a test for RLS, too --- rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index 740c330e1e3..2508c6dc155 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -42,6 +42,7 @@ import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; import io.grpc.ForwardingChannelBuilder2; +import io.grpc.Grpc; import io.grpc.InternalManagedChannelBuilder; import io.grpc.LoadBalancer.CreateSubchannelArgs; import io.grpc.LoadBalancer.Helper; @@ -371,8 +372,10 @@ public void metricsWithRealChannel() throws Exception { .build()); StreamRecorder recorder = StreamRecorder.create(); + CallOptions callOptions = CallOptions.DEFAULT + .withOption(Grpc.CALL_OPTION_CUSTOM_LABEL, "customvalue"); StreamObserver requestObserver = ClientCalls.asyncClientStreamingCall( - channel.newCall(fakeSearchMethod, CallOptions.DEFAULT), recorder); + channel.newCall(fakeSearchMethod, callOptions), recorder); requestObserver.onCompleted(); assertThat(recorder.awaitCompletion(10, TimeUnit.SECONDS)).isTrue(); assertThat(recorder.getError()).isNull(); @@ -382,7 +385,7 @@ public void metricsWithRealChannel() throws Exception { eq(1L), eq(Arrays.asList("directaddress:///fake-bigtable.googleapis.com", "localhost:8972", "defaultTarget", "complete")), - eq(Arrays.asList(""))); + eq(Arrays.asList("customvalue"))); } @Test