From d5cfb1d71f9684534b2a214c5a95b1fb1d3e07bf Mon Sep 17 00:00:00 2001 From: Emmanuel Hugonnet Date: Wed, 26 Nov 2025 14:29:18 +0100 Subject: [PATCH] feat: Add telemetry support. * Adding an annotation that can be used by CDI interceptor to create spans on current exchanges. * Adding support for client wrappers to be able to add client side spans. * Updating the helloworld example to use opentelemetry. Fixing issue #388 Signed-off-by: Emmanuel Hugonnet --- .gitignore | 4 + boms/extras/pom.xml | 5 + boms/extras/src/it/extras-usage-test/pom.xml | 4 + .../java/io/a2a/client/ClientBuilder.java | 57 ++- .../transport/rest/RestTransportConfig.java | 2 +- .../transport/spi/ClientTransportConfig.java | 11 + .../transport/spi/ClientTransportWrapper.java | 81 +++++ examples/helloworld/client/README.md | 105 +++++- examples/helloworld/client/pom.xml | 32 +- .../examples/helloworld/HelloWorldClient.java | 49 ++- .../examples/helloworld/HelloWorldRunner.java | 4 + examples/helloworld/pom.xml | 19 - examples/helloworld/server/README.md | 30 ++ examples/helloworld/server/pom.xml | 26 +- .../helloworld/AgentCardProducer.java | 2 + .../src/main/resources/application.properties | 10 +- extras/opentelemetry/pom.xml | 52 +++ .../OpenTelemetryClientTransport.java | 327 ++++++++++++++++++ .../OpenTelemetryClientTransportWrapper.java | 45 +++ .../extras/opentelemetry/SpanInterceptor.java | 93 +++++ .../extras/opentelemetry/package-info.java | 5 + .../src/main/resources/META-INF/beans.xml | 6 + ...lient.transport.spi.ClientTransportWrapper | 1 + pom.xml | 11 + .../quarkus/A2AExtensionsInterceptor.java | 23 +- .../interceptors/InvocationContext.java | 39 +++ .../java/io/a2a/server/interceptors/Kind.java | 5 + .../interceptors/NoAttributeExtractor.java | 14 + .../io/a2a/server/interceptors/Trace.java | 27 ++ .../DefaultRequestHandler.java | 12 + .../RequestHandlerAttributeExtractor.java | 39 +++ spec/src/main/java/io/a2a/spec/Message.java | 5 + spec/src/main/java/io/a2a/spec/Task.java | 5 + .../grpc/context/GrpcContextKeys.java | 56 ++- .../grpc/handler/GrpcAttributeExtractor.java | 39 +++ .../transport/grpc/handler/GrpcHandler.java | 16 +- .../jsonrpc/handler/JSONRPCHandler.java | 15 +- .../handler/JsonRPCAttributeExtractor.java | 41 +++ .../rest/handler/RestAttributeExtractor.java | 73 ++++ .../transport/rest/handler/RestHandler.java | 15 +- 40 files changed, 1334 insertions(+), 71 deletions(-) create mode 100644 client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransportWrapper.java create mode 100644 extras/opentelemetry/pom.xml create mode 100644 extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/OpenTelemetryClientTransport.java create mode 100644 extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/OpenTelemetryClientTransportWrapper.java create mode 100644 extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/SpanInterceptor.java create mode 100644 extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/package-info.java create mode 100644 extras/opentelemetry/src/main/resources/META-INF/beans.xml create mode 100644 extras/opentelemetry/src/main/resources/META-INF/services/io.a2a.client.transport.spi.ClientTransportWrapper create mode 100644 server-common/src/main/java/io/a2a/server/interceptors/InvocationContext.java create mode 100644 server-common/src/main/java/io/a2a/server/interceptors/Kind.java create mode 100644 server-common/src/main/java/io/a2a/server/interceptors/NoAttributeExtractor.java create mode 100644 server-common/src/main/java/io/a2a/server/interceptors/Trace.java create mode 100644 server-common/src/main/java/io/a2a/server/requesthandlers/RequestHandlerAttributeExtractor.java create mode 100644 transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcAttributeExtractor.java create mode 100644 transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JsonRPCAttributeExtractor.java create mode 100644 transport/rest/src/main/java/io/a2a/transport/rest/handler/RestAttributeExtractor.java diff --git a/.gitignore b/.gitignore index dcc2f1cb8..e96770c17 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,9 @@ pom.xml.versionsBackup release.properties .flattened-pom.xml +#Claude +CLAUDE.md + # Eclipse .project .classpath @@ -20,6 +23,7 @@ bin/ # NetBeans nb-configuration.xml +nbactions.xml # Visual Studio Code .vscode diff --git a/boms/extras/pom.xml b/boms/extras/pom.xml index 78f962fe6..3a5fe1203 100644 --- a/boms/extras/pom.xml +++ b/boms/extras/pom.xml @@ -34,6 +34,11 @@ a2a-java-extras-common ${project.version} + + ${project.groupId} + a2a-java-sdk-opentelemetry + ${project.version} + ${project.groupId} a2a-java-extras-task-store-database-jpa diff --git a/boms/extras/src/it/extras-usage-test/pom.xml b/boms/extras/src/it/extras-usage-test/pom.xml index 5beeb50ea..356eccb58 100644 --- a/boms/extras/src/it/extras-usage-test/pom.xml +++ b/boms/extras/src/it/extras-usage-test/pom.xml @@ -44,6 +44,10 @@ io.github.a2asdk a2a-java-extras-common + + io.github.a2asdk + a2a-java-sdk-opentelemetry + io.github.a2asdk a2a-java-extras-task-store-database-jpa diff --git a/client/base/src/main/java/io/a2a/client/ClientBuilder.java b/client/base/src/main/java/io/a2a/client/ClientBuilder.java index 5765da204..28316990c 100644 --- a/client/base/src/main/java/io/a2a/client/ClientBuilder.java +++ b/client/base/src/main/java/io/a2a/client/ClientBuilder.java @@ -5,10 +5,15 @@ import io.a2a.client.transport.spi.ClientTransportConfig; import io.a2a.client.transport.spi.ClientTransportConfigBuilder; import io.a2a.client.transport.spi.ClientTransportProvider; +import io.a2a.client.transport.spi.ClientTransportWrapper; import io.a2a.spec.A2AClientException; import io.a2a.spec.AgentCard; import io.a2a.spec.AgentInterface; import io.a2a.spec.TransportProtocol; +import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; @@ -16,15 +21,16 @@ import java.util.List; import java.util.Map; import java.util.ServiceLoader; +import java.util.ServiceLoader.Provider; import java.util.function.BiConsumer; import java.util.function.Consumer; -import org.jspecify.annotations.NonNull; -import org.jspecify.annotations.Nullable; +import java.util.stream.Collectors; public class ClientBuilder { private static final Map>> transportProviderRegistry = new HashMap<>(); private static final Map, String> transportProtocolMapping = new HashMap<>(); + private static final Logger LOGGER = LoggerFactory.getLogger(ClientBuilder.class); static { ServiceLoader loader = ServiceLoader.load(ClientTransportProvider.class); @@ -37,7 +43,8 @@ public class ClientBuilder { private final AgentCard agentCard; private final List> consumers = new ArrayList<>(); - private @Nullable Consumer streamErrorHandler; + private @Nullable + Consumer streamErrorHandler; private ClientConfig clientConfig = new ClientConfig.Builder().build(); private final Map, ClientTransportConfig> clientTransports = new LinkedHashMap<>(); @@ -105,7 +112,7 @@ private ClientTransport buildClientTransport() throws A2AClientException { throw new A2AClientException("Missing required TransportConfig for " + agentInterface.transport()); } - return clientTransportProvider.create(clientTransportConfig, agentCard, agentInterface.url()); + return wrap(clientTransportProvider.create(clientTransportConfig, agentCard, agentInterface.url()), clientTransportConfig); } private Map getServerPreferredTransports() { @@ -160,10 +167,50 @@ private AgentInterface findBestClientTransport() throws A2AClientException { if (transportProtocol == null || transportUrl == null) { throw new A2AClientException("No compatible transport found"); } - if (! transportProviderRegistry.containsKey(transportProtocol)) { + if (!transportProviderRegistry.containsKey(transportProtocol)) { throw new A2AClientException("No client available for " + transportProtocol); } return new AgentInterface(transportProtocol, transportUrl); } + + /** + * Wraps the transport with all available transport wrappers discovered via ServiceLoader. + * Wrappers are applied in priority order (highest priority first). + * + * @param transport the base transport to wrap + * @param clientTransportConfig the transport configuration + * @return the wrapped transport (or original if no wrappers are available/applicable) + */ + private ClientTransport wrap(ClientTransport transport, ClientTransportConfig clientTransportConfig) { + ServiceLoader wrapperLoader = ServiceLoader.load(ClientTransportWrapper.class); + + // Collect all wrappers and sort by natural order (uses Comparable implementation) + List wrappers = wrapperLoader.stream().map(Provider::get) + .sorted() + .collect(Collectors.toList()); + + if (wrappers.isEmpty()) { + LOGGER.debug("No client transport wrappers found via ServiceLoader"); + return transport; + } + + // Apply wrappers in priority order + ClientTransport wrapped = transport; + for (ClientTransportWrapper wrapper : wrappers) { + try { + ClientTransport newWrapped = wrapper.wrap(wrapped, clientTransportConfig); + if (newWrapped != wrapped) { + LOGGER.debug("Applied transport wrapper: {} (priority: {})", + wrapper.getClass().getName(), wrapper.priority()); + } + wrapped = newWrapped; + } catch (Exception e) { + LOGGER.warn("Failed to apply transport wrapper {}: {}", + wrapper.getClass().getName(), e.getMessage(), e); + } + } + + return wrapped; + } } diff --git a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfig.java b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfig.java index d097b010f..70d12eb64 100644 --- a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfig.java +++ b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfig.java @@ -4,7 +4,7 @@ import io.a2a.client.transport.spi.ClientTransportConfig; import org.jspecify.annotations.Nullable; -public class RestTransportConfig extends ClientTransportConfig { +public class RestTransportConfig extends ClientTransportConfig { private final @Nullable A2AHttpClient httpClient; diff --git a/client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransportConfig.java b/client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransportConfig.java index 8efdb779b..ec634003a 100644 --- a/client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransportConfig.java +++ b/client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransportConfig.java @@ -2,8 +2,10 @@ import io.a2a.client.transport.spi.interceptors.ClientCallInterceptor; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Configuration for an A2A client transport. @@ -11,6 +13,7 @@ public abstract class ClientTransportConfig { protected List interceptors = new ArrayList<>(); + protected Map parameters = new HashMap<>(); public void setInterceptors(List interceptors) { this.interceptors = new ArrayList<>(interceptors); @@ -19,4 +22,12 @@ public void setInterceptors(List interceptors) { public List getInterceptors() { return interceptors; } + + public void setParameters(Map parameters) { + this.parameters = new HashMap<>(parameters); + } + + public Map getParameters() { + return parameters; + } } \ No newline at end of file diff --git a/client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransportWrapper.java b/client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransportWrapper.java new file mode 100644 index 000000000..25dba33b9 --- /dev/null +++ b/client/transport/spi/src/main/java/io/a2a/client/transport/spi/ClientTransportWrapper.java @@ -0,0 +1,81 @@ +package io.a2a.client.transport.spi; + +/** + * Service provider interface for wrapping client transports with additional functionality. + * Implementations can add cross-cutting concerns like tracing, metrics, logging, etc. + * + *

Wrappers are discovered via Java's ServiceLoader mechanism. To register a wrapper, + * create a file {@code META-INF/services/io.a2a.client.transport.spi.ClientTransportWrapper} + * containing the fully qualified class name of your implementation. + * + *

Wrappers are sorted by priority in descending order (highest priority first). + * This interface implements {@link Comparable} to enable natural sorting. + * + *

Example implementation: + *

{@code
+ * public class TracingWrapper implements ClientTransportWrapper {
+ *     @Override
+ *     public ClientTransport wrap(ClientTransport transport, ClientTransportConfig config) {
+ *         if (config.getParameters().containsKey("tracer")) {
+ *             return new TracingTransport(transport, (Tracer) config.getParameters().get("tracer"));
+ *         }
+ *         return transport;
+ *     }
+ *
+ *     @Override
+ *     public int priority() {
+ *         return 100; // Higher priority = wraps earlier (outermost)
+ *     }
+ * }
+ * }
+ */ +public interface ClientTransportWrapper extends Comparable { + + /** + * Wraps the given transport with additional functionality. + * + *

Implementations should check the configuration to determine if they should + * actually wrap the transport. If the wrapper is not applicable (e.g., required + * configuration is missing), return the original transport unchanged. + * + * @param transport the transport to wrap + * @param config the transport configuration, may contain wrapper-specific parameters + * @return the wrapped transport, or the original if wrapping is not applicable + */ + ClientTransport wrap(ClientTransport transport, ClientTransportConfig config); + + /** + * Returns the priority of this wrapper. Higher priority wrappers are applied first + * (wrap the transport earlier, resulting in being the outermost wrapper). + * + *

Default priority is 0. Suggested ranges: + *

    + *
  • 1000+ : Critical infrastructure (security, authentication) + *
  • 500-999: Observability (tracing, metrics, logging) + *
  • 100-499: Enhancement (caching, retry logic) + *
  • 0-99: Optional features + *
+ * + * @return the priority value, higher values = higher priority + */ + default int priority() { + return 0; + } + + /** + * Compares this wrapper with another based on priority. + * Returns a negative integer, zero, or a positive integer as this wrapper + * has higher priority than, equal to, or lower priority than the specified wrapper. + * + *

Note: This comparison is reversed (higher priority comes first) to enable + * natural sorting in descending priority order. + * + * @param other the wrapper to compare to + * @return negative if this has higher priority, positive if lower, zero if equal + */ + @Override + default int compareTo(ClientTransportWrapper other) { + // Reverse comparison: higher priority should come first + return Integer.compare(other.priority(), this.priority()); + } +} diff --git a/examples/helloworld/client/README.md b/examples/helloworld/client/README.md index ac01c890f..7f484607c 100644 --- a/examples/helloworld/client/README.md +++ b/examples/helloworld/client/README.md @@ -41,9 +41,9 @@ The Python A2A server is part of the [a2a-samples](https://github.com/google-a2a The server will start running on `http://localhost:9999`. -## Run the Java A2A Client with JBang +## Run the Java A2A Client -The Java client can be run using JBang, which allows you to run Java source files directly without any manual compilation. +The Java client can be run using either Maven or JBang. ### Build the A2A Java SDK @@ -54,9 +54,23 @@ cd /path/to/a2a-java mvn clean install ``` -### Using the JBang script +### Option 1: Using Maven (Recommended) -A JBang script is provided in the example directory to make running the client easy: +Run the client using Maven's exec plugin: + +```bash +cd examples/helloworld/client +mvn exec:java +``` + +To enable OpenTelemetry with Maven: +```bash +mvn exec:java -Dopentelemetry=true +``` + +### Option 2: Using JBang + +A JBang script is provided for running the client without Maven: 1. Make sure you have JBang installed. If not, follow the [JBang installation guide](https://www.jbang.dev/documentation/guide/latest/installation.html). @@ -70,20 +84,99 @@ A JBang script is provided in the example directory to make running the client e jbang HelloWorldRunner.java ``` -This script automatically handles the dependencies and sources for you. +To enable OpenTelemetry with JBang: +```bash +jbang -Dopentelemetry=true HelloWorldRunner.java +``` ## What the Example Does The Java client (`HelloWorldClient.java`) performs the following actions: 1. Fetches the server's public agent card -2. Fetches the server's extended agent card +2. Fetches the server's extended agent card 3. Creates a client using the extended agent card that connects to the Python server at `http://localhost:9999`. 4. Sends a regular message asking "how much is 10 USD in INR?". 5. Prints the server's response. 6. Sends the same message as a streaming request. 7. Prints each chunk of the server's streaming response as it arrives. +## Enable OpenTelemetry (Optional) + +The client includes support for distributed tracing with OpenTelemetry. To enable it: + +### Prerequisites + +**IMPORTANT**: The client expects an OpenTelemetry collector to be ready and accepting traces. You have two options: + +#### Option 1: Use the Java Server Example (Recommended) + +Instead of the Python server, use the Java server example which has built-in OpenTelemetry support: + +1. **Start the Java server with OpenTelemetry enabled**: + ```bash + cd examples/helloworld/server + mvn quarkus:dev -Popentelemetry + ``` + This will: + - Start the server at `http://localhost:9999` + - Launch Grafana at `http://localhost:3001` + - Start OTLP collectors on ports 5317 (gRPC) and 5318 (HTTP) + +2. **Run the client with OpenTelemetry**: + + Using Maven (from `examples/helloworld/client`): + ```bash + mvn exec:java -Dopentelemetry=true + ``` + + Or using JBang (from `examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/`): + ```bash + jbang -Dopentelemetry=true HelloWorldRunner.java + ``` + +3. **View traces in Grafana**: + - Open `http://localhost:3001` (credentials: admin/admin) + - Go to "Explore" → select "Tempo" data source + - View distributed traces showing the full request flow from client to server + +#### Option 2: Use External OpenTelemetry Collector + +If you want to use the Python server with OpenTelemetry: + +1. **Start an OpenTelemetry collector** on port 5317 (e.g., using Docker): + ```bash + docker run -p 5317:4317 otel/opentelemetry-collector + ``` + +2. **Run the Python server** as described above + +3. **Run the client with OpenTelemetry**: + ```bash + jbang -Dopentelemetry=true HelloWorldRunner.java + ``` + +### What Gets Traced + +When OpenTelemetry is enabled, the client traces: +- Agent card fetching (public and extended) +- Message sending (blocking and streaming) +- Task operations (get, cancel, list) +- Push notification configuration operations +- Connection and transport layer operations + +Client traces are automatically linked with server traces (when using the Java server), providing end-to-end visibility of the entire A2A protocol flow. + +### Configuration + +The client is configured to send traces to `http://localhost:5317` (OTLP gRPC endpoint). To use a different endpoint, modify the `initOpenTelemetry()` method in `HelloWorldClient.java`: + +```java +OtlpGrpcSpanExporter.builder() + .setEndpoint("http://your-collector:4317") + .build() +``` + ## Notes - Make sure the Python server is running before starting the Java client. diff --git a/examples/helloworld/client/pom.xml b/examples/helloworld/client/pom.xml index 8f5b63406..408d3b3a8 100644 --- a/examples/helloworld/client/pom.xml +++ b/examples/helloworld/client/pom.xml @@ -12,7 +12,7 @@ a2a-java-sdk-examples-client - Java SDK A2A Examples + Java SDK A2A Examples - HelloWorld Client Examples for the Java SDK for the Agent2Agent Protocol (A2A) @@ -20,6 +20,22 @@ io.github.a2asdk a2a-java-sdk-client + + io.opentelemetry + opentelemetry-sdk + + + io.opentelemetry + opentelemetry-exporter-otlp + + + io.github.a2asdk + a2a-java-sdk-opentelemetry + + + io.opentelemetry + opentelemetry-exporter-logging + @@ -34,6 +50,20 @@ org.apache.maven.plugins maven-surefire-plugin + + org.codehaus.mojo + exec-maven-plugin + 3.6.2 + + io.a2a.examples.helloworld.HelloWorldClient + + + opentelemetry + ${opentelemetry} + + + + \ No newline at end of file diff --git a/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldClient.java b/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldClient.java index 8192346cd..c344de4c8 100644 --- a/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldClient.java +++ b/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldClient.java @@ -21,6 +21,11 @@ import io.a2a.spec.Message; import io.a2a.spec.Part; import io.a2a.spec.TextPart; +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; /** * A simple example of using the A2A Java SDK to communicate with an A2A server. @@ -33,6 +38,7 @@ public class HelloWorldClient { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static void main(String[] args) { + OpenTelemetrySdk openTelemetrySdk = null; try { AgentCard finalAgentCard = null; AgentCard publicAgentCard = new A2ACardResolver("http://localhost:9999").getAgentCard(); @@ -82,24 +88,36 @@ public static void main(String[] args) { messageResponse.completeExceptionally(error); }; + JSONRPCTransportConfig transportConfig = new JSONRPCTransportConfig(); + if (Boolean.getBoolean("opentelemetry")) { + openTelemetrySdk = initOpenTelemetry(); + transportConfig.setParameters(Map.of("io.a2a.extras.opentelemetry.Tracer", + openTelemetrySdk.getTracer("helloworld-client"))); + } Client client = Client .builder(finalAgentCard) .addConsumers(consumers) .streamingErrorHandler(streamingErrorHandler) - .withTransport(JSONRPCTransport.class, new JSONRPCTransportConfig()) + .withTransport(JSONRPCTransport.class, transportConfig) .build(); Message message = A2A.toUserMessage(MESSAGE_TEXT); // the message ID will be automatically generated for you - - System.out.println("Sending message: " + MESSAGE_TEXT); - client.sendMessage(message); - System.out.println("Message sent successfully. Responses will be handled by the configured consumers."); - try { + System.out.println("Sending message: " + MESSAGE_TEXT); + client.sendMessage(message); + System.out.println("Message sent successfully. Responses will be handled by the configured consumers."); + String responseText = messageResponse.get(); System.out.println("Response: " + responseText); } catch (Exception e) { System.err.println("Failed to get response: " + e.getMessage()); + } finally { + // Ensure OpenTelemetry SDK is properly shut down to export all pending spans + if (openTelemetrySdk != null) { + System.out.println("Shutting down OpenTelemetry SDK..."); + openTelemetrySdk.close(); + System.out.println("OpenTelemetry SDK shutdown complete."); + } } } catch (Exception e) { System.err.println("An error occurred: " + e.getMessage()); @@ -107,4 +125,21 @@ public static void main(String[] args) { } } -} \ No newline at end of file + static OpenTelemetrySdk initOpenTelemetry() { + SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(BatchSpanProcessor.builder( + OtlpGrpcSpanExporter.builder() + .setEndpoint("http://localhost:5317") + .build() + ).build()) + .setResource(Resource.getDefault().toBuilder() + .put("service.version", "1.0") + .put("service.name", "helloworld-client") + .build()) + .build(); + + return OpenTelemetrySdk.builder() + .setTracerProvider(sdkTracerProvider) + .build(); + } +} diff --git a/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldRunner.java b/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldRunner.java index 5a3e46a4f..4a3ac5551 100644 --- a/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldRunner.java +++ b/examples/helloworld/client/src/main/java/io/a2a/examples/helloworld/HelloWorldRunner.java @@ -1,6 +1,10 @@ ///usr/bin/env jbang "$0" "$@" ; exit $? //DEPS io.github.a2asdk:a2a-java-sdk-client:0.4.0.Alpha1-SNAPSHOT //DEPS io.github.a2asdk:a2a-java-sdk-client-transport-jsonrpc:0.4.0.Alpha1-SNAPSHOT +//DEPS io.opentelemetry:opentelemetry-sdk:1.44.1 +//DEPS io.opentelemetry:opentelemetry-exporter-otlp:1.44.1 +//DEPS io.github.a2asdk:a2a-java-sdk-opentelemetry:0.4.0.Alpha1-SNAPSHOT +//DEPS io.opentelemetry:opentelemetry-exporter-logging:1.44.1 //SOURCES HelloWorldClient.java /** diff --git a/examples/helloworld/pom.xml b/examples/helloworld/pom.xml index d5ebb596d..f81acabb6 100644 --- a/examples/helloworld/pom.xml +++ b/examples/helloworld/pom.xml @@ -34,25 +34,6 @@ - - - - io.quarkus - quarkus-maven-plugin - true - - - - build - generate-code - generate-code-tests - - - - - - - client server diff --git a/examples/helloworld/server/README.md b/examples/helloworld/server/README.md index 5573dce09..017db4d69 100644 --- a/examples/helloworld/server/README.md +++ b/examples/helloworld/server/README.md @@ -61,6 +61,36 @@ The Python A2A client (`test_client.py`) performs the following actions: 6. Sends the same message as a streaming request. 7. Prints each chunk of the server's streaming response as it arrives. +## Enable OpenTelemetry (Optional) + +The server includes support for distributed tracing with OpenTelemetry. To enable it: + +1. **Run with the OpenTelemetry profile**: + ```bash + mvn quarkus:dev -Popentelemetry + ``` + +2. **Access Grafana dashboard**: + - Quarkus Dev Services will automatically start a Grafana observability stack + - Open Grafana at `http://localhost:3001` (default credentials: admin/admin) + - View traces in the "Explore" section using the Tempo data source + +3. **What gets traced**: + - All A2A protocol operations (send message, get task, cancel task, etc.) + - Streaming message responses + - Task lifecycle events + - Custom operations in your `AgentExecutor` implementation (using `@Trace` annotation) + +4. **Configuration**: + - OpenTelemetry settings are in `application.properties` + - OTLP exporters run on ports 5317 (gRPC) and 5318 (HTTP) + - To use a custom OTLP endpoint, uncomment and modify: + ```properties + quarkus.otel.exporter.otlp.endpoint=http://localhost:4317 + ``` + +For more information, see the [OpenTelemetry extras module documentation](../../../extras/opentelemetry/README.md). + ## Notes - Make sure the Java server is running before starting the Python client. diff --git a/examples/helloworld/server/pom.xml b/examples/helloworld/server/pom.xml index 5c660336f..e6ed634ab 100644 --- a/examples/helloworld/server/pom.xml +++ b/examples/helloworld/server/pom.xml @@ -12,7 +12,7 @@ a2a-java-sdk-examples-server - Java SDK A2A Examples + Java SDK A2A Examples - HelloWorld Server Examples for the Java SDK for the Agent2Agent Protocol (A2A) @@ -55,7 +55,31 @@ + + --add-opens=java.base/java.lang=ALL-UNNAMED + + + + + opentelemetry + + + io.github.a2asdk + a2a-java-sdk-opentelemetry + + + io.quarkus + quarkus-opentelemetry + + + io.quarkus + quarkus-observability-devservices-lgtm + provided + + + + \ No newline at end of file diff --git a/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentCardProducer.java b/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentCardProducer.java index 9e34aef0a..e24cfaa81 100644 --- a/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentCardProducer.java +++ b/examples/helloworld/server/src/main/java/io/a2a/examples/helloworld/AgentCardProducer.java @@ -7,6 +7,7 @@ import jakarta.enterprise.inject.Produces; import io.a2a.server.PublicAgentCard; +import io.a2a.server.interceptors.Trace; import io.a2a.spec.AgentCapabilities; import io.a2a.spec.AgentCard; import io.a2a.spec.AgentSkill; @@ -14,6 +15,7 @@ @ApplicationScoped public class AgentCardProducer { + @Trace @Produces @PublicAgentCard public AgentCard agentCard() { diff --git a/examples/helloworld/server/src/main/resources/application.properties b/examples/helloworld/server/src/main/resources/application.properties index a2452b339..79aa4eb8c 100644 --- a/examples/helloworld/server/src/main/resources/application.properties +++ b/examples/helloworld/server/src/main/resources/application.properties @@ -1 +1,9 @@ -%dev.quarkus.http.port=9999 \ No newline at end of file +%dev.quarkus.http.port=9999 + +# OpenTelemetry configuration +quarkus.otel.sdk.disabled=false +quarkus.observability.lgtm.grafana-port=3001 +quarkus.observability.lgtm.otel-grpc-port=5317 +quarkus.observability.lgtm.otel-http-port=5318 +#quarkus.otel.exporter.otlp.endpoint=http://localhost:4317 +#quarkus.log.console.format=%d{HH:mm:ss} %-5p traceId=%X{traceId}, parentId=%X{parentId}, spanId=%X{spanId}, sampled=%X{sampled} [%c{2.}] (%t) %s%e%n \ No newline at end of file diff --git a/extras/opentelemetry/pom.xml b/extras/opentelemetry/pom.xml new file mode 100644 index 000000000..ae19564e7 --- /dev/null +++ b/extras/opentelemetry/pom.xml @@ -0,0 +1,52 @@ + + + 4.0.0 + + + io.github.a2asdk + a2a-java-sdk-parent + 0.4.0.Alpha1-SNAPSHOT + ../../pom.xml + + + a2a-java-sdk-opentelemetry + + A2A Java SDK :: Extras :: Opentelemetry + Java SDK for the Agent2Agent Protocol (A2A) - Extras - Opentelemetry + + 2.0.1 + + + + + ${project.groupId} + a2a-java-sdk-server-common + + + jakarta.enterprise + jakarta.enterprise.cdi-api + + + jakarta.inject + jakarta.inject-api + + + org.eclipse.microprofile.telemetry + microprofile-telemetry-api + ${version.org.eclipse.microprofile.telemetry} + pom + provided + + + org.slf4j + slf4j-api + + + ${project.groupId} + a2a-java-sdk-client-transport-spi + + + + \ No newline at end of file diff --git a/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/OpenTelemetryClientTransport.java b/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/OpenTelemetryClientTransport.java new file mode 100644 index 000000000..838910624 --- /dev/null +++ b/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/OpenTelemetryClientTransport.java @@ -0,0 +1,327 @@ +package io.a2a.extras.opentelemetry; + +import io.a2a.client.transport.spi.ClientTransport; +import io.a2a.client.transport.spi.interceptors.ClientCallContext; +import io.a2a.spec.A2AClientException; +import io.a2a.spec.AgentCard; +import io.a2a.spec.DeleteTaskPushNotificationConfigParams; +import io.a2a.spec.EventKind; +import io.a2a.spec.GetTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTasksParams; +import io.a2a.spec.ListTasksResult; +import io.a2a.spec.MessageSendParams; +import io.a2a.spec.StreamingEventKind; +import io.a2a.spec.Task; +import io.a2a.spec.TaskIdParams; +import io.a2a.spec.TaskPushNotificationConfig; +import io.a2a.spec.TaskQueryParams; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.jspecify.annotations.Nullable; + +public class OpenTelemetryClientTransport implements ClientTransport { + + private final Tracer tracer; + private final ClientTransport delegate; + + public OpenTelemetryClientTransport(ClientTransport delegate, Tracer tracer) { + this.delegate = delegate; + this.tracer = tracer; + } + + @Override + public EventKind sendMessage(MessageSendParams request, @Nullable ClientCallContext context) throws A2AClientException { + SpanBuilder spanBuilder = tracer.spanBuilder("sendMessage") + .setSpanKind(SpanKind.CLIENT); + spanBuilder.setAttribute("request", request.toString()); + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + EventKind ret = delegate.sendMessage(request, context); + if (ret != null) { + span.setAttribute("response", ret.toString()); + span.setStatus(StatusCode.OK); + } + return ret; + } catch (Exception ex) { + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + + @Override + public void sendMessageStreaming(MessageSendParams request, Consumer eventConsumer, Consumer errorConsumer, @Nullable ClientCallContext context) throws A2AClientException { + SpanBuilder spanBuilder = tracer.spanBuilder("sendMessageStreaming") + .setSpanKind(SpanKind.CLIENT); + spanBuilder.setAttribute("request", request.toString()); + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + delegate.sendMessageStreaming(request, new OpenTelemetryEventConsumer("sendMessageStreaming-event", eventConsumer, tracer, span.getSpanContext()), + new OpenTelemetryErrorConsumer("sendMessageStreaming-error", errorConsumer, tracer, span.getSpanContext()), context); + span.setStatus(StatusCode.OK); + } catch (Exception ex) { + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + + @Override + public Task getTask(TaskQueryParams request, @Nullable ClientCallContext context) throws A2AClientException { + SpanBuilder spanBuilder = tracer.spanBuilder("getTask") + .setSpanKind(SpanKind.CLIENT); + spanBuilder.setAttribute("request", request.toString()); + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + Task ret = delegate.getTask(request, context); + if (ret != null) { + span.setAttribute("response", ret.toString()); + span.setStatus(StatusCode.OK); + } + return ret; + } catch (Exception ex) { + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + + @Override + public Task cancelTask(TaskIdParams request, @Nullable ClientCallContext context) throws A2AClientException { + SpanBuilder spanBuilder = tracer.spanBuilder("cancelTask") + .setSpanKind(SpanKind.CLIENT); + spanBuilder.setAttribute("request", request.toString()); + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + Task ret = delegate.cancelTask(request, context); + if (ret != null) { + span.setAttribute("response", ret.toString()); + span.setStatus(StatusCode.OK); + } + return ret; + } catch (Exception ex) { + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + + @Override + public ListTasksResult listTasks(ListTasksParams request, @Nullable ClientCallContext context) throws A2AClientException { + SpanBuilder spanBuilder = tracer.spanBuilder("listTasks") + .setSpanKind(SpanKind.CLIENT); + spanBuilder.setAttribute("request", request.toString()); + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + ListTasksResult ret = delegate.listTasks(request, context); + if (ret != null) { + span.setAttribute("response", ret.toString()); + span.setStatus(StatusCode.OK); + } + return ret; + } catch (Exception ex) { + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + + @Override + public TaskPushNotificationConfig setTaskPushNotificationConfiguration(TaskPushNotificationConfig request, @Nullable ClientCallContext context) throws A2AClientException { + SpanBuilder spanBuilder = tracer.spanBuilder("setTaskPushNotificationConfiguration") + .setSpanKind(SpanKind.CLIENT); + spanBuilder.setAttribute("request", request.toString()); + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + TaskPushNotificationConfig ret = delegate.setTaskPushNotificationConfiguration(request, context); + if (ret != null) { + span.setAttribute("response", ret.toString()); + span.setStatus(StatusCode.OK); + } + return ret; + } catch (Exception ex) { + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + + @Override + public TaskPushNotificationConfig getTaskPushNotificationConfiguration(GetTaskPushNotificationConfigParams request, @Nullable ClientCallContext context) throws A2AClientException { + SpanBuilder spanBuilder = tracer.spanBuilder("getTaskPushNotificationConfiguration") + .setSpanKind(SpanKind.CLIENT); + spanBuilder.setAttribute("request", request.toString()); + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + TaskPushNotificationConfig ret = delegate.getTaskPushNotificationConfiguration(request, context); + if (ret != null) { + span.setAttribute("response", ret.toString()); + span.setStatus(StatusCode.OK); + } + return ret; + } catch (Exception ex) { + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + + @Override + public List listTaskPushNotificationConfigurations(ListTaskPushNotificationConfigParams request, @Nullable ClientCallContext context) throws A2AClientException { + SpanBuilder spanBuilder = tracer.spanBuilder("listTaskPushNotificationConfigurations") + .setSpanKind(SpanKind.CLIENT); + spanBuilder.setAttribute("request", request.toString()); + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + List ret = delegate.listTaskPushNotificationConfigurations(request, context); + if (ret != null) { + span.setAttribute("response", ret.stream().map(TaskPushNotificationConfig::toString).collect(Collectors.joining(","))); + span.setStatus(StatusCode.OK); + } + return ret; + } catch (Exception ex) { + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + + @Override + public void deleteTaskPushNotificationConfigurations(DeleteTaskPushNotificationConfigParams request, @Nullable ClientCallContext context) throws A2AClientException { + SpanBuilder spanBuilder = tracer.spanBuilder("deleteTaskPushNotificationConfigurations") + .setSpanKind(SpanKind.CLIENT); + spanBuilder.setAttribute("request", request.toString()); + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + delegate.deleteTaskPushNotificationConfigurations(request, context); + span.setStatus(StatusCode.OK); + } catch (Exception ex) { + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + + @Override + public void resubscribe(TaskIdParams request, Consumer eventConsumer, Consumer errorConsumer, @Nullable ClientCallContext context) throws A2AClientException { + SpanBuilder spanBuilder = tracer.spanBuilder("resubscribe") + .setSpanKind(SpanKind.CLIENT); + spanBuilder.setAttribute("request", request.toString()); + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + delegate.resubscribe(request, new OpenTelemetryEventConsumer("resubscribe-event", eventConsumer, tracer, span.getSpanContext()), + new OpenTelemetryErrorConsumer("resubscribe-error", errorConsumer, tracer, span.getSpanContext()), context); + span.setStatus(StatusCode.OK); + } catch (Exception ex) { + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + + @Override + public AgentCard getAgentCard(@Nullable ClientCallContext context) throws A2AClientException { + SpanBuilder spanBuilder = tracer.spanBuilder("getAgentCard") + .setSpanKind(SpanKind.CLIENT); + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + AgentCard ret = delegate.getAgentCard(context); + if (ret != null) { + span.setAttribute("response", ret.toString()); + span.setStatus(StatusCode.OK); + } + return ret; + } catch (Exception ex) { + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + + @Override + public void close() { + delegate.close(); + } + + private static class OpenTelemetryEventConsumer implements Consumer { + + private final Consumer delegate; + private final Tracer tracer; + private final SpanContext context; + private final String name; + + public OpenTelemetryEventConsumer(String name, Consumer delegate, Tracer tracer, SpanContext context) { + this.delegate = delegate; + this.tracer = tracer; + this.context = context; + this.name = name; + } + + @Override + public void accept(StreamingEventKind t) { + SpanBuilder spanBuilder = tracer.spanBuilder(name) + .setSpanKind(SpanKind.CLIENT); + spanBuilder.setAttribute("gen_ai.agent.a2a.streaming-event", t.toString()); + spanBuilder.addLink(context); + Span span = spanBuilder.startSpan(); + try { + delegate.accept(t); + span.setStatus(StatusCode.OK); + } finally { + span.end(); + } + } + } + + private static class OpenTelemetryErrorConsumer implements Consumer { + + private final Consumer delegate; + private final Tracer tracer; + private final SpanContext context; + private final String name; + + public OpenTelemetryErrorConsumer(String name, Consumer delegate, Tracer tracer, SpanContext context) { + this.delegate = delegate; + this.tracer = tracer; + this.context = context; + this.name = name; + } + + @Override + public void accept(Throwable t) { + if (t == null) { + return; + } + SpanBuilder spanBuilder = tracer.spanBuilder(name) + .setSpanKind(SpanKind.CLIENT); + spanBuilder.addLink(context); + Span span = spanBuilder.startSpan(); + try { + span.setStatus(StatusCode.ERROR, t.getMessage()); + delegate.accept(t); + } finally { + span.end(); + } + } + } +} diff --git a/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/OpenTelemetryClientTransportWrapper.java b/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/OpenTelemetryClientTransportWrapper.java new file mode 100644 index 000000000..be5294299 --- /dev/null +++ b/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/OpenTelemetryClientTransportWrapper.java @@ -0,0 +1,45 @@ +package io.a2a.extras.opentelemetry; + +import io.a2a.client.transport.spi.ClientTransport; +import io.a2a.client.transport.spi.ClientTransportConfig; +import io.a2a.client.transport.spi.ClientTransportWrapper; +import io.opentelemetry.api.trace.Tracer; + +/** + * OpenTelemetry client transport wrapper that adds distributed tracing to A2A client calls. + * + *

This wrapper is automatically discovered via Java's ServiceLoader mechanism. + * To enable tracing, add a {@link Tracer} instance to the transport configuration: + *

{@code
+ * ClientTransportConfig config = new JSONRPCTransportConfig();
+ * config.setParameters(Map.of(
+ *     OpenTelemetryClientTransportFactory.OTEL_TRACER_KEY,
+ *     openTelemetry.getTracer("my-service")
+ * ));
+ * }
+ */ +public class OpenTelemetryClientTransportWrapper implements ClientTransportWrapper { + + /** + * Configuration key for the OpenTelemetry Tracer instance. + * Value must be of type {@link Tracer}. + */ + public static final String OTEL_TRACER_KEY = "io.a2a.extras.opentelemetry.Tracer"; + + @Override + public ClientTransport wrap(ClientTransport transport, ClientTransportConfig config) { + Object tracerObj = config.getParameters().get(OTEL_TRACER_KEY); + if (tracerObj instanceof Tracer tracer) { + return new OpenTelemetryClientTransport(transport, tracer); + } + // No tracer configured, return unwrapped transport + return transport; + } + + @Override + public int priority() { + // Observability/tracing should be in the middle priority range + // so it can observe other wrappers but doesn't interfere with security + return 500; + } +} diff --git a/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/SpanInterceptor.java b/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/SpanInterceptor.java new file mode 100644 index 000000000..bd158f301 --- /dev/null +++ b/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/SpanInterceptor.java @@ -0,0 +1,93 @@ +package io.a2a.extras.opentelemetry; + +import io.a2a.server.interceptors.Kind; +import io.a2a.server.interceptors.NoAttributeExtractor; +import io.a2a.server.interceptors.Trace; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import jakarta.annotation.Priority; +import jakarta.inject.Inject; +import jakarta.interceptor.AroundInvoke; +import jakarta.interceptor.Interceptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * Jakarta EE CDI interceptor for @Trace annotation. + * Integrates with OpenTelemetry to create spans for traced methods. + */ +@Trace() +@Interceptor +@Priority(Interceptor.Priority.APPLICATION) +public class SpanInterceptor { + + private static final Logger LOGGER = LoggerFactory.getLogger(SpanInterceptor.class); + + @Inject + private Tracer tracer; + + @AroundInvoke + public Object trace(jakarta.interceptor.InvocationContext jakartaContext) throws Exception { + // Convert Jakarta InvocationContext to our custom InvocationContext + io.a2a.server.interceptors.InvocationContext customContext + = new io.a2a.server.interceptors.InvocationContext( + jakartaContext.getTarget(), + jakartaContext.getMethod(), + jakartaContext.getParameters() + ); + + Kind kind = jakartaContext + .getMethod() + .getAnnotation(Trace.class) + .kind(); + Class>>> extractorClass + = jakartaContext.getMethod() + .getAnnotation(Trace.class) + .extractor(); + + String name = jakartaContext.getTarget().getClass().getName(); + if (name != null && name.endsWith("_Subclass")) { + name = name.substring(0, name.length() - "_Subclass".length()); + } + name = name + '#' + jakartaContext.getMethod().getName(); + SpanBuilder spanBuilder = tracer.spanBuilder(name) + .setSpanKind(SpanKind.valueOf(kind.toString())); + + if (extractorClass != null && !extractorClass.equals(NoAttributeExtractor.class)) { + try { + Supplier>> supplier + = extractorClass.getDeclaredConstructor().newInstance(); + Map attributes = supplier.get().apply(customContext); + for (Map.Entry attribute : attributes.entrySet()) { + spanBuilder.setAttribute(attribute.getKey(), attribute.getValue()); + } + } catch (Exception e) { + LOGGER.warn("Failed to instantiate attribute extractor {}: {}", + extractorClass.getName(), e.getMessage(), e); + } + } + + Span span = spanBuilder.startSpan(); + try (Scope scope = span.makeCurrent()) { + Object ret = jakartaContext.proceed(); + span.setStatus(StatusCode.OK); + if (ret != null) { + span.setAttribute("gen_ai.agent.a2a.response", ret.toString()); + } + return ret; + } catch (Exception ex) { + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } +} diff --git a/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/package-info.java b/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/package-info.java new file mode 100644 index 000000000..fdd7057d5 --- /dev/null +++ b/extras/opentelemetry/src/main/java/io/a2a/extras/opentelemetry/package-info.java @@ -0,0 +1,5 @@ +@NullMarked +package io.a2a.extras.opentelemetry; + +import org.jspecify.annotations.NullMarked; + diff --git a/extras/opentelemetry/src/main/resources/META-INF/beans.xml b/extras/opentelemetry/src/main/resources/META-INF/beans.xml new file mode 100644 index 000000000..9b2940fc2 --- /dev/null +++ b/extras/opentelemetry/src/main/resources/META-INF/beans.xml @@ -0,0 +1,6 @@ + + + \ No newline at end of file diff --git a/extras/opentelemetry/src/main/resources/META-INF/services/io.a2a.client.transport.spi.ClientTransportWrapper b/extras/opentelemetry/src/main/resources/META-INF/services/io.a2a.client.transport.spi.ClientTransportWrapper new file mode 100644 index 000000000..f5312bf6b --- /dev/null +++ b/extras/opentelemetry/src/main/resources/META-INF/services/io.a2a.client.transport.spi.ClientTransportWrapper @@ -0,0 +1 @@ +io.a2a.extras.opentelemetry.OpenTelemetryClientTransportWrapper diff --git a/pom.xml b/pom.xml index 5442bd2ac..5130caf34 100644 --- a/pom.xml +++ b/pom.xml @@ -172,6 +172,16 @@ a2a-java-sdk-reference-rest ${project.version}
+ + io.github.a2asdk + a2a-java-sdk-opentelemetry + ${project.version} + + + io.github.a2asdk + a2a-java-sdk-opentelemetry-spring + ${project.version} + io.grpc grpc-bom @@ -452,6 +462,7 @@ examples/helloworld examples/cloud-deployment/server extras/common + extras/opentelemetry extras/task-store-database-jpa extras/push-notification-config-store-database-jpa extras/queue-manager-replicated diff --git a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/A2AExtensionsInterceptor.java b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/A2AExtensionsInterceptor.java index 9f0559cdb..1c5e804fe 100644 --- a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/A2AExtensionsInterceptor.java +++ b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/A2AExtensionsInterceptor.java @@ -13,7 +13,7 @@ /** * gRPC server interceptor that captures request metadata and context information, * providing equivalent functionality to Python's grpc.aio.ServicerContext. - * + * * This interceptor: * - Extracts A2A extension headers from incoming requests * - Captures ServerCall and Metadata for rich context access @@ -23,7 +23,6 @@ @ApplicationScoped public class A2AExtensionsInterceptor implements ServerInterceptor { - @Override public ServerCall.Listener interceptCall( ServerCall serverCall, @@ -31,18 +30,20 @@ public ServerCall.Listener interceptCall( ServerCallHandler serverCallHandler) { // Extract A2A extensions header - Metadata.Key extensionsKey = - Metadata.Key.of(A2AHeaders.X_A2A_EXTENSIONS, Metadata.ASCII_STRING_MARSHALLER); + Metadata.Key extensionsKey + = Metadata.Key.of(A2AHeaders.X_A2A_EXTENSIONS, Metadata.ASCII_STRING_MARSHALLER); String extensions = metadata.get(extensionsKey); // Create enhanced context with rich information (equivalent to Python's ServicerContext) Context context = Context.current() - // Store complete metadata for full header access - .withValue(GrpcContextKeys.METADATA_KEY, metadata) - // Store method name (equivalent to Python's context.method()) - .withValue(GrpcContextKeys.METHOD_NAME_KEY, serverCall.getMethodDescriptor().getFullMethodName()) - // Store peer information for client connection details - .withValue(GrpcContextKeys.PEER_INFO_KEY, getPeerInfo(serverCall)); + // Store complete metadata for full header access + .withValue(GrpcContextKeys.METADATA_KEY, metadata) + // Store Grpc method name + .withValue(GrpcContextKeys.GRPC_METHOD_NAME_KEY, serverCall.getMethodDescriptor().getFullMethodName()) + // Store method name (equivalent to Python's context.method()) + .withValue(GrpcContextKeys.METHOD_NAME_KEY, GrpcContextKeys.METHOD_MAPPING.get(serverCall.getMethodDescriptor().getBareMethodName())) + // Store peer information for client connection details + .withValue(GrpcContextKeys.PEER_INFO_KEY, getPeerInfo(serverCall)); // Store A2A extensions if present if (extensions != null) { @@ -55,7 +56,7 @@ public ServerCall.Listener interceptCall( /** * Safely extracts peer information from the ServerCall. - * + * * @param serverCall the gRPC ServerCall * @return peer information string, or "unknown" if not available */ diff --git a/server-common/src/main/java/io/a2a/server/interceptors/InvocationContext.java b/server-common/src/main/java/io/a2a/server/interceptors/InvocationContext.java new file mode 100644 index 000000000..d52bd6619 --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/interceptors/InvocationContext.java @@ -0,0 +1,39 @@ +package io.a2a.server.interceptors; + +import java.lang.reflect.Method; + +public record InvocationContext(Object target, Method method, Object[] parameters) { + + public static InvocationContext create(Object target, Object[] parameters) { + StackWalker walker = StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE); + + Method callingMethod = walker.walk(frames + -> frames.skip(1) // Skip the create method itself + .findFirst() + .map(frame -> { + try { + Class declaringClass = frame.getDeclaringClass(); + String methodName = frame.getMethodName(); + // Find the method by name and parameter count + for (Method m : declaringClass.getDeclaredMethods()) { + if (m.getName().equals(methodName) && m.getParameterCount() == parameters.length) { + return m; + } + } + return null; + } catch (Exception e) { + return null; + } + }) + .orElse(null) + ); + return new InvocationContext(target, callingMethod, parameters); + } + + public Object proceed() throws Exception { + if (method != null) { + return method.invoke(target, parameters); + } + return null; + } +} diff --git a/server-common/src/main/java/io/a2a/server/interceptors/Kind.java b/server-common/src/main/java/io/a2a/server/interceptors/Kind.java new file mode 100644 index 000000000..51e093908 --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/interceptors/Kind.java @@ -0,0 +1,5 @@ +package io.a2a.server.interceptors; + +public enum Kind { + INTERNAL, SERVER, CLIENT, PRODUCER, CONSUMER; +} diff --git a/server-common/src/main/java/io/a2a/server/interceptors/NoAttributeExtractor.java b/server-common/src/main/java/io/a2a/server/interceptors/NoAttributeExtractor.java new file mode 100644 index 000000000..8e027b46b --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/interceptors/NoAttributeExtractor.java @@ -0,0 +1,14 @@ +package io.a2a.server.interceptors; + +import java.util.Collections; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; + +public class NoAttributeExtractor implements Supplier>> { + + @Override + public Function> get() { + return ctx -> Collections.emptyMap(); + } +} diff --git a/server-common/src/main/java/io/a2a/server/interceptors/Trace.java b/server-common/src/main/java/io/a2a/server/interceptors/Trace.java new file mode 100644 index 000000000..7cdbad288 --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/interceptors/Trace.java @@ -0,0 +1,27 @@ +package io.a2a.server.interceptors; + +import jakarta.enterprise.util.Nonbinding; +import jakarta.interceptor.InterceptorBinding; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * Framework-agnostic annotation for method tracing. + * Works with both Jakarta EE CDI interceptors and Spring AOP. + */ +@InterceptorBinding +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE, ElementType.METHOD}) +@Inherited +public @interface Trace { + @Nonbinding + Kind kind() default Kind.SERVER; + @Nonbinding + Class>>> extractor() default NoAttributeExtractor.class; +} diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index b59a9aedb..ba6665509 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -1,5 +1,6 @@ package io.a2a.server.requesthandlers; +import static io.a2a.server.interceptors.Kind.SERVER; import static io.a2a.server.util.async.AsyncUtils.convertingProcessor; import static io.a2a.server.util.async.AsyncUtils.createTubeConfig; import static io.a2a.server.util.async.AsyncUtils.processor; @@ -35,6 +36,7 @@ import io.a2a.server.events.EventQueueItem; import io.a2a.server.events.QueueManager; import io.a2a.server.events.TaskQueueExistsException; +import io.a2a.server.interceptors.Trace; import io.a2a.server.tasks.PushNotificationConfigStore; import io.a2a.server.tasks.PushNotificationSender; import io.a2a.server.tasks.ResultAggregator; @@ -152,6 +154,7 @@ public static DefaultRequestHandler create(AgentExecutor agentExecutor, TaskStor } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public Task onGetTask(TaskQueryParams params, ServerCallContext context) throws JSONRPCError { LOGGER.debug("onGetTask {}", params.id()); Task task = taskStore.get(params.id()); @@ -185,6 +188,7 @@ private static Task limitTaskHistory(Task task, int historyLength) { } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public ListTasksResult onListTasks(ListTasksParams params, ServerCallContext context) throws JSONRPCError { LOGGER.debug("onListTasks with contextId={}, status={}, pageSize={}, pageToken={}, lastUpdatedAfter={}", params.contextId(), params.status(), params.pageSize(), params.pageToken(), params.lastUpdatedAfter()); @@ -207,6 +211,7 @@ public ListTasksResult onListTasks(ListTasksParams params, ServerCallContext con } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws JSONRPCError { Task task = taskStore.get(params.id()); if (task == null) { @@ -259,6 +264,7 @@ public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public EventKind onMessageSend(MessageSendParams params, ServerCallContext context) throws JSONRPCError { LOGGER.debug("onMessageSend - task: {}; context {}", params.message().getTaskId(), params.message().getContextId()); MessageSendSetup mss = initMessageSend(params, context); @@ -392,6 +398,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public Flow.Publisher onMessageSendStream( MessageSendParams params, ServerCallContext context) throws JSONRPCError { LOGGER.debug("onMessageSendStream START - task: {}; context: {}; runningAgents: {}; backgroundTasks: {}", @@ -546,6 +553,7 @@ private void startBackgroundConsumption() { } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public TaskPushNotificationConfig onSetTaskPushNotificationConfig( TaskPushNotificationConfig params, ServerCallContext context) throws JSONRPCError { if (pushConfigStore == null) { @@ -561,6 +569,7 @@ public TaskPushNotificationConfig onSetTaskPushNotificationConfig( } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public TaskPushNotificationConfig onGetTaskPushNotificationConfig( GetTaskPushNotificationConfigParams params, ServerCallContext context) throws JSONRPCError { if (pushConfigStore == null) { @@ -592,6 +601,7 @@ private PushNotificationConfig getPushNotificationConfig(List onResubscribeToTask( TaskIdParams params, ServerCallContext context) throws JSONRPCError { LOGGER.debug("onResubscribeToTask - taskId: {}", params.id()); @@ -623,6 +633,7 @@ public Flow.Publisher onResubscribeToTask( } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public List onListTaskPushNotificationConfig( ListTaskPushNotificationConfigParams params, ServerCallContext context) throws JSONRPCError { if (pushConfigStore == null) { @@ -646,6 +657,7 @@ public List onListTaskPushNotificationConfig( } @Override + @Trace(extractor = RequestHandlerAttributeExtractor.class, kind = SERVER) public void onDeleteTaskPushNotificationConfig( DeleteTaskPushNotificationConfigParams params, ServerCallContext context) { if (pushConfigStore == null) { diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/RequestHandlerAttributeExtractor.java b/server-common/src/main/java/io/a2a/server/requesthandlers/RequestHandlerAttributeExtractor.java new file mode 100644 index 000000000..2b06277fc --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/RequestHandlerAttributeExtractor.java @@ -0,0 +1,39 @@ +package io.a2a.server.requesthandlers; + +import io.a2a.server.ServerCallContext; +import io.a2a.server.interceptors.InvocationContext; +import java.util.Collections; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class RequestHandlerAttributeExtractor implements Supplier>> { + + @Override + public Function> get() { + return ctx -> { + String method = ctx.method().getName(); + Object[] parameters = ctx.parameters(); + + switch (method) { + case "onMessageSend", + "onMessageSendStream", + "onCancelTask", + "onResubscribeToTask", + "getPushNotificationConfig", + "setPushNotificationConfig", + "onGetTask", + "listPushNotificationConfig", + "deletePushNotificationConfig", + "onListTasks" -> { + ServerCallContext context = (ServerCallContext) parameters[1]; + return Map.of("request", parameters[0].toString(), "extensions", context.getActivatedExtensions().stream().collect(Collectors.joining(",")), "a2a.method", (String) context.getState().get("method")); + } + default -> { + return Collections.emptyMap(); + } + } + }; + } +} diff --git a/spec/src/main/java/io/a2a/spec/Message.java b/spec/src/main/java/io/a2a/spec/Message.java index 19843d26c..1bfd59680 100644 --- a/spec/src/main/java/io/a2a/spec/Message.java +++ b/spec/src/main/java/io/a2a/spec/Message.java @@ -129,6 +129,11 @@ public String getKind() { return kind; } + @Override + public String toString() { + return "Message{" + "role=" + role + ", parts=" + parts + ", messageId=" + messageId + ", contextId=" + contextId + ", taskId=" + taskId + ", metadata=" + metadata + ", kind=" + kind + ", referenceTaskIds=" + referenceTaskIds + ", extensions=" + extensions + '}'; + } + /** * Defines the role of the message sender in the conversation. *

diff --git a/spec/src/main/java/io/a2a/spec/Task.java b/spec/src/main/java/io/a2a/spec/Task.java index c2fd23d10..6e44e8b80 100644 --- a/spec/src/main/java/io/a2a/spec/Task.java +++ b/spec/src/main/java/io/a2a/spec/Task.java @@ -113,6 +113,11 @@ public String getKind() { return kind; } + @Override + public String toString() { + return "Task{" + "id=" + id + ", contextId=" + contextId + ", status=" + status + ", artifacts=" + artifacts + ", history=" + history + ", metadata=" + metadata + ", kind=" + kind + '}'; + } + /** * Builder for constructing immutable {@link Task} instances. *

diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/context/GrpcContextKeys.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/context/GrpcContextKeys.java index 483daf7e8..3e432098f 100644 --- a/transport/grpc/src/main/java/io/a2a/transport/grpc/context/GrpcContextKeys.java +++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/context/GrpcContextKeys.java @@ -1,43 +1,73 @@ package io.a2a.transport.grpc.context; +import io.a2a.spec.GetTaskPushNotificationConfigRequest; +import io.a2a.spec.CancelTaskRequest; +import io.a2a.spec.DeleteTaskPushNotificationConfigRequest; +import io.a2a.spec.GetTaskRequest; +import io.a2a.spec.ListTaskPushNotificationConfigRequest; +import io.a2a.spec.ListTasksRequest; +import io.a2a.spec.SendMessageRequest; +import io.a2a.spec.SendStreamingMessageRequest; +import io.a2a.spec.SetTaskPushNotificationConfigRequest; +import io.a2a.spec.TaskResubscriptionRequest; import io.grpc.Context; +import java.util.Map; /** * Shared gRPC context keys for A2A protocol data. - * + * * These keys provide access to gRPC context information similar to * Python's grpc.aio.ServicerContext, enabling rich context access * in service method implementations. */ public final class GrpcContextKeys { - + /** * Context key for storing the X-A2A-Extensions header value. * Set by server interceptors and accessed by service handlers. */ - public static final Context.Key EXTENSIONS_HEADER_KEY = - Context.key("x-a2a-extensions"); - + public static final Context.Key EXTENSIONS_HEADER_KEY + = Context.key("x-a2a-extensions"); + /** * Context key for storing the complete gRPC Metadata object. * Provides access to all request headers and metadata. */ - public static final Context.Key METADATA_KEY = - Context.key("grpc-metadata"); - + public static final Context.Key METADATA_KEY + = Context.key("grpc-metadata"); + + /** + * Context key for storing the method name being called. + * Equivalent to Python's context.method() functionality. + */ + public static final Context.Key GRPC_METHOD_NAME_KEY + = Context.key("grpc-method-name"); + /** * Context key for storing the method name being called. * Equivalent to Python's context.method() functionality. */ - public static final Context.Key METHOD_NAME_KEY = - Context.key("grpc-method-name"); - + public static final Context.Key METHOD_NAME_KEY + = Context.key("method"); + /** * Context key for storing the peer information. * Provides access to client connection details. */ - public static final Context.Key PEER_INFO_KEY = - Context.key("grpc-peer-info"); + public static final Context.Key PEER_INFO_KEY + = Context.key("grpc-peer-info"); + + public static final Map METHOD_MAPPING = Map.of( + "SendMessage", SendMessageRequest.METHOD, + "SendStreamingMessage", SendStreamingMessageRequest.METHOD, + "GetTask", GetTaskRequest.METHOD, + "ListTask", ListTasksRequest.METHOD, + "CancelTask", CancelTaskRequest.METHOD, + "TaskSubscription", TaskResubscriptionRequest.METHOD, + "CreateTaskPushNotification", SetTaskPushNotificationConfigRequest.METHOD, + "GetTaskPushNotification", GetTaskPushNotificationConfigRequest.METHOD, + "ListTaskPushNotification", ListTaskPushNotificationConfigRequest.METHOD, + "DeleteTaskPushNotification", DeleteTaskPushNotificationConfigRequest.METHOD); private GrpcContextKeys() { // Utility class diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcAttributeExtractor.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcAttributeExtractor.java new file mode 100644 index 000000000..18c855575 --- /dev/null +++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcAttributeExtractor.java @@ -0,0 +1,39 @@ +package io.a2a.transport.grpc.handler; + +import io.a2a.server.interceptors.InvocationContext; +import io.grpc.Context; +import io.a2a.transport.grpc.context.GrpcContextKeys; +import java.util.Collections; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; + +public class GrpcAttributeExtractor implements Supplier>> { + + @Override + public Function> get() { + return ctx -> { + String method = ctx.method().getName(); + Object[] parameters = ctx.parameters(); + + switch (method) { + case "sendMessage", + "getTask", + "listTasks", + "cancelTask", + "createTaskPushNotificationConfig", + "getTaskPushNotificationConfig", + "listTaskPushNotificationConfig", + "sendStreamingMessage", + "taskSubscription", + "deleteTaskPushNotificationConfig" -> { + Context currentContext = Context.current(); + return Map.of("gen_ai.agent.a2a.request", parameters[0].toString(), "extensions", GrpcContextKeys.EXTENSIONS_HEADER_KEY.get(), "gen_ai.agent.operation.name", GrpcContextKeys.GRPC_METHOD_NAME_KEY.get(currentContext)); + } + default -> { + return Collections.emptyMap(); + } + } + }; + } +} diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java index c5174c4ad..8541a1316 100644 --- a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java +++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java @@ -2,6 +2,7 @@ import static io.a2a.grpc.utils.ProtoUtils.FromProto; import static io.a2a.grpc.utils.ProtoUtils.ToProto; +import static io.a2a.server.interceptors.Kind.SERVER; import jakarta.enterprise.inject.Vetoed; @@ -27,6 +28,7 @@ import io.a2a.server.auth.UnauthenticatedUser; import io.a2a.server.auth.User; import io.a2a.server.extensions.A2AExtensions; +import io.a2a.server.interceptors.Trace; import io.a2a.transport.grpc.context.GrpcContextKeys; import io.a2a.server.requesthandlers.RequestHandler; import io.a2a.spec.AgentCard; @@ -71,6 +73,7 @@ public GrpcHandler() { } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void sendMessage(io.a2a.grpc.SendMessageRequest request, StreamObserver responseObserver) { try { @@ -90,6 +93,7 @@ public void sendMessage(io.a2a.grpc.SendMessageRequest request, } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void getTask(io.a2a.grpc.GetTaskRequest request, StreamObserver responseObserver) { try { @@ -112,6 +116,7 @@ public void getTask(io.a2a.grpc.GetTaskRequest request, } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void listTasks(io.a2a.grpc.ListTasksRequest request, StreamObserver responseObserver) { try { @@ -130,6 +135,7 @@ public void listTasks(io.a2a.grpc.ListTasksRequest request, } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void cancelTask(io.a2a.grpc.CancelTaskRequest request, StreamObserver responseObserver) { try { @@ -152,6 +158,7 @@ public void cancelTask(io.a2a.grpc.CancelTaskRequest request, } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void createTaskPushNotificationConfig(io.a2a.grpc.CreateTaskPushNotificationConfigRequest request, StreamObserver responseObserver) { if (!getAgentCardInternal().capabilities().pushNotifications()) { @@ -175,6 +182,7 @@ public void createTaskPushNotificationConfig(io.a2a.grpc.CreateTaskPushNotificat } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void getTaskPushNotificationConfig(io.a2a.grpc.GetTaskPushNotificationConfigRequest request, StreamObserver responseObserver) { if (!getAgentCardInternal().capabilities().pushNotifications()) { @@ -198,6 +206,7 @@ public void getTaskPushNotificationConfig(io.a2a.grpc.GetTaskPushNotificationCon } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void listTaskPushNotificationConfig(io.a2a.grpc.ListTaskPushNotificationConfigRequest request, StreamObserver responseObserver) { if (!getAgentCardInternal().capabilities().pushNotifications()) { @@ -226,6 +235,7 @@ public void listTaskPushNotificationConfig(io.a2a.grpc.ListTaskPushNotificationC } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void sendStreamingMessage(io.a2a.grpc.SendMessageRequest request, StreamObserver responseObserver) { if (!getAgentCardInternal().capabilities().streaming()) { @@ -248,6 +258,7 @@ public void sendStreamingMessage(io.a2a.grpc.SendMessageRequest request, } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void taskSubscription(io.a2a.grpc.TaskSubscriptionRequest request, StreamObserver responseObserver) { if (!getAgentCardInternal().capabilities().streaming()) { @@ -328,6 +339,7 @@ public void getAgentCard(io.a2a.grpc.GetAgentCardRequest request, } @Override + @Trace(extractor = GrpcAttributeExtractor.class, kind = SERVER) public void deleteTaskPushNotificationConfig(io.a2a.grpc.DeleteTaskPushNotificationConfigRequest request, StreamObserver responseObserver) { if (!getAgentCardInternal().capabilities().pushNotifications()) { @@ -378,7 +390,7 @@ private ServerCallContext createCallContext(StreamObserver responseObserv state.put("grpc_metadata", grpcMetadata); } - String methodName = GrpcContextKeys.METHOD_NAME_KEY.get(currentContext); + String methodName = GrpcContextKeys.GRPC_METHOD_NAME_KEY.get(currentContext); if (methodName != null) { state.put("grpc_method_name", methodName); } @@ -578,7 +590,7 @@ protected static io.grpc.Metadata getCurrentMetadata() { * @return the method name, or null if not available */ protected static String getCurrentMethodName() { - return getFromContext(GrpcContextKeys.METHOD_NAME_KEY); + return getFromContext(GrpcContextKeys.GRPC_METHOD_NAME_KEY); } /** diff --git a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java index 058ebc5f9..d7a240f0b 100644 --- a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java +++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandler.java @@ -1,6 +1,8 @@ package io.a2a.transport.jsonrpc.handler; +import static io.a2a.server.interceptors.Kind.SERVER; import static io.a2a.server.util.async.AsyncUtils.createTubeConfig; + import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; @@ -14,6 +16,7 @@ import io.a2a.server.ExtendedAgentCard; import io.a2a.server.PublicAgentCard; import io.a2a.server.ServerCallContext; +import io.a2a.server.interceptors.Trace; import io.a2a.server.requesthandlers.RequestHandler; import io.a2a.spec.AgentCard; import io.a2a.spec.AuthenticatedExtendedCardNotConfiguredError; @@ -76,6 +79,7 @@ public JSONRPCHandler(@PublicAgentCard AgentCard agentCard, RequestHandler reque this(agentCard, null, requestHandler, executor); } + @Trace(extractor=JsonRPCAttributeExtractor.class, kind = SERVER) public SendMessageResponse onMessageSend(SendMessageRequest request, ServerCallContext context) { try { EventKind taskOrMessage = requestHandler.onMessageSend(request.getParams(), context); @@ -87,7 +91,7 @@ public SendMessageResponse onMessageSend(SendMessageRequest request, ServerCallC } } - + @Trace(extractor=JsonRPCAttributeExtractor.class, kind = SERVER) public Flow.Publisher onMessageSendStream( SendStreamingMessageRequest request, ServerCallContext context) { if (!agentCard.capabilities().streaming()) { @@ -110,6 +114,7 @@ public Flow.Publisher onMessageSendStream( } } + @Trace(extractor=JsonRPCAttributeExtractor.class, kind = SERVER) public CancelTaskResponse onCancelTask(CancelTaskRequest request, ServerCallContext context) { try { Task task = requestHandler.onCancelTask(request.getParams(), context); @@ -124,6 +129,7 @@ public CancelTaskResponse onCancelTask(CancelTaskRequest request, ServerCallCont } } + @Trace(extractor=JsonRPCAttributeExtractor.class, kind = SERVER) public Flow.Publisher onResubscribeToTask( TaskResubscriptionRequest request, ServerCallContext context) { if (!agentCard.capabilities().streaming()) { @@ -132,7 +138,6 @@ public Flow.Publisher onResubscribeToTask( request.getId(), new InvalidRequestError("Streaming is not supported by the agent"))); } - try { Flow.Publisher publisher = requestHandler.onResubscribeToTask(request.getParams(), context); @@ -146,6 +151,7 @@ public Flow.Publisher onResubscribeToTask( } } + @Trace(extractor=JsonRPCAttributeExtractor.class, kind = SERVER) public GetTaskPushNotificationConfigResponse getPushNotificationConfig( GetTaskPushNotificationConfigRequest request, ServerCallContext context) { if (!agentCard.capabilities().pushNotifications()) { @@ -163,6 +169,7 @@ public GetTaskPushNotificationConfigResponse getPushNotificationConfig( } } + @Trace(extractor=JsonRPCAttributeExtractor.class, kind = SERVER) public SetTaskPushNotificationConfigResponse setPushNotificationConfig( SetTaskPushNotificationConfigRequest request, ServerCallContext context) { if (!agentCard.capabilities().pushNotifications()) { @@ -180,6 +187,7 @@ public SetTaskPushNotificationConfigResponse setPushNotificationConfig( } } + @Trace(extractor=JsonRPCAttributeExtractor.class, kind = SERVER) public GetTaskResponse onGetTask(GetTaskRequest request, ServerCallContext context) { try { Task task = requestHandler.onGetTask(request.getParams(), context); @@ -191,6 +199,7 @@ public GetTaskResponse onGetTask(GetTaskRequest request, ServerCallContext conte } } + @Trace(extractor=JsonRPCAttributeExtractor.class, kind = SERVER) public ListTasksResponse onListTasks(ListTasksRequest request, ServerCallContext context) { try { ListTasksResult result = requestHandler.onListTasks(request.getParams(), context); @@ -202,6 +211,7 @@ public ListTasksResponse onListTasks(ListTasksRequest request, ServerCallContext } } + @Trace(extractor=JsonRPCAttributeExtractor.class, kind = SERVER) public ListTaskPushNotificationConfigResponse listPushNotificationConfig( ListTaskPushNotificationConfigRequest request, ServerCallContext context) { if ( !agentCard.capabilities().pushNotifications()) { @@ -219,6 +229,7 @@ public ListTaskPushNotificationConfigResponse listPushNotificationConfig( } } + @Trace(extractor=JsonRPCAttributeExtractor.class, kind = SERVER) public DeleteTaskPushNotificationConfigResponse deletePushNotificationConfig( DeleteTaskPushNotificationConfigRequest request, ServerCallContext context) { if ( !agentCard.capabilities().pushNotifications()) { diff --git a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JsonRPCAttributeExtractor.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JsonRPCAttributeExtractor.java new file mode 100644 index 000000000..aaec66ea4 --- /dev/null +++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/handler/JsonRPCAttributeExtractor.java @@ -0,0 +1,41 @@ +package io.a2a.transport.jsonrpc.handler; + +import static io.a2a.transport.jsonrpc.context.JSONRPCContextKeys.METHOD_NAME_KEY; + +import io.a2a.server.ServerCallContext; +import io.a2a.server.interceptors.InvocationContext; +import java.util.Collections; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class JsonRPCAttributeExtractor implements Supplier>> { + + @Override + public Function> get() { + return ctx -> { + String method = ctx.method().getName(); + Object[] parameters = ctx.parameters(); + + switch (method) { + case "onMessageSend", + "onMessageSendStream", + "onCancelTask", + "onResubscribeToTask", + "getPushNotificationConfig", + "setPushNotificationConfig", + "onGetTask", + "listPushNotificationConfig", + "deletePushNotificationConfig", + "onListTasks" -> { + ServerCallContext context = (ServerCallContext) parameters[1]; + return Map.of("gen_ai.agent.a2a.request", parameters[0].toString(), "gen_ai.agent.a2a.extensions", context.getActivatedExtensions().stream().collect(Collectors.joining(",")), "gen_ai.agent.operation.name", (String) context.getState().get(METHOD_NAME_KEY)); + } + default -> { + return Collections.emptyMap(); + } + } + }; + } +} diff --git a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestAttributeExtractor.java b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestAttributeExtractor.java new file mode 100644 index 000000000..67afd3ce3 --- /dev/null +++ b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestAttributeExtractor.java @@ -0,0 +1,73 @@ +package io.a2a.transport.rest.handler; + +import static io.a2a.transport.rest.context.RestContextKeys.METHOD_NAME_KEY; + +import io.a2a.server.ServerCallContext; +import io.a2a.server.interceptors.InvocationContext; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class RestAttributeExtractor implements Supplier>> { + + @Override + public Function> get() { + return ctx -> { + String method = ctx.method().getName(); + Object[] parameters = ctx.parameters(); + + switch (method) { + case "sendMessage", + "sendStreamingMessage"-> { + ServerCallContext context = (ServerCallContext) parameters[1]; + Map result = new HashMap<>(Map.of("gen_ai.agent.a2a.request", (String) parameters[0])); + result.putAll(processServerCallContext(context)); + return result; + } + case "setTaskPushNotificationConfiguration" -> { + ServerCallContext context = (ServerCallContext) parameters[2]; + Map result = new HashMap<>(Map.of("gen_ai.agent.a2a.taskId", (String) parameters[0], "gen_ai.agent.a2a.request", (String) parameters[1])); + result.putAll(processServerCallContext(context)); + return result; + } + case "cancelTask", + "resubscribeTask", + "listTaskPushNotificationConfigurations" -> { + ServerCallContext context = (ServerCallContext) parameters[1]; + Map result = new HashMap<>(Map.of("gen_ai.agent.a2a.taskId", (String) parameters[0])); + result.putAll(processServerCallContext(context)); + return result; + } + case "getTask" -> { + ServerCallContext context = (ServerCallContext) parameters[2]; + Map result = new HashMap<>(Map.of("gen_ai.agent.a2a.taskId", (String) parameters[0], "gen_ai.agent.a2a.historyLength", "" + (int) parameters[1])); + result.putAll(processServerCallContext(context)); + return result; + } + case "getTaskPushNotificationConfiguration", + "deleteTaskPushNotificationConfiguration" -> { + ServerCallContext context = (ServerCallContext) parameters[2]; + Map result = new HashMap<>(Map.of("gen_ai.agent.a2a.taskId", (String) parameters[0], "gen_ai.agent.a2a.configId", (String) parameters[1])); + result.putAll(processServerCallContext(context)); + return result; + } + case "listTasks" -> { + ServerCallContext context = (ServerCallContext) parameters[6]; + Map result = new HashMap<>(Map.of("gen_ai.agent.a2a.contextId", (String) parameters[0], "gen_ai.agent.a2a.status", (String) parameters[1])); + result.putAll(processServerCallContext(context)); + return result; + } + default -> { + return Collections.emptyMap(); + } + } + }; + } + + private Map processServerCallContext(ServerCallContext context) { + return Map.of( "gen_ai.agent.a2a.extensions", context.getActivatedExtensions().stream().collect(Collectors.joining(",")), "gen_ai.agent.a2a.operation.name", (String) context.getState().get(METHOD_NAME_KEY)); + } +} diff --git a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java index 105831262..bfad39f7c 100644 --- a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java +++ b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java @@ -1,5 +1,6 @@ package io.a2a.transport.rest.handler; +import static io.a2a.server.interceptors.Kind.SERVER; import static io.a2a.server.util.async.AsyncUtils.createTubeConfig; import com.fasterxml.jackson.core.JacksonException; @@ -20,6 +21,7 @@ import io.a2a.server.PublicAgentCard; import io.a2a.server.ServerCallContext; +import io.a2a.server.interceptors.Trace; import io.a2a.server.requesthandlers.RequestHandler; import io.a2a.spec.AgentCard; import io.a2a.spec.AuthenticatedExtendedCardNotConfiguredError; @@ -62,8 +64,7 @@ public class RestHandler { private static final Logger log = Logger.getLogger(RestHandler.class.getName()); private AgentCard agentCard; - private @Nullable - Instance extendedAgentCard; + private @Nullable Instance extendedAgentCard; private RequestHandler requestHandler; private final Executor executor; @@ -91,6 +92,7 @@ public RestHandler(AgentCard agentCard, RequestHandler requestHandler, Executor this.executor = executor; } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse sendMessage(String body, ServerCallContext context) { try { io.a2a.grpc.SendMessageRequest.Builder request = io.a2a.grpc.SendMessageRequest.newBuilder(); @@ -104,6 +106,7 @@ public HTTPRestResponse sendMessage(String body, ServerCallContext context) { } } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse sendStreamingMessage(String body, ServerCallContext context) { try { if (!agentCard.capabilities().streaming()) { @@ -120,6 +123,7 @@ public HTTPRestResponse sendStreamingMessage(String body, ServerCallContext cont } } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse cancelTask(String taskId, ServerCallContext context) { try { if (taskId == null || taskId.isEmpty()) { @@ -138,6 +142,7 @@ public HTTPRestResponse cancelTask(String taskId, ServerCallContext context) { } } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse setTaskPushNotificationConfiguration(String taskId, String body, ServerCallContext context) { try { if (!agentCard.capabilities().pushNotifications()) { @@ -154,6 +159,7 @@ public HTTPRestResponse setTaskPushNotificationConfiguration(String taskId, Stri } } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse resubscribeTask(String taskId, ServerCallContext context) { try { if (!agentCard.capabilities().streaming()) { @@ -169,6 +175,7 @@ public HTTPRestResponse resubscribeTask(String taskId, ServerCallContext context } } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse getTask(String taskId, int historyLength, ServerCallContext context) { try { TaskQueryParams params = new TaskQueryParams(taskId, historyLength); @@ -184,6 +191,7 @@ public HTTPRestResponse getTask(String taskId, int historyLength, ServerCallCont } } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse listTasks(@Nullable String contextId, @Nullable String status, @Nullable Integer pageSize, @Nullable String pageToken, @Nullable Integer historyLength, @Nullable String lastUpdatedAfter, @@ -231,6 +239,7 @@ public HTTPRestResponse listTasks(@Nullable String contextId, @Nullable String s } } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse getTaskPushNotificationConfiguration(String taskId, @Nullable String configId, ServerCallContext context) { try { if (!agentCard.capabilities().pushNotifications()) { @@ -246,6 +255,7 @@ public HTTPRestResponse getTaskPushNotificationConfiguration(String taskId, @Nul } } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse listTaskPushNotificationConfigurations(String taskId, ServerCallContext context) { try { if (!agentCard.capabilities().pushNotifications()) { @@ -261,6 +271,7 @@ public HTTPRestResponse listTaskPushNotificationConfigurations(String taskId, Se } } + @Trace(extractor = RestAttributeExtractor.class, kind = SERVER) public HTTPRestResponse deleteTaskPushNotificationConfiguration(String taskId, String configId, ServerCallContext context) { try { if (!agentCard.capabilities().pushNotifications()) {