Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.endpoint.definition;

import dev.restate.sdk.upcasting.Upcaster;
import dev.restate.sdk.upcasting.UpcasterFactory;
import java.time.Duration;
import java.util.*;
import java.util.function.Consumer;
Expand All @@ -30,6 +32,7 @@ public final class ServiceDefinition {
private final @Nullable Boolean ingressPrivate;
private final @Nullable Boolean enableLazyState;
private final @Nullable InvocationRetryPolicy invocationRetryPolicy;
private final UpcasterFactory upcasterFactory;

private ServiceDefinition(
String serviceName,
Expand All @@ -43,7 +46,8 @@ private ServiceDefinition(
@Nullable Duration journalRetention,
@Nullable Boolean ingressPrivate,
@Nullable Boolean enableLazyState,
@Nullable InvocationRetryPolicy invocationRetryPolicy) {
@Nullable InvocationRetryPolicy invocationRetryPolicy,
UpcasterFactory upcasterFactory) {
this.serviceName = serviceName;
this.serviceType = serviceType;
this.handlers = handlers;
Expand All @@ -56,6 +60,7 @@ private ServiceDefinition(
this.ingressPrivate = ingressPrivate;
this.enableLazyState = enableLazyState;
this.invocationRetryPolicy = invocationRetryPolicy;
this.upcasterFactory = upcasterFactory;
}

/**
Expand Down Expand Up @@ -158,6 +163,14 @@ public Map<String, String> getMetadata() {
return invocationRetryPolicy;
}

/**
* @return Upcaster factory used for upcasting journal messages while applying them to the state machine
* @see Configurator#upcasterFactory
*/
public UpcasterFactory getUpcasterFactory() {
return upcasterFactory;
}

public ServiceDefinition withDocumentation(@Nullable String documentation) {
return new ServiceDefinition(
serviceName,
Expand All @@ -171,7 +184,8 @@ public ServiceDefinition withDocumentation(@Nullable String documentation) {
journalRetention,
ingressPrivate,
enableLazyState,
invocationRetryPolicy);
invocationRetryPolicy,
upcasterFactory);
}

public ServiceDefinition withMetadata(Map<String, String> metadata) {
Expand All @@ -187,7 +201,8 @@ public ServiceDefinition withMetadata(Map<String, String> metadata) {
journalRetention,
ingressPrivate,
enableLazyState,
invocationRetryPolicy);
invocationRetryPolicy,
upcasterFactory);
}

/**
Expand All @@ -205,7 +220,8 @@ public ServiceDefinition configure(Consumer<Configurator> configurator) {
journalRetention,
ingressPrivate,
enableLazyState,
invocationRetryPolicy);
invocationRetryPolicy,
upcasterFactory);
configurator.accept(configuratorObj);
return new ServiceDefinition(
serviceName,
Expand All @@ -219,7 +235,8 @@ public ServiceDefinition configure(Consumer<Configurator> configurator) {
configuratorObj.journalRetention,
configuratorObj.ingressPrivate,
configuratorObj.enableLazyState,
configuratorObj.invocationRetryPolicy);
configuratorObj.invocationRetryPolicy,
configuratorObj.upcasterFactory);
}

/** Configurator for a {@link ServiceDefinition}. */
Expand All @@ -235,6 +252,7 @@ public static final class Configurator {
private @Nullable Boolean ingressPrivate;
private @Nullable Boolean enableLazyState;
private @Nullable InvocationRetryPolicy invocationRetryPolicy;
private UpcasterFactory upcasterFactory;

private Configurator(
Map<String, HandlerDefinition<?, ?>> handlers,
Expand All @@ -246,7 +264,8 @@ private Configurator(
@Nullable Duration journalRetention,
@Nullable Boolean ingressPrivate,
@Nullable Boolean enableLazyState,
@Nullable InvocationRetryPolicy invocationRetryPolicy) {
@Nullable InvocationRetryPolicy invocationRetryPolicy,
UpcasterFactory upcasterFactory) {
this.handlers = new HashMap<>(handlers);
this.documentation = documentation;
this.metadata = new HashMap<>(metadata);
Expand All @@ -257,6 +276,7 @@ private Configurator(
this.ingressPrivate = ingressPrivate;
this.enableLazyState = enableLazyState;
this.invocationRetryPolicy = invocationRetryPolicy;
this.upcasterFactory = upcasterFactory;
}

/**
Expand Down Expand Up @@ -533,6 +553,21 @@ public Configurator configureHandler(
handlers.computeIfPresent(handlerName, (k, v) -> v.configure(configurator));
return this;
}

/**
* Configure upcaster factory for this service. Upcaster factory is used to create a fresh upcaster for each
* invocation. The upcaster is then used to upcast (transform) journal messages. The upcaster is mostly used for
* transforming messages in older format to the newest format. Could also be used for transformations of Virtual
* Objects state.
*
* @return this
* @see UpcasterFactory
* @see Upcaster
*/
public Configurator configureUpcasterFactory(UpcasterFactory upcasterFactory) {
this.upcasterFactory = upcasterFactory;
return this;
}
}

@Override
Expand All @@ -549,7 +584,8 @@ && getServiceType() == that.getServiceType()
&& Objects.equals(journalRetention, that.journalRetention)
&& Objects.equals(ingressPrivate, that.ingressPrivate)
&& Objects.equals(enableLazyState, that.enableLazyState)
&& Objects.equals(invocationRetryPolicy, that.invocationRetryPolicy);
&& Objects.equals(invocationRetryPolicy, that.invocationRetryPolicy)
&& Objects.equals(upcasterFactory, that.upcasterFactory);
}

@Override
Expand All @@ -566,7 +602,8 @@ public int hashCode() {
journalRetention,
ingressPrivate,
enableLazyState,
invocationRetryPolicy);
invocationRetryPolicy,
upcasterFactory);
}

public static ServiceDefinition of(
Expand All @@ -584,6 +621,7 @@ public static ServiceDefinition of(
null,
null,
null,
null);
null,
(n, type, metadata) -> Upcaster.noop());
}
}
71 changes: 71 additions & 0 deletions sdk-common/src/main/java/dev/restate/sdk/upcasting/Upcaster.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate Java SDK,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.upcasting;

import dev.restate.common.Slice;
import java.util.Map;

/**
* A component capable of transforming input payloads from an older representation to a newer one
* ("upcasting"). Upcasters can be plugged per service via {@code UpcasterFactory} and are invoked
* by the state machine when handling incoming protocol messages that carry value payloads.
*
* <p>Metadata may be provided to upcasters via headers, including the core message name and, where
* applicable, the state key. See {@link #CORE_MESSAGE_NAME_METADATA_KEY} and {@link
* #STATE_KEY_METADATA_KEY}.
*
* @author Milan Savic
*/
public interface Upcaster {

/** Header key containing the simple name of the core protocol message being processed. */
String CORE_MESSAGE_NAME_METADATA_KEY = "_CORE_MESSAGE_NAME";

/** Header key containing the state key when upcasting state entries (if applicable). */
String STATE_KEY_METADATA_KEY = "_STATE_KEY";

/**
* Returns whether this upcaster is able to upcast the given payload given the provided headers.
* Implementations should be side‑effect free.
*/
boolean canUpcast(Slice body, Map<String, String> headers);

/**
* Performs the upcast transformation. Implementations may assume {@link #canUpcast(Slice, Map)}
* has been called and returned {@code true}.
*
* @param body the original payload to upcast
* @param headers metadata associated with the payload
* @return the transformed payload; must not be {@code null}
*/
Slice upcast(Slice body, Map<String, String> headers);

/** Returns a no‑op upcaster that never upcasts and always returns the original payload. */
static Upcaster noop() {
return NoopUpcaster.INSTANCE;
}

/** No‑op implementation of {@link Upcaster}. */
final class NoopUpcaster implements Upcaster {

static final NoopUpcaster INSTANCE = new NoopUpcaster();

private NoopUpcaster() {}

@Override
public boolean canUpcast(Slice body, Map<String, String> headers) {
return false;
}

@Override
public Slice upcast(Slice body, Map<String, String> headers) {
return body;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate Java SDK,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.upcasting;

import dev.restate.sdk.endpoint.definition.ServiceType;
import java.util.Map;

/**
* Factory for producing {@link Upcaster} instances for a specific service. Implementations may
* inspect the service name, service type, and optional metadata to decide which upcaster to return.
*
* @author Milan Savic
*/
public interface UpcasterFactory {

/**
* Creates a new {@link Upcaster} for the given service.
*
* @param serviceName the logical name of the service
* @param serviceType the type of the service (SERVICE, VIRTUAL_OBJECT, WORKFLOW)
* @param metadata optional metadata that can be used when selecting/configuring the upcaster
* @return a non-null {@link Upcaster}
*/
Upcaster newUpcaster(String serviceName, ServiceType serviceType, Map<String, String> metadata);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import dev.restate.sdk.endpoint.HeadersAccessor;
import dev.restate.sdk.endpoint.definition.HandlerDefinition;
import dev.restate.sdk.endpoint.definition.ServiceDefinition;
import dev.restate.sdk.upcasting.Upcaster;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -179,14 +180,18 @@ public RequestProcessor processorForRequest(
loggingContextSetter.set(LoggingContextSetter.INVOCATION_ID_KEY, invocationIdHeader);
}

// Instantiate state machine
StateMachine stateMachine = StateMachine.init(headersAccessor, loggingContextSetter);

// Resolve the service method definition
ServiceDefinition svc = this.endpoint.resolveService(serviceName);
if (svc == null) {
throw ProtocolException.methodNotFound(serviceName, handlerName);
}

// Instantiate state machine
Upcaster upcaster =
svc.getUpcasterFactory()
.newUpcaster(svc.getServiceName(), svc.getServiceType(), svc.getMetadata());
StateMachine stateMachine = StateMachine.init(headersAccessor, loggingContextSetter, upcaster);

HandlerDefinition<?, ?> handler = svc.getHandler(handlerName);
if (handler == null) {
throw ProtocolException.methodNotFound(serviceName, handlerName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate Java SDK,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.core.statemachine;

/**
* Strategy for upcasting an {@link InvocationInput} before it is processed by the state machine.
* Implementations may transform the payloads in the underlying protocol messages (e.g. to migrate
* from an older schema version to a newer one) while keeping the semantics intact.
*
* @author Milan Savic
*/
public interface InvocationInputUpcaster {

/**
* Upcasts the given {@link InvocationInput}. Implementations should be tolerant of {@code null}
* inputs and return {@code null} in that case. If no change is required, the same input instance
* may be returned.
*
* @param input the original invocation input, may be {@code null}
* @return the upcasted invocation input, or the original input if no changes were applied
*/
InvocationInput upcast(InvocationInput input);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import dev.restate.sdk.common.*;
import dev.restate.sdk.core.EndpointRequestHandler;
import dev.restate.sdk.endpoint.HeadersAccessor;
import dev.restate.sdk.upcasting.Upcaster;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
Expand All @@ -30,8 +31,9 @@ public interface StateMachine extends Flow.Processor<Slice, Slice> {

static StateMachine init(
HeadersAccessor headersAccessor,
EndpointRequestHandler.LoggingContextSetter loggingContextSetter) {
return new StateMachineImpl(headersAccessor, loggingContextSetter);
EndpointRequestHandler.LoggingContextSetter loggingContextSetter,
Upcaster upcaster) {
return new StateMachineImpl(headersAccessor, loggingContextSetter, upcaster);
}

// --- Response metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import dev.restate.sdk.core.EndpointRequestHandler;
import dev.restate.sdk.core.ProtocolException;
import dev.restate.sdk.core.generated.protocol.Protocol;
import dev.restate.sdk.core.statemachine.upcasting.DefaultInvocationInputUpcaster;
import dev.restate.sdk.endpoint.HeadersAccessor;
import dev.restate.sdk.upcasting.Upcaster;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -48,9 +50,13 @@ class StateMachineImpl implements StateMachine {
// State machine context
private final StateContext stateContext;

private final InvocationInputUpcaster upcaster;

StateMachineImpl(
HeadersAccessor headersAccessor,
EndpointRequestHandler.LoggingContextSetter loggingContextSetter) {
EndpointRequestHandler.LoggingContextSetter loggingContextSetter,
Upcaster upcaster) {
this.upcaster = new DefaultInvocationInputUpcaster(upcaster);
String contentTypeHeader = headersAccessor.get(ServiceProtocol.CONTENT_TYPE);

var serviceProtocolVersion = ServiceProtocol.parseServiceProtocolVersion(contentTypeHeader);
Expand Down Expand Up @@ -128,7 +134,7 @@ public void onNext(Slice slice) {
this.messageDecoder.offer(slice);

boolean shouldTriggerInputListener = this.messageDecoder.isNextAvailable();
InvocationInput invocationInput = this.messageDecoder.next();
InvocationInput invocationInput = upcaster.upcast(this.messageDecoder.next());
while (invocationInput != null) {
LOG.trace(
"Received input message {} {}",
Expand All @@ -139,7 +145,7 @@ public void onNext(Slice slice) {
.getCurrentState()
.onNewMessage(invocationInput, this.stateContext, this.waitForReadyFuture);

invocationInput = this.messageDecoder.next();
invocationInput = upcaster.upcast(this.messageDecoder.next());
}

if (shouldTriggerInputListener) {
Expand Down
Loading