Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeDuration;
import static com.google.api.gax.util.TimeConversionUtils.toThreetenDuration;
import static com.google.cloud.spanner.spi.v1.GapicSpannerRpc.EXPERIMENTAL_LOCATION_API_ENV_VAR;

import com.google.api.core.ApiFunction;
import com.google.api.core.BetaApi;
Expand Down Expand Up @@ -257,6 +258,7 @@ public static GcpChannelPoolOptions createDefaultDynamicChannelPoolOptions() {
private final OpenTelemetry openTelemetry;
private final boolean enableApiTracing;
private final boolean enableBuiltInMetrics;
private final boolean enableLocationApi;
private final boolean enableExtendedTracing;
private final boolean enableEndToEndTracing;
private final String monitoringHost;
Expand Down Expand Up @@ -926,6 +928,7 @@ protected SpannerOptions(Builder builder) {
} else {
enableBuiltInMetrics = builder.enableBuiltInMetrics;
}
enableLocationApi = builder.enableLocationApi;
enableEndToEndTracing = builder.enableEndToEndTracing;
monitoringHost = builder.monitoringHost;
defaultTransactionOptions = builder.defaultTransactionOptions;
Expand Down Expand Up @@ -993,6 +996,10 @@ default boolean isEnableEndToEndTracing() {
return false;
}

default boolean isEnableLocationApi() {
return false;
}

@Deprecated
@ObsoleteApi(
"This will be removed in an upcoming version without a major version bump. You should use"
Expand Down Expand Up @@ -1084,6 +1091,11 @@ public boolean isEnableEndToEndTracing() {
return Boolean.parseBoolean(System.getenv(SPANNER_ENABLE_END_TO_END_TRACING));
}

@Override
public boolean isEnableLocationApi() {
return Boolean.parseBoolean(System.getenv(EXPERIMENTAL_LOCATION_API_ENV_VAR));
}

@Override
public String getMonitoringHost() {
return System.getenv(SPANNER_MONITORING_HOST);
Expand Down Expand Up @@ -1164,6 +1176,7 @@ public static class Builder
private boolean enableExtendedTracing = SpannerOptions.environment.isEnableExtendedTracing();
private boolean enableEndToEndTracing = SpannerOptions.environment.isEnableEndToEndTracing();
private boolean enableBuiltInMetrics = SpannerOptions.environment.isEnableBuiltInMetrics();
private boolean enableLocationApi = SpannerOptions.environment.isEnableLocationApi();
private String monitoringHost = SpannerOptions.environment.getMonitoringHost();
private SslContext mTLSContext = null;
private String experimentalHost = null;
Expand Down Expand Up @@ -1270,6 +1283,7 @@ protected Builder() {
this.enableApiTracing = options.enableApiTracing;
this.enableExtendedTracing = options.enableExtendedTracing;
this.enableBuiltInMetrics = options.enableBuiltInMetrics;
this.enableLocationApi = options.enableLocationApi;
this.enableEndToEndTracing = options.enableEndToEndTracing;
this.monitoringHost = options.monitoringHost;
this.defaultTransactionOptions = options.defaultTransactionOptions;
Expand Down Expand Up @@ -2434,6 +2448,11 @@ public boolean isEnableBuiltInMetrics() {
return enableBuiltInMetrics;
}

@InternalApi
public boolean isEnableLocationApi() {
return enableLocationApi;
}

/** Returns the override metrics Host. */
String getMonitoringHost() {
return monitoringHost;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@
public class GapicSpannerRpc implements SpannerRpc {
private static final PathTemplate PROJECT_NAME_TEMPLATE =
PathTemplate.create("projects/{project}");
private static final String EXPERIMENTAL_LOCATION_API_ENV_VAR =
public static final String EXPERIMENTAL_LOCATION_API_ENV_VAR =
"GOOGLE_SPANNER_EXPERIMENTAL_LOCATION_API";
private static final PathTemplate OPERATION_NAME_TEMPLATE =
PathTemplate.create("{database=projects/*/instances/*/databases/*}/operations/{operation}");
Expand Down Expand Up @@ -399,8 +399,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
// If it is enabled in options uses the channel pool provided by the gRPC-GCP extension.
maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options);

boolean enableLocationApi =
Boolean.parseBoolean(System.getenv(EXPERIMENTAL_LOCATION_API_ENV_VAR));
boolean enableLocationApi = options.isEnableLocationApi();
TransportChannelProvider baseChannelProvider =
MoreObjects.firstNonNull(
options.getChannelProvider(), defaultChannelProviderBuilder.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,12 @@ static final class KeyAwareClientCall<RequestT, ResponseT>
@Nullable private ChannelEndpoint selectedEndpoint;
@Nullable private ByteString transactionIdToClear;
private boolean allowDefaultAffinity;
private long pendingRequests;
private boolean pendingHalfClose;
@Nullable private Boolean pendingMessageCompression;
@Nullable private io.grpc.Status cancelledStatus;
@Nullable private Metadata cancelledTrailers;
private final Object lock = new Object();

KeyAwareClientCall(
KeyAwareChannel parentChannel,
Expand All @@ -257,95 +263,215 @@ static final class KeyAwareClientCall<RequestT, ResponseT>

@Override
protected ClientCall<RequestT, ResponseT> delegate() {
if (delegate == null) {
throw new IllegalStateException(
"Delegate call not initialized before use. sendMessage was likely not called.");
synchronized (lock) {
if (delegate == null) {
throw new IllegalStateException(
"Delegate call not initialized before use. sendMessage was likely not called.");
}
return delegate;
}
return delegate;
}

@Override
public void start(Listener<ResponseT> responseListener, Metadata headers) {
this.responseListener = new KeyAwareClientCallListener<>(responseListener, this);
this.headers = headers;
Listener<ResponseT> listenerToClose = null;
io.grpc.Status statusToClose = null;
Metadata trailersToClose = null;
synchronized (lock) {
this.responseListener = new KeyAwareClientCallListener<>(responseListener, this);
this.headers = headers;
if (this.cancelledStatus != null) {
listenerToClose = this.responseListener;
statusToClose = this.cancelledStatus;
trailersToClose =
this.cancelledTrailers == null ? new Metadata() : this.cancelledTrailers;
}
}
if (listenerToClose != null) {
listenerToClose.onClose(statusToClose, trailersToClose);
}
}

@Override
@SuppressWarnings("unchecked")
public void sendMessage(RequestT message) {
ChannelEndpoint endpoint = null;
ChannelFinder finder = null;

if (message instanceof ReadRequest) {
ReadRequest.Builder reqBuilder = ((ReadRequest) message).toBuilder();
RoutingDecision routing = routeFromRequest(reqBuilder);
finder = routing.finder;
endpoint = routing.endpoint;
message = (RequestT) reqBuilder.build();
} else if (message instanceof ExecuteSqlRequest) {
ExecuteSqlRequest.Builder reqBuilder = ((ExecuteSqlRequest) message).toBuilder();
RoutingDecision routing = routeFromRequest(reqBuilder);
finder = routing.finder;
endpoint = routing.endpoint;
message = (RequestT) reqBuilder.build();
} else if (message instanceof BeginTransactionRequest) {
BeginTransactionRequest.Builder reqBuilder =
((BeginTransactionRequest) message).toBuilder();
String databaseId = parentChannel.extractDatabaseIdFromSession(reqBuilder.getSession());
if (databaseId != null && reqBuilder.hasMutationKey()) {
finder = parentChannel.getOrCreateChannelFinder(databaseId);
ChannelEndpoint routed = finder.findServer(reqBuilder);
if (endpoint == null) {
endpoint = routed;
synchronized (lock) {
if (this.cancelledStatus != null) {
return;
}
if (responseListener == null || headers == null) {
throw new IllegalStateException("start must be called before sendMessage");
}
ChannelEndpoint endpoint = null;
ChannelFinder finder = null;

if (message instanceof ReadRequest) {
ReadRequest.Builder reqBuilder = ((ReadRequest) message).toBuilder();
RoutingDecision routing = routeFromRequest(reqBuilder);
finder = routing.finder;
endpoint = routing.endpoint;
message = (RequestT) reqBuilder.build();
} else if (message instanceof ExecuteSqlRequest) {
ExecuteSqlRequest.Builder reqBuilder = ((ExecuteSqlRequest) message).toBuilder();
RoutingDecision routing = routeFromRequest(reqBuilder);
finder = routing.finder;
endpoint = routing.endpoint;
message = (RequestT) reqBuilder.build();
} else if (message instanceof BeginTransactionRequest) {
BeginTransactionRequest.Builder reqBuilder =
((BeginTransactionRequest) message).toBuilder();
String databaseId = parentChannel.extractDatabaseIdFromSession(reqBuilder.getSession());
if (databaseId != null && reqBuilder.hasMutationKey()) {
finder = parentChannel.getOrCreateChannelFinder(databaseId);
endpoint = finder.findServer(reqBuilder);
}
allowDefaultAffinity = true;
message = (RequestT) reqBuilder.build();
} else if (message instanceof CommitRequest) {
CommitRequest request = (CommitRequest) message;
if (!request.getTransactionId().isEmpty()) {
endpoint = parentChannel.affinityEndpoint(request.getTransactionId());
transactionIdToClear = request.getTransactionId();
}
} else if (message instanceof RollbackRequest) {
RollbackRequest request = (RollbackRequest) message;
if (!request.getTransactionId().isEmpty()) {
endpoint = parentChannel.affinityEndpoint(request.getTransactionId());
transactionIdToClear = request.getTransactionId();
}
} else {
throw new IllegalStateException(
"Only read, query, begin transaction, commit, and rollback requests are supported for"
+ " key-aware calls.");
}
allowDefaultAffinity = true;
message = (RequestT) reqBuilder.build();
} else if (message instanceof CommitRequest) {
CommitRequest request = (CommitRequest) message;
if (!request.getTransactionId().isEmpty()) {
endpoint = parentChannel.affinityEndpoint(request.getTransactionId());
transactionIdToClear = request.getTransactionId();

if (endpoint == null) {
endpoint = parentChannel.endpointCache.defaultChannel();
}
} else if (message instanceof RollbackRequest) {
RollbackRequest request = (RollbackRequest) message;
if (!request.getTransactionId().isEmpty()) {
endpoint = parentChannel.affinityEndpoint(request.getTransactionId());
transactionIdToClear = request.getTransactionId();
selectedEndpoint = endpoint;
this.channelFinder = finder;

delegate = endpoint.getChannel().newCall(methodDescriptor, callOptions);
if (pendingMessageCompression != null) {
delegate.setMessageCompression(pendingMessageCompression);
pendingMessageCompression = null;
}
delegate.start(responseListener, headers);
drainPendingRequests();
delegate.sendMessage(message);
if (pendingHalfClose) {
delegate.halfClose();
}
} else {
throw new IllegalStateException(
"Only read, query, begin transaction, commit, and rollback requests are supported for"
+ " key-aware calls.");
}
}

if (endpoint == null) {
endpoint = parentChannel.endpointCache.defaultChannel();
@Override
public void halfClose() {
ClientCall<RequestT, ResponseT> currentDelegate;
synchronized (lock) {
if (this.cancelledStatus != null) {
return;
}
if (delegate == null) {
pendingHalfClose = true;
return;
}
currentDelegate = delegate;
}
currentDelegate.halfClose();
}

@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
ClientCall<RequestT, ResponseT> currentDelegate;
Listener<ResponseT> listenerToClose = null;
io.grpc.Status statusToClose = null;
Metadata trailersToClose = null;
synchronized (lock) {
currentDelegate = delegate;
if (currentDelegate == null) {
cancelledStatus = io.grpc.Status.CANCELLED.withDescription(message).withCause(cause);
Metadata trailers =
cause == null ? new Metadata() : io.grpc.Status.trailersFromThrowable(cause);
cancelledTrailers = trailers == null ? new Metadata() : trailers;
if (responseListener != null) {
listenerToClose = responseListener;
statusToClose = cancelledStatus;
trailersToClose = cancelledTrailers;
}
}
}
selectedEndpoint = endpoint;
this.channelFinder = finder;
if (currentDelegate != null) {
currentDelegate.cancel(message, cause);
} else if (listenerToClose != null) {
listenerToClose.onClose(statusToClose, trailersToClose);
}
}

delegate = endpoint.getChannel().newCall(methodDescriptor, callOptions);
delegate.start(responseListener, headers);
delegate.sendMessage(message);
@Override
public void request(int numMessages) {
ClientCall<RequestT, ResponseT> currentDelegate;
synchronized (lock) {
if (cancelledStatus != null) {
return;
}
if (delegate != null) {
currentDelegate = delegate;
} else {
if (numMessages <= 0) {
return;
}
long updated = pendingRequests + numMessages;
if (updated < 0L) {
updated = Long.MAX_VALUE;
}
pendingRequests = updated;
return;
}
}
currentDelegate.request(numMessages);
}

@Override
public void halfClose() {
if (delegate != null) {
delegate.halfClose();
} else {
throw new IllegalStateException("halfClose called before sendMessage");
public boolean isReady() {
ClientCall<RequestT, ResponseT> currentDelegate;
synchronized (lock) {
currentDelegate = delegate;
}
if (currentDelegate == null) {
return false;
}
return currentDelegate.isReady();
}

@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
if (delegate != null) {
delegate.cancel(message, cause);
} else if (responseListener != null) {
responseListener.onClose(
io.grpc.Status.CANCELLED.withDescription(message).withCause(cause), new Metadata());
public void setMessageCompression(boolean enabled) {
ClientCall<RequestT, ResponseT> currentDelegate;
synchronized (lock) {
if (cancelledStatus != null) {
return;
}
if (delegate != null) {
currentDelegate = delegate;
} else {
pendingMessageCompression = enabled;
return;
}
}
currentDelegate.setMessageCompression(enabled);
}

private void drainPendingRequests() {
ClientCall<RequestT, ResponseT> currentDelegate = delegate;
if (currentDelegate == null) {
return;
}
long requests = pendingRequests;
pendingRequests = 0L;
while (requests > 0) {
int batch = requests > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) requests;
currentDelegate.request(batch);
requests -= batch;
}
}

Expand Down Expand Up @@ -419,6 +545,9 @@ public void onMessage(ResponseT message) {
transactionId = transactionIdFromMetadata(response);
} else if (message instanceof ResultSet) {
ResultSet response = (ResultSet) message;
if (response.hasCacheUpdate() && call.channelFinder != null) {
call.channelFinder.update(response.getCacheUpdate());
}
transactionId = transactionIdFromMetadata(response);
} else if (message instanceof Transaction) {
Transaction response = (Transaction) message;
Expand Down
Loading
Loading