From ca328112aefa917cf7c29a6b3a567302d2a7ec66 Mon Sep 17 00:00:00 2001 From: Milan Savic Date: Wed, 25 Feb 2026 15:49:46 +0100 Subject: [PATCH 1/3] Added support for upcasting journal messages while applying them to the state machine. --- .../definition/ServiceDefinition.java | 42 +- .../dev/restate/sdk/upcasting/Upcaster.java | 71 ++++ .../sdk/upcasting/UpcasterFactory.java | 31 ++ .../sdk/core/EndpointRequestHandler.java | 11 +- .../statemachine/InvocationInputUpcaster.java | 29 ++ .../sdk/core/statemachine/StateMachine.java | 6 +- .../core/statemachine/StateMachineImpl.java | 12 +- .../DefaultInvocationInputUpcaster.java | 391 ++++++++++++++++++ .../DefaultInvocationInputUpcasterTest.java | 205 +++++++++ .../statemachine/upcasting/UpcasterTest.java | 140 +++++++ 10 files changed, 921 insertions(+), 17 deletions(-) create mode 100644 sdk-common/src/main/java/dev/restate/sdk/upcasting/Upcaster.java create mode 100644 sdk-common/src/main/java/dev/restate/sdk/upcasting/UpcasterFactory.java create mode 100644 sdk-core/src/main/java/dev/restate/sdk/core/statemachine/InvocationInputUpcaster.java create mode 100644 sdk-core/src/main/java/dev/restate/sdk/core/statemachine/upcasting/DefaultInvocationInputUpcaster.java create mode 100644 sdk-core/src/test/java/dev/restate/sdk/core/statemachine/upcasting/DefaultInvocationInputUpcasterTest.java create mode 100644 sdk-core/src/test/java/dev/restate/sdk/core/statemachine/upcasting/UpcasterTest.java diff --git a/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/ServiceDefinition.java b/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/ServiceDefinition.java index 7eb3d35f4..403f71c2f 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/ServiceDefinition.java +++ b/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/ServiceDefinition.java @@ -8,6 +8,8 @@ // https://github.com/restatedev/sdk-java/blob/main/LICENSE package dev.restate.sdk.endpoint.definition; +import dev.restate.sdk.upcasting.Upcaster; +import dev.restate.sdk.upcasting.UpcasterFactory; import java.time.Duration; import java.util.*; import java.util.function.Consumer; @@ -30,6 +32,7 @@ public final class ServiceDefinition { private final @Nullable Boolean ingressPrivate; private final @Nullable Boolean enableLazyState; private final @Nullable InvocationRetryPolicy invocationRetryPolicy; + private final UpcasterFactory upcasterFactory; private ServiceDefinition( String serviceName, @@ -43,7 +46,8 @@ private ServiceDefinition( @Nullable Duration journalRetention, @Nullable Boolean ingressPrivate, @Nullable Boolean enableLazyState, - @Nullable InvocationRetryPolicy invocationRetryPolicy) { + @Nullable InvocationRetryPolicy invocationRetryPolicy, + UpcasterFactory upcasterFactory) { this.serviceName = serviceName; this.serviceType = serviceType; this.handlers = handlers; @@ -56,6 +60,7 @@ private ServiceDefinition( this.ingressPrivate = ingressPrivate; this.enableLazyState = enableLazyState; this.invocationRetryPolicy = invocationRetryPolicy; + this.upcasterFactory = upcasterFactory; } /** @@ -158,6 +163,10 @@ public Map getMetadata() { return invocationRetryPolicy; } + public UpcasterFactory getUpcasterFactory() { + return upcasterFactory; + } + public ServiceDefinition withDocumentation(@Nullable String documentation) { return new ServiceDefinition( serviceName, @@ -171,7 +180,8 @@ public ServiceDefinition withDocumentation(@Nullable String documentation) { journalRetention, ingressPrivate, enableLazyState, - invocationRetryPolicy); + invocationRetryPolicy, + upcasterFactory); } public ServiceDefinition withMetadata(Map metadata) { @@ -187,7 +197,8 @@ public ServiceDefinition withMetadata(Map metadata) { journalRetention, ingressPrivate, enableLazyState, - invocationRetryPolicy); + invocationRetryPolicy, + upcasterFactory); } /** @@ -205,7 +216,8 @@ public ServiceDefinition configure(Consumer configurator) { journalRetention, ingressPrivate, enableLazyState, - invocationRetryPolicy); + invocationRetryPolicy, + upcasterFactory); configurator.accept(configuratorObj); return new ServiceDefinition( serviceName, @@ -219,7 +231,8 @@ public ServiceDefinition configure(Consumer configurator) { configuratorObj.journalRetention, configuratorObj.ingressPrivate, configuratorObj.enableLazyState, - configuratorObj.invocationRetryPolicy); + configuratorObj.invocationRetryPolicy, + configuratorObj.upcasterFactory); } /** Configurator for a {@link ServiceDefinition}. */ @@ -235,6 +248,7 @@ public static final class Configurator { private @Nullable Boolean ingressPrivate; private @Nullable Boolean enableLazyState; private @Nullable InvocationRetryPolicy invocationRetryPolicy; + private UpcasterFactory upcasterFactory; private Configurator( Map> handlers, @@ -246,7 +260,8 @@ private Configurator( @Nullable Duration journalRetention, @Nullable Boolean ingressPrivate, @Nullable Boolean enableLazyState, - @Nullable InvocationRetryPolicy invocationRetryPolicy) { + @Nullable InvocationRetryPolicy invocationRetryPolicy, + UpcasterFactory upcasterFactory) { this.handlers = new HashMap<>(handlers); this.documentation = documentation; this.metadata = new HashMap<>(metadata); @@ -257,6 +272,7 @@ private Configurator( this.ingressPrivate = ingressPrivate; this.enableLazyState = enableLazyState; this.invocationRetryPolicy = invocationRetryPolicy; + this.upcasterFactory = upcasterFactory; } /** @@ -533,6 +549,11 @@ public Configurator configureHandler( handlers.computeIfPresent(handlerName, (k, v) -> v.configure(configurator)); return this; } + + public Configurator configureUpcasterFactory(UpcasterFactory upcasterFactory) { + this.upcasterFactory = upcasterFactory; + return this; + } } @Override @@ -549,7 +570,8 @@ && getServiceType() == that.getServiceType() && Objects.equals(journalRetention, that.journalRetention) && Objects.equals(ingressPrivate, that.ingressPrivate) && Objects.equals(enableLazyState, that.enableLazyState) - && Objects.equals(invocationRetryPolicy, that.invocationRetryPolicy); + && Objects.equals(invocationRetryPolicy, that.invocationRetryPolicy) + && Objects.equals(upcasterFactory, that.upcasterFactory); } @Override @@ -566,7 +588,8 @@ public int hashCode() { journalRetention, ingressPrivate, enableLazyState, - invocationRetryPolicy); + invocationRetryPolicy, + upcasterFactory); } public static ServiceDefinition of( @@ -584,6 +607,7 @@ public static ServiceDefinition of( null, null, null, - null); + null, + (n, type, metadata) -> Upcaster.noop()); } } diff --git a/sdk-common/src/main/java/dev/restate/sdk/upcasting/Upcaster.java b/sdk-common/src/main/java/dev/restate/sdk/upcasting/Upcaster.java new file mode 100644 index 000000000..d54c27f46 --- /dev/null +++ b/sdk-common/src/main/java/dev/restate/sdk/upcasting/Upcaster.java @@ -0,0 +1,71 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package dev.restate.sdk.upcasting; + +import dev.restate.common.Slice; +import java.util.Map; + +/** + * A component capable of transforming input payloads from an older representation to a newer one + * ("upcasting"). Upcasters can be plugged per service via {@code UpcasterFactory} and are invoked + * by the state machine when handling incoming protocol messages that carry value payloads. + * + *

Metadata may be provided to upcasters via headers, including the core message name and, where + * applicable, the state key. See {@link #CORE_MESSAGE_NAME_METADATA_KEY} and {@link + * #STATE_KEY_METADATA_KEY}. + * + * @author Milan Savic + */ +public interface Upcaster { + + /** Header key containing the simple name of the core protocol message being processed. */ + String CORE_MESSAGE_NAME_METADATA_KEY = "_CORE_MESSAGE_NAME"; + + /** Header key containing the state key when upcasting state entries (if applicable). */ + String STATE_KEY_METADATA_KEY = "_STATE_KEY"; + + /** + * Returns whether this upcaster is able to upcast the given payload given the provided headers. + * Implementations should be side‑effect free. + */ + boolean canUpcast(Slice body, Map headers); + + /** + * Performs the upcast transformation. Implementations may assume {@link #canUpcast(Slice, Map)} + * has been called and returned {@code true}. + * + * @param body the original payload to upcast + * @param headers metadata associated with the payload + * @return the transformed payload; must not be {@code null} + */ + Slice upcast(Slice body, Map headers); + + /** Returns a no‑op upcaster that never upcasts and always returns the original payload. */ + static Upcaster noop() { + return NoopUpcaster.INSTANCE; + } + + /** No‑op implementation of {@link Upcaster}. */ + final class NoopUpcaster implements Upcaster { + + static final NoopUpcaster INSTANCE = new NoopUpcaster(); + + private NoopUpcaster() {} + + @Override + public boolean canUpcast(Slice body, Map headers) { + return false; + } + + @Override + public Slice upcast(Slice body, Map headers) { + return body; + } + } +} diff --git a/sdk-common/src/main/java/dev/restate/sdk/upcasting/UpcasterFactory.java b/sdk-common/src/main/java/dev/restate/sdk/upcasting/UpcasterFactory.java new file mode 100644 index 000000000..52e446836 --- /dev/null +++ b/sdk-common/src/main/java/dev/restate/sdk/upcasting/UpcasterFactory.java @@ -0,0 +1,31 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package dev.restate.sdk.upcasting; + +import dev.restate.sdk.endpoint.definition.ServiceType; +import java.util.Map; + +/** + * Factory for producing {@link Upcaster} instances for a specific service. Implementations may + * inspect the service name, service type, and optional metadata to decide which upcaster to return. + * + * @author Milan Savic + */ +public interface UpcasterFactory { + + /** + * Creates a new {@link Upcaster} for the given service. + * + * @param serviceName the logical name of the service + * @param serviceType the type of the service (SERVICE, VIRTUAL_OBJECT, WORKFLOW) + * @param metadata optional metadata that can be used when selecting/configuring the upcaster + * @return a non-null {@link Upcaster} + */ + Upcaster newUpcaster(String serviceName, ServiceType serviceType, Map metadata); +} diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/EndpointRequestHandler.java b/sdk-core/src/main/java/dev/restate/sdk/core/EndpointRequestHandler.java index 261b14a42..1cae52161 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/EndpointRequestHandler.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/EndpointRequestHandler.java @@ -17,6 +17,7 @@ import dev.restate.sdk.endpoint.HeadersAccessor; import dev.restate.sdk.endpoint.definition.HandlerDefinition; import dev.restate.sdk.endpoint.definition.ServiceDefinition; +import dev.restate.sdk.upcasting.Upcaster; import io.opentelemetry.context.propagation.TextMapGetter; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -179,14 +180,18 @@ public RequestProcessor processorForRequest( loggingContextSetter.set(LoggingContextSetter.INVOCATION_ID_KEY, invocationIdHeader); } - // Instantiate state machine - StateMachine stateMachine = StateMachine.init(headersAccessor, loggingContextSetter); - // Resolve the service method definition ServiceDefinition svc = this.endpoint.resolveService(serviceName); if (svc == null) { throw ProtocolException.methodNotFound(serviceName, handlerName); } + + // Instantiate state machine + Upcaster upcaster = + svc.getUpcasterFactory() + .newUpcaster(svc.getServiceName(), svc.getServiceType(), svc.getMetadata()); + StateMachine stateMachine = StateMachine.init(headersAccessor, loggingContextSetter, upcaster); + HandlerDefinition handler = svc.getHandler(handlerName); if (handler == null) { throw ProtocolException.methodNotFound(serviceName, handlerName); diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/InvocationInputUpcaster.java b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/InvocationInputUpcaster.java new file mode 100644 index 000000000..288b9f720 --- /dev/null +++ b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/InvocationInputUpcaster.java @@ -0,0 +1,29 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package dev.restate.sdk.core.statemachine; + +/** + * Strategy for upcasting an {@link InvocationInput} before it is processed by the state machine. + * Implementations may transform the payloads in the underlying protocol messages (e.g. to migrate + * from an older schema version to a newer one) while keeping the semantics intact. + * + * @author Milan Savic + */ +public interface InvocationInputUpcaster { + + /** + * Upcasts the given {@link InvocationInput}. Implementations should be tolerant of {@code null} + * inputs and return {@code null} in that case. If no change is required, the same input instance + * may be returned. + * + * @param input the original invocation input, may be {@code null} + * @return the upcasted invocation input, or the original input if no changes were applied + */ + InvocationInput upcast(InvocationInput input); +} diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachine.java b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachine.java index 14d810c11..b2e6c1afb 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachine.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachine.java @@ -13,6 +13,7 @@ import dev.restate.sdk.common.*; import dev.restate.sdk.core.EndpointRequestHandler; import dev.restate.sdk.endpoint.HeadersAccessor; +import dev.restate.sdk.upcasting.Upcaster; import java.time.Duration; import java.util.Collection; import java.util.List; @@ -30,8 +31,9 @@ public interface StateMachine extends Flow.Processor { static StateMachine init( HeadersAccessor headersAccessor, - EndpointRequestHandler.LoggingContextSetter loggingContextSetter) { - return new StateMachineImpl(headersAccessor, loggingContextSetter); + EndpointRequestHandler.LoggingContextSetter loggingContextSetter, + Upcaster upcaster) { + return new StateMachineImpl(headersAccessor, loggingContextSetter, upcaster); } // --- Response metadata diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java index 4253a1ba0..db3280d92 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java @@ -18,7 +18,9 @@ import dev.restate.sdk.core.EndpointRequestHandler; import dev.restate.sdk.core.ProtocolException; import dev.restate.sdk.core.generated.protocol.Protocol; +import dev.restate.sdk.core.statemachine.upcasting.DefaultInvocationInputUpcaster; import dev.restate.sdk.endpoint.HeadersAccessor; +import dev.restate.sdk.upcasting.Upcaster; import java.nio.ByteBuffer; import java.time.Duration; import java.time.Instant; @@ -48,9 +50,13 @@ class StateMachineImpl implements StateMachine { // State machine context private final StateContext stateContext; + private final InvocationInputUpcaster upcaster; + StateMachineImpl( HeadersAccessor headersAccessor, - EndpointRequestHandler.LoggingContextSetter loggingContextSetter) { + EndpointRequestHandler.LoggingContextSetter loggingContextSetter, + Upcaster upcaster) { + this.upcaster = new DefaultInvocationInputUpcaster(upcaster); String contentTypeHeader = headersAccessor.get(ServiceProtocol.CONTENT_TYPE); var serviceProtocolVersion = ServiceProtocol.parseServiceProtocolVersion(contentTypeHeader); @@ -128,7 +134,7 @@ public void onNext(Slice slice) { this.messageDecoder.offer(slice); boolean shouldTriggerInputListener = this.messageDecoder.isNextAvailable(); - InvocationInput invocationInput = this.messageDecoder.next(); + InvocationInput invocationInput = upcaster.upcast(this.messageDecoder.next()); while (invocationInput != null) { LOG.trace( "Received input message {} {}", @@ -139,7 +145,7 @@ public void onNext(Slice slice) { .getCurrentState() .onNewMessage(invocationInput, this.stateContext, this.waitForReadyFuture); - invocationInput = this.messageDecoder.next(); + invocationInput = upcaster.upcast(this.messageDecoder.next()); } if (shouldTriggerInputListener) { diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/upcasting/DefaultInvocationInputUpcaster.java b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/upcasting/DefaultInvocationInputUpcaster.java new file mode 100644 index 000000000..b205d09f0 --- /dev/null +++ b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/upcasting/DefaultInvocationInputUpcaster.java @@ -0,0 +1,391 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package dev.restate.sdk.core.statemachine.upcasting; + +import static java.util.Collections.singletonMap; +import static java.util.stream.Collectors.toMap; + +import com.google.protobuf.ByteString; +import dev.restate.common.Slice; +import dev.restate.sdk.core.generated.protocol.Protocol; +import dev.restate.sdk.core.statemachine.InvocationInput; +import dev.restate.sdk.core.statemachine.InvocationInputUpcaster; +import dev.restate.sdk.upcasting.Upcaster; +import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * Default implementation of {@link InvocationInputUpcaster} that applies an {@link Upcaster} to the + * payloads contained in core protocol messages of an {@link InvocationInput}. + * + *

This component inspects the concrete protocol message carried by the invocation input and, + * when a value payload is present, it invokes the configured {@link Upcaster} with metadata + * describing the core message type and, where applicable, the state key. If the upcaster indicates + * it can upcast, the payload is transformed and the message rebuilt with the new content. If any + * exception occurs during upcasting, the public {@link #upcast(InvocationInput)} method logs a + * warning and returns the original input unmodified. + * + * @author Milan Savic + */ +public class DefaultInvocationInputUpcaster implements InvocationInputUpcaster { + + private static final Logger LOG = LogManager.getLogger(DefaultInvocationInputUpcaster.class); + + private final Upcaster upcaster; + + public DefaultInvocationInputUpcaster(Upcaster upcaster) { + this.upcaster = upcaster; + } + + @Override + public InvocationInput upcast(InvocationInput invocationInput) { + try { + return doUpcast(invocationInput); + } catch (Exception e) { + LOG.warn("An error occurred while upcasting.", e); + return invocationInput; + } + } + + public InvocationInput doUpcast(InvocationInput invocationInput) { + if (invocationInput == null) { + return null; + } + if (invocationInput.message() instanceof Protocol.InputCommandMessage msg) { + return InvocationInput.of(invocationInput.header(), upcast(msg)); + } else if (invocationInput.message() instanceof Protocol.RunCompletionNotificationMessage msg) { + return InvocationInput.of(invocationInput.header(), upcast(msg)); + } else if (invocationInput.message() instanceof Protocol.NotificationTemplate msg) { + return InvocationInput.of(invocationInput.header(), upcast(msg)); + } else if (invocationInput.message() instanceof Protocol.OutputCommandMessage msg) { + return InvocationInput.of(invocationInput.header(), upcast(msg)); + } else if (invocationInput.message() + instanceof Protocol.GetLazyStateCompletionNotificationMessage msg) { + return InvocationInput.of(invocationInput.header(), upcast(msg)); + } else if (invocationInput.message() instanceof Protocol.SetStateCommandMessage msg) { + return InvocationInput.of(invocationInput.header(), upcast(msg)); + } else if (invocationInput.message() instanceof Protocol.GetEagerStateCommandMessage msg) { + return InvocationInput.of(invocationInput.header(), upcast(msg)); + } else if (invocationInput.message() + instanceof Protocol.GetPromiseCompletionNotificationMessage msg) { + return InvocationInput.of(invocationInput.header(), upcast(msg)); + } else if (invocationInput.message() + instanceof Protocol.PeekPromiseCompletionNotificationMessage msg) { + return InvocationInput.of(invocationInput.header(), upcast(msg)); + } else if (invocationInput.message() instanceof Protocol.CompletePromiseCommandMessage msg) { + return InvocationInput.of(invocationInput.header(), upcast(msg)); + } else if (invocationInput.message() + instanceof Protocol.CallCompletionNotificationMessage msg) { + return InvocationInput.of(invocationInput.header(), upcast(msg)); + } else if (invocationInput.message() instanceof Protocol.SendSignalCommandMessage msg) { + return InvocationInput.of(invocationInput.header(), upcast(msg)); + } else if (invocationInput.message() + instanceof Protocol.AttachInvocationCompletionNotificationMessage msg) { + return InvocationInput.of(invocationInput.header(), upcast(msg)); + } else if (invocationInput.message() + instanceof Protocol.GetInvocationOutputCompletionNotificationMessage msg) { + return InvocationInput.of(invocationInput.header(), upcast(msg)); + } else if (invocationInput.message() instanceof Protocol.CompleteAwakeableCommandMessage msg) { + return InvocationInput.of(invocationInput.header(), upcast(msg)); + } else if (invocationInput.message() instanceof Protocol.SignalNotificationMessage msg) { + return InvocationInput.of(invocationInput.header(), upcast(msg)); + } else if (invocationInput.message() instanceof Protocol.StartMessage msg) { + return InvocationInput.of(invocationInput.header(), upcast(msg)); + } else if (invocationInput.message() instanceof Protocol.ProposeRunCompletionMessage msg) { + return InvocationInput.of(invocationInput.header(), upcast(msg)); + } + return invocationInput; + } + + private Protocol.InputCommandMessage upcast(Protocol.InputCommandMessage message) { + Map metadata = + message.getHeadersList().stream() + .collect(toMap(Protocol.Header::getKey, Protocol.Header::getKey)); + metadata.put( + Upcaster.CORE_MESSAGE_NAME_METADATA_KEY, + Protocol.InputCommandMessage.class.getSimpleName()); + return message.toBuilder().setValue(upcast(message.getValue(), metadata)).build(); + } + + private Protocol.RunCompletionNotificationMessage upcast( + Protocol.RunCompletionNotificationMessage message) { + if (message.hasValue()) { + return message.toBuilder() + .setValue( + upcast( + message.getValue(), + singletonMap( + Upcaster.CORE_MESSAGE_NAME_METADATA_KEY, + Protocol.RunCompletionNotificationMessage.class.getSimpleName()))) + .build(); + } + return message; + } + + private Protocol.NotificationTemplate upcast(Protocol.NotificationTemplate message) { + if (message.hasValue()) { + return message.toBuilder() + .setValue( + upcast( + message.getValue(), + singletonMap( + Upcaster.CORE_MESSAGE_NAME_METADATA_KEY, + Protocol.NotificationTemplate.class.getSimpleName()))) + .build(); + } + return message; + } + + private Protocol.OutputCommandMessage upcast(Protocol.OutputCommandMessage message) { + if (message.hasValue()) { + return message.toBuilder() + .setValue( + upcast( + message.getValue(), + singletonMap( + Upcaster.CORE_MESSAGE_NAME_METADATA_KEY, + Protocol.OutputCommandMessage.class.getSimpleName()))) + .build(); + } + return message; + } + + private Protocol.GetLazyStateCompletionNotificationMessage upcast( + Protocol.GetLazyStateCompletionNotificationMessage message) { + if (message.hasValue()) { + return message.toBuilder() + .setValue( + upcast( + message.getValue(), + singletonMap( + Upcaster.CORE_MESSAGE_NAME_METADATA_KEY, + Protocol.GetLazyStateCompletionNotificationMessage.class.getSimpleName()))) + .build(); + } + return message; + } + + private Protocol.SetStateCommandMessage upcast(Protocol.SetStateCommandMessage message) { + if (message.hasValue()) { + return message.toBuilder() + .setValue( + upcast( + message.getValue(), + singletonMap( + Upcaster.CORE_MESSAGE_NAME_METADATA_KEY, + Protocol.SetStateCommandMessage.class.getSimpleName()))) + .build(); + } + return message; + } + + private Protocol.GetEagerStateCommandMessage upcast( + Protocol.GetEagerStateCommandMessage message) { + if (message.hasValue()) { + return message.toBuilder() + .setValue( + upcast( + message.getValue(), + singletonMap( + Upcaster.CORE_MESSAGE_NAME_METADATA_KEY, + Protocol.GetEagerStateCommandMessage.class.getSimpleName()))) + .build(); + } + return message; + } + + private Protocol.GetPromiseCompletionNotificationMessage upcast( + Protocol.GetPromiseCompletionNotificationMessage message) { + if (message.hasValue()) { + return message.toBuilder() + .setValue( + upcast( + message.getValue(), + singletonMap( + Upcaster.CORE_MESSAGE_NAME_METADATA_KEY, + Protocol.GetPromiseCompletionNotificationMessage.class.getSimpleName()))) + .build(); + } + return message; + } + + private Protocol.PeekPromiseCompletionNotificationMessage upcast( + Protocol.PeekPromiseCompletionNotificationMessage message) { + if (message.hasValue()) { + return message.toBuilder() + .setValue( + upcast( + message.getValue(), + singletonMap( + Upcaster.CORE_MESSAGE_NAME_METADATA_KEY, + Protocol.PeekPromiseCompletionNotificationMessage.class.getSimpleName()))) + .build(); + } + return message; + } + + private Protocol.CompletePromiseCommandMessage upcast( + Protocol.CompletePromiseCommandMessage message) { + if (message.hasCompletionValue()) { + return message.toBuilder() + .setCompletionValue( + upcast( + message.getCompletionValue(), + singletonMap( + Upcaster.CORE_MESSAGE_NAME_METADATA_KEY, + Protocol.CompletePromiseCommandMessage.class.getSimpleName()))) + .build(); + } + return message; + } + + private Protocol.CallCompletionNotificationMessage upcast( + Protocol.CallCompletionNotificationMessage message) { + if (message.hasValue()) { + return message.toBuilder() + .setValue( + upcast( + message.getValue(), + singletonMap( + Upcaster.CORE_MESSAGE_NAME_METADATA_KEY, + Protocol.CallCompletionNotificationMessage.class.getSimpleName()))) + .build(); + } + return message; + } + + private Protocol.SendSignalCommandMessage upcast(Protocol.SendSignalCommandMessage message) { + if (message.hasValue()) { + return message.toBuilder() + .setValue( + upcast( + message.getValue(), + singletonMap( + Upcaster.CORE_MESSAGE_NAME_METADATA_KEY, + Protocol.SendSignalCommandMessage.class.getSimpleName()))) + .build(); + } + return message; + } + + private Protocol.AttachInvocationCompletionNotificationMessage upcast( + Protocol.AttachInvocationCompletionNotificationMessage message) { + if (message.hasValue()) { + return message.toBuilder() + .setValue( + upcast( + message.getValue(), + singletonMap( + Upcaster.CORE_MESSAGE_NAME_METADATA_KEY, + Protocol.AttachInvocationCompletionNotificationMessage.class + .getSimpleName()))) + .build(); + } + return message; + } + + private Protocol.GetInvocationOutputCompletionNotificationMessage upcast( + Protocol.GetInvocationOutputCompletionNotificationMessage message) { + if (message.hasValue()) { + return message.toBuilder() + .setValue( + upcast( + message.getValue(), + singletonMap( + Upcaster.CORE_MESSAGE_NAME_METADATA_KEY, + Protocol.GetInvocationOutputCompletionNotificationMessage.class + .getSimpleName()))) + .build(); + } + return message; + } + + private Protocol.CompleteAwakeableCommandMessage upcast( + Protocol.CompleteAwakeableCommandMessage message) { + if (message.hasValue()) { + return message.toBuilder() + .setValue( + upcast( + message.getValue(), + singletonMap( + Upcaster.CORE_MESSAGE_NAME_METADATA_KEY, + Protocol.CompleteAwakeableCommandMessage.class.getSimpleName()))) + .build(); + } + return message; + } + + private Protocol.SignalNotificationMessage upcast(Protocol.SignalNotificationMessage message) { + if (message.hasValue()) { + return message.toBuilder() + .setValue( + upcast( + message.getValue(), + singletonMap( + Upcaster.CORE_MESSAGE_NAME_METADATA_KEY, + Protocol.SignalNotificationMessage.class.getSimpleName()))) + .build(); + } + return message; + } + + private Protocol.StartMessage upcast(Protocol.StartMessage message) { + var upcastedStateEntries = message.getStateMapList().stream().map(this::upcast).toList(); + return message.toBuilder().clearStateMap().addAllStateMap(upcastedStateEntries).build(); + } + + private Protocol.ProposeRunCompletionMessage upcast( + Protocol.ProposeRunCompletionMessage message) { + if (message.hasValue()) { + Slice original = Slice.wrap(message.getValue().toByteArray()); + Slice upcastedValue = + upcast( + original, + singletonMap( + Upcaster.CORE_MESSAGE_NAME_METADATA_KEY, + Protocol.ProposeRunCompletionMessage.class.getSimpleName())); + return message.toBuilder().setValue(ByteString.copyFrom(upcastedValue.toByteArray())).build(); + } + return message; + } + + private Protocol.StartMessage.StateEntry upcast(Protocol.StartMessage.StateEntry message) { + String key = new String(message.getKey().toByteArray()); + Map metadata = + Map.of( + Upcaster.CORE_MESSAGE_NAME_METADATA_KEY, + Protocol.StartMessage.class.getSimpleName(), + Upcaster.STATE_KEY_METADATA_KEY, + key); + Slice value = Slice.wrap(message.getValue().toByteArray()); + + Slice upcastedValue = upcast(value, metadata); + + return Protocol.StartMessage.StateEntry.newBuilder() + .setKey(message.getKey()) + .setValue(ByteString.copyFrom(upcastedValue.toByteArray())) + .build(); + } + + private Protocol.Value upcast(Protocol.Value message, Map metadata) { + Slice body = Slice.wrap(message.getContent().toByteArray()); + Slice upcastedSlice = upcast(body, metadata); + return Protocol.Value.newBuilder() + .setContent(ByteString.copyFrom(upcastedSlice.toByteArray())) + .build(); + } + + private Slice upcast(Slice message, Map metadata) { + if (upcaster.canUpcast(message, metadata)) { + return upcaster.upcast(message, metadata); + } + return message; + } +} diff --git a/sdk-core/src/test/java/dev/restate/sdk/core/statemachine/upcasting/DefaultInvocationInputUpcasterTest.java b/sdk-core/src/test/java/dev/restate/sdk/core/statemachine/upcasting/DefaultInvocationInputUpcasterTest.java new file mode 100644 index 000000000..d231a9bf4 --- /dev/null +++ b/sdk-core/src/test/java/dev/restate/sdk/core/statemachine/upcasting/DefaultInvocationInputUpcasterTest.java @@ -0,0 +1,205 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package dev.restate.sdk.core.statemachine.upcasting; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.protobuf.ByteString; +import dev.restate.common.Slice; +import dev.restate.sdk.core.generated.protocol.Protocol; +import dev.restate.sdk.core.statemachine.InvocationInput; +import dev.restate.sdk.core.statemachine.MessageHeader; +import dev.restate.sdk.upcasting.Upcaster; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link DefaultInvocationInputUpcaster}, ensuring payloads are upcast with proper + * metadata propagation and that failures are handled gracefully. + * + *

Author: Milan Savic + */ +class DefaultInvocationInputUpcasterTest { + + private static final byte[] ORIG = "orig".getBytes(StandardCharsets.UTF_8); + private static final byte[] UP = "up".getBytes(StandardCharsets.UTF_8); + + @Test + void upcastNullInputReturnsNull() { + var sut = new DefaultInvocationInputUpcaster(Upcaster.noop()); + assertThat(sut.upcast(null)).isNull(); + } + + @Test + void inputCommandMessageValueIsUpcastAndMetadataContainsCoreNameAndHeaders() { + CapturingUpcaster cap = new CapturingUpcaster(true, UP); + var sut = new DefaultInvocationInputUpcaster(cap); + + Protocol.InputCommandMessage msg = + Protocol.InputCommandMessage.newBuilder() + .addHeaders(Protocol.Header.newBuilder().setKey("X-Key").setValue("ignored").build()) + .setName("entry") + .setValue(valueOf(ORIG)) + .build(); + InvocationInput input = InvocationInput.of(MessageHeader.fromMessage(msg), msg); + + InvocationInput out = sut.doUpcast(input); + + assertThat(out.message()).isInstanceOf(Protocol.InputCommandMessage.class); + Protocol.InputCommandMessage outMsg = (Protocol.InputCommandMessage) out.message(); + assertThat(outMsg.getValue().getContent().toByteArray()).isEqualTo(UP); + + // Metadata captured by the Upcaster should include the CORE_MESSAGE_NAME and carried header key + assertThat(cap.lastHeaders.get()).isNotNull(); + Map meta = cap.lastHeaders.get(); + assertThat(meta.get(Upcaster.CORE_MESSAGE_NAME_METADATA_KEY)) + .isEqualTo(Protocol.InputCommandMessage.class.getSimpleName()); + assertThat(meta).containsKey("X-Key"); + // body passed to the upcaster should be the raw value content + assertThat(cap.lastBody.get()).isNotNull(); + assertThat(cap.lastBody.get().toByteArray()).isEqualTo(ORIG); + } + + @Test + void runCompletionNotificationValueIsUpcastAndCoreNamePresent() { + CapturingUpcaster cap = new CapturingUpcaster(true, UP); + var sut = new DefaultInvocationInputUpcaster(cap); + + Protocol.RunCompletionNotificationMessage msg = + Protocol.RunCompletionNotificationMessage.newBuilder() + .setCompletionId(1) + .setValue(valueOf(ORIG)) + .build(); + InvocationInput input = InvocationInput.of(MessageHeader.fromMessage(msg), msg); + + Protocol.RunCompletionNotificationMessage out = + (Protocol.RunCompletionNotificationMessage) sut.doUpcast(input).message(); + assertThat(out.getValue().getContent().toByteArray()).isEqualTo(UP); + assertThat(cap.lastHeaders.get().get(Upcaster.CORE_MESSAGE_NAME_METADATA_KEY)) + .isEqualTo(Protocol.RunCompletionNotificationMessage.class.getSimpleName()); + } + + @Test + void startMessageStateEntriesAreUpcastAndStateKeyMetadataIsPropagated() { + CapturingUpcaster cap = new CapturingUpcaster(true, UP); + var sut = new DefaultInvocationInputUpcaster(cap); + + Protocol.StartMessage.StateEntry stateEntry = + Protocol.StartMessage.StateEntry.newBuilder() + .setKey(ByteString.copyFrom("balance".getBytes(StandardCharsets.UTF_8))) + .setValue(ByteString.copyFrom(ORIG)) + .build(); + Protocol.StartMessage msg = Protocol.StartMessage.newBuilder().addStateMap(stateEntry).build(); + InvocationInput input = InvocationInput.of(MessageHeader.fromMessage(msg), msg); + + Protocol.StartMessage out = (Protocol.StartMessage) sut.doUpcast(input).message(); + assertThat(out.getStateMap(0).getValue().toByteArray()).isEqualTo(UP); + assertThat(cap.lastHeaders.get().get(Upcaster.CORE_MESSAGE_NAME_METADATA_KEY)) + .isEqualTo(Protocol.StartMessage.class.getSimpleName()); + assertThat(cap.lastHeaders.get().get(Upcaster.STATE_KEY_METADATA_KEY)).isEqualTo("balance"); + assertThat(cap.lastBody.get().toByteArray()).isEqualTo(ORIG); + } + + @Test + void proposeRunCompletionValueIsUpcastFromRawBytes() { + CapturingUpcaster cap = new CapturingUpcaster(true, UP); + var sut = new DefaultInvocationInputUpcaster(cap); + + Protocol.ProposeRunCompletionMessage msg = + Protocol.ProposeRunCompletionMessage.newBuilder() + .setValue(ByteString.copyFrom(ORIG)) + .build(); + InvocationInput input = InvocationInput.of(MessageHeader.fromMessage(msg), msg); + + Protocol.ProposeRunCompletionMessage out = + (Protocol.ProposeRunCompletionMessage) sut.doUpcast(input).message(); + assertThat(out.getValue().toByteArray()).isEqualTo(UP); + assertThat(cap.lastHeaders.get().get(Upcaster.CORE_MESSAGE_NAME_METADATA_KEY)) + .isEqualTo(Protocol.ProposeRunCompletionMessage.class.getSimpleName()); + } + + @Test + void messageWithoutValueIsLeftUntouched() { + CapturingUpcaster cap = new CapturingUpcaster(true, UP); + var sut = new DefaultInvocationInputUpcaster(cap); + + // OutputCommandMessage with failure but no value should not be transformed + Protocol.OutputCommandMessage msg = + Protocol.OutputCommandMessage.newBuilder() + .setName("entry") + .setFailure(Protocol.Failure.newBuilder().setCode(42).setMessage("boom").build()) + .build(); + InvocationInput input = InvocationInput.of(MessageHeader.fromMessage(msg), msg); + + InvocationInput out = sut.doUpcast(input); + assertThat(out.message()).isInstanceOf(Protocol.OutputCommandMessage.class); + Protocol.OutputCommandMessage outMsg = (Protocol.OutputCommandMessage) out.message(); + assertThat(outMsg.hasValue()).isFalse(); + assertThat(cap.lastBody.get()).isNull(); // upcaster should not have been invoked + } + + @Test + void publicUpcastCatchesExceptionsAndReturnsOriginalInput() { + Upcaster throwing = + new Upcaster() { + @Override + public boolean canUpcast(Slice body, Map headers) { + return true; + } + + @Override + public Slice upcast(Slice body, Map headers) { + throw new RuntimeException("boom"); + } + }; + var sut = new DefaultInvocationInputUpcaster(throwing); + + Protocol.RunCompletionNotificationMessage msg = + Protocol.RunCompletionNotificationMessage.newBuilder() + .setCompletionId(1) + .setValue(valueOf(ORIG)) + .build(); + InvocationInput input = InvocationInput.of(MessageHeader.fromMessage(msg), msg); + + InvocationInput out = sut.upcast(input); // use the safe wrapper + assertThat(out).isSameAs(input); // should fall back to original input + } + + private static Protocol.Value valueOf(byte[] content) { + return Protocol.Value.newBuilder().setContent(ByteString.copyFrom(content)).build(); + } + + private static final class CapturingUpcaster implements Upcaster { + private final boolean answer; + private final byte[] replyBytes; + private final AtomicReference lastBody = new AtomicReference<>(); + private final AtomicReference> lastHeaders = new AtomicReference<>(); + + private CapturingUpcaster(boolean answer, byte[] replyBytes) { + this.answer = answer; + this.replyBytes = replyBytes; + } + + @Override + public boolean canUpcast(Slice body, Map headers) { + lastBody.set(body); + lastHeaders.set(headers); + return answer; + } + + @Override + public Slice upcast(Slice body, Map headers) { + lastBody.set(body); + lastHeaders.set(headers); + return Slice.wrap(replyBytes); + } + } +} diff --git a/sdk-core/src/test/java/dev/restate/sdk/core/statemachine/upcasting/UpcasterTest.java b/sdk-core/src/test/java/dev/restate/sdk/core/statemachine/upcasting/UpcasterTest.java new file mode 100644 index 000000000..c8576b6c1 --- /dev/null +++ b/sdk-core/src/test/java/dev/restate/sdk/core/statemachine/upcasting/UpcasterTest.java @@ -0,0 +1,140 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package dev.restate.sdk.core.statemachine.upcasting; + +import static dev.restate.sdk.core.statemachine.ProtoUtils.END_MESSAGE; +import static dev.restate.sdk.core.statemachine.ProtoUtils.inputCmd; +import static dev.restate.sdk.core.statemachine.ProtoUtils.outputCmd; +import static dev.restate.sdk.core.statemachine.ProtoUtils.startMessage; + +import dev.restate.common.Slice; +import dev.restate.common.function.ThrowingBiFunction; +import dev.restate.sdk.Context; +import dev.restate.sdk.HandlerRunner; +import dev.restate.sdk.ObjectContext; +import dev.restate.sdk.WorkflowContext; +import dev.restate.sdk.core.MockRequestResponse; +import dev.restate.sdk.core.TestDefinitions; +import dev.restate.sdk.core.TestSerdes; +import dev.restate.sdk.endpoint.definition.HandlerDefinition; +import dev.restate.sdk.endpoint.definition.HandlerType; +import dev.restate.sdk.endpoint.definition.ServiceDefinition; +import dev.restate.sdk.endpoint.definition.ServiceType; +import dev.restate.sdk.upcasting.Upcaster; +import dev.restate.sdk.upcasting.UpcasterFactory; +import dev.restate.serde.jackson.JacksonSerdeFactory; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; + +/** + * Integration-level tests verifying that configured {@link dev.restate.sdk.upcasting.Upcaster} + * instances are applied to incoming inputs across different service types (Service, Virtual Object, + * Workflow) using the state machine test harness. + * + * @author Milan Savic + */ +public class UpcasterTest { + + private static final String INPUT_ORIG = "orig-value"; + private static final String INPUT_UPCASTED = "upcasted-value"; + + private static UpcasterFactory simpleReplacingUpcasterFactory() { + return new UpcasterFactory() { + @Override + public Upcaster newUpcaster( + String serviceName, ServiceType serviceType, Map metadata) { + return new Upcaster() { + @Override + public boolean canUpcast(Slice body, Map headers) { + // Upcast only if the input contains the marker "orig-value" + return new String(body.toByteArray(), StandardCharsets.UTF_8).contains(INPUT_ORIG); + } + + @Override + public Slice upcast(Slice body, Map headers) { + String s = new String(body.toByteArray(), StandardCharsets.UTF_8); + s = s.replace(INPUT_ORIG, INPUT_UPCASTED); + return Slice.wrap(s); + } + }; + } + }; + } + + private static ServiceDefinition buildService( + String name, + ServiceType type, + HandlerType handlerType, + ThrowingBiFunction runner) { + @SuppressWarnings("unchecked") + HandlerDefinition handler = + HandlerDefinition.of( + "run", + handlerType, + TestSerdes.STRING, + TestSerdes.STRING, + HandlerRunner.of((ThrowingBiFunction) runner, new JacksonSerdeFactory(), null)); + + return ServiceDefinition.of(name, type, List.of(handler)) + .configure(cfg -> cfg.configureUpcasterFactory(simpleReplacingUpcasterFactory())); + } + + @Test + void serviceUpcasterIsAppliedToInput() { + // Echo handler for Service + ServiceDefinition svc = + buildService( + "UpcastService", + ServiceType.SERVICE, + HandlerType.SHARED, + (ThrowingBiFunction) (ctx, in) -> in); + + TestDefinitions.ExpectingOutputMessages def = + TestDefinitions.testInvocation(svc, null, "run") + .withInput(startMessage(1), inputCmd(INPUT_ORIG)) + .expectingOutput(outputCmd(INPUT_UPCASTED), END_MESSAGE); + MockRequestResponse.INSTANCE.executeTest(def); + } + + @Test + void virtualObjectUpcasterIsAppliedToInput() { + // Echo handler for Virtual Object + ServiceDefinition svc = + buildService( + "UpcastVirtualObject", + ServiceType.VIRTUAL_OBJECT, + HandlerType.EXCLUSIVE, + (ThrowingBiFunction) (ctx, in) -> in); + + TestDefinitions.ExpectingOutputMessages def = + TestDefinitions.testInvocation(svc, null, "run") + .withInput(startMessage(1, "my-key"), inputCmd(INPUT_ORIG)) + .expectingOutput(outputCmd(INPUT_UPCASTED), END_MESSAGE); + MockRequestResponse.INSTANCE.executeTest(def); + } + + @Test + void workflowUpcasterIsAppliedToInput() { + // Echo handler for Workflow + ServiceDefinition svc = + buildService( + "UpcastWorkflow", + ServiceType.WORKFLOW, + HandlerType.WORKFLOW, + (ThrowingBiFunction) (ctx, in) -> in); + + TestDefinitions.ExpectingOutputMessages def = + TestDefinitions.testInvocation(svc, null, "run") + .withInput(startMessage(1), inputCmd(INPUT_ORIG)) + .expectingOutput(outputCmd(INPUT_UPCASTED), END_MESSAGE); + MockRequestResponse.INSTANCE.executeTest(def); + } +} From 7780bb1f0e442a81b00bfc70b72f54522b5c2073 Mon Sep 17 00:00:00 2001 From: Milan Savic Date: Wed, 25 Feb 2026 15:57:58 +0100 Subject: [PATCH 2/3] Added javadoc to ServiceDefinition Configurator. --- .../sdk/endpoint/definition/ServiceDefinition.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/ServiceDefinition.java b/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/ServiceDefinition.java index 403f71c2f..838498b83 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/ServiceDefinition.java +++ b/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/ServiceDefinition.java @@ -163,6 +163,10 @@ public Map getMetadata() { return invocationRetryPolicy; } + /** + * @return Upcaster factory used for upcasting journal messages while applying them to the state machine + * @see Configurator#upcasterFactory + */ public UpcasterFactory getUpcasterFactory() { return upcasterFactory; } @@ -550,6 +554,16 @@ public Configurator configureHandler( return this; } + /** + * Configure upcaster factory for this service. Upcaster factory is used to create a fresh upcaster for each + * invocation. The upcaster is then used to upcast (transform) journal messages. The upcaster is mostly used for + * transforming messages in older format to the newest format. Could also be used for transformations of Virtual + * Objects state. + * + * @return this + * @see UpcasterFactory + * @see Upcaster + */ public Configurator configureUpcasterFactory(UpcasterFactory upcasterFactory) { this.upcasterFactory = upcasterFactory; return this; From bddbc579e8cdbbcc4635572ae1ca3a49748c1511 Mon Sep 17 00:00:00 2001 From: Milan Savic Date: Wed, 25 Feb 2026 16:01:03 +0100 Subject: [PATCH 3/3] Fixed typo in @author --- .../upcasting/DefaultInvocationInputUpcasterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk-core/src/test/java/dev/restate/sdk/core/statemachine/upcasting/DefaultInvocationInputUpcasterTest.java b/sdk-core/src/test/java/dev/restate/sdk/core/statemachine/upcasting/DefaultInvocationInputUpcasterTest.java index d231a9bf4..e18e6dd7b 100644 --- a/sdk-core/src/test/java/dev/restate/sdk/core/statemachine/upcasting/DefaultInvocationInputUpcasterTest.java +++ b/sdk-core/src/test/java/dev/restate/sdk/core/statemachine/upcasting/DefaultInvocationInputUpcasterTest.java @@ -25,7 +25,7 @@ * Tests for {@link DefaultInvocationInputUpcaster}, ensuring payloads are upcast with proper * metadata propagation and that failures are handled gracefully. * - *

Author: Milan Savic + * @author Milan Savic */ class DefaultInvocationInputUpcasterTest {