From cc9cdf0181d23827218e94ee02e6c1beb5196cf2 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Wed, 11 Feb 2026 14:22:59 -0800 Subject: [PATCH 1/7] Add EnvoyRateLimiter Implementation --- .../beam/gradle/BeamModulePlugin.groovy | 1 + sdks/java/io/components/build.gradle | 5 + .../ratelimiter/EnvoyRateLimiter.java | 46 ++++ .../ratelimiter/EnvoyRateLimiterContext.java | 65 +++++ .../ratelimiter/EnvoyRateLimiterFactory.java | 238 ++++++++++++++++++ .../ratelimiter/RateLimiterClientCache.java | 94 +++++++ .../ratelimiter/RateLimiterOptions.java | 68 +++++ .../ratelimiter/EnvoyRateLimiterTest.java | 168 +++++++++++++ .../RateLimiterClientCacheTest.java | 115 +++++++++ .../ratelimiter/RateLimiterOptionsTest.java | 81 ++++++ 10 files changed, 881 insertions(+) create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java create mode 100644 sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterTest.java create mode 100644 sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCacheTest.java create mode 100644 sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptionsTest.java diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 774674b545ed..753c2417f50f 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -726,6 +726,7 @@ class BeamModulePlugin implements Plugin { commons_logging : "commons-logging:commons-logging:1.2", commons_math3 : "org.apache.commons:commons-math3:3.6.1", dbcp2 : "org.apache.commons:commons-dbcp2:$dbcp2_version", + envoy_control_plane_api : "io.envoyproxy.controlplane:api:1.0.49", error_prone_annotations : "com.google.errorprone:error_prone_annotations:$errorprone_version", failsafe : "dev.failsafe:failsafe:3.3.0", flogger_system_backend : "com.google.flogger:flogger-system-backend:0.7.4", diff --git a/sdks/java/io/components/build.gradle b/sdks/java/io/components/build.gradle index 25bf95772110..973420098348 100644 --- a/sdks/java/io/components/build.gradle +++ b/sdks/java/io/components/build.gradle @@ -26,6 +26,11 @@ ext.summary = "Components for building fully featured IOs" dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.auto_value_annotations + implementation library.java.envoy_control_plane_api + implementation library.java.grpc_api + implementation library.java.grpc_stub + implementation library.java.grpc_protobuf implementation library.java.protobuf_java permitUnusedDeclared library.java.protobuf_java // BEAM-11761 implementation library.java.slf4j_api diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java new file mode 100644 index 000000000000..9fc3da80dca4 --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import java.io.IOException; + +/** + * A lightweight handle for an Envoy-based rate limiter. + * + *

Delegates work to the {@link EnvoyRateLimiterFactory} using the baked-in {@link + * EnvoyRateLimiterContext}. + */ +public class EnvoyRateLimiter implements RateLimiter { + private final EnvoyRateLimiterFactory factory; + private final EnvoyRateLimiterContext context; + + public EnvoyRateLimiter(EnvoyRateLimiterFactory factory, EnvoyRateLimiterContext context) { + this.factory = factory; + this.context = context; + } + + @Override + public boolean allow(int permits) throws IOException, InterruptedException { + return factory.allow(context, permits); + } + + @Override + public void close() throws Exception { + factory.close(); + } +} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java new file mode 100644 index 000000000000..baebece7962b --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import com.google.auto.value.AutoValue; +import java.util.Map; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.NonNull; + +/** + * Context for an Envoy Rate Limiter call. + * + *

Contains the domain and descriptors required to define a specific rate limit bucket. + */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class EnvoyRateLimiterContext implements RateLimiterContext { + + @SchemaFieldDescription("Domain of the rate limiter.") + public abstract String getDomain(); + + @SchemaFieldDescription("Descriptors for the rate limiter.") + public abstract ImmutableMap getDescriptors(); + + public static Builder builder() { + return new AutoValue_EnvoyRateLimiterContext.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setDomain(@NonNull String domain); + + public abstract ImmutableMap.Builder descriptorsBuilder(); + + public Builder addDescriptor(@NonNull String key, @NonNull String value) { + descriptorsBuilder().put(key, value); + return this; + } + + public Builder setDescriptors(@NonNull Map descriptors) { + descriptorsBuilder().putAll(descriptors); + return this; + } + + public abstract EnvoyRateLimiterContext build(); + } +} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java new file mode 100644 index 000000000000..e4a7adaa606b --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import io.envoyproxy.envoy.extensions.common.ratelimit.v3.RateLimitDescriptor; +import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitRequest; +import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitResponse; +import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitServiceGrpc; +import io.grpc.StatusRuntimeException; +import java.io.IOException; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.components.throttling.ThrottlingSignaler; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A {@link RateLimiterFactory} for Envoy Rate Limit Service. */ +public class EnvoyRateLimiterFactory implements RateLimiterFactory { + private static final Logger LOG = LoggerFactory.getLogger(EnvoyRateLimiterFactory.class); + private static final int RPC_RETRY_COUNT = 3; + private static final long RPC_RETRY_DELAY_MILLIS = 5000; + + private final RateLimiterOptions options; + + private transient volatile @Nullable RateLimitServiceGrpc.RateLimitServiceBlockingStub stub; + private transient @Nullable RateLimiterClientCache clientCache; + private final ThrottlingSignaler throttlingSignaler; + private final Sleeper sleeper; + + private final Counter requestsTotal; + private final Counter requestsAllowed; + private final Counter requestsThrottled; + private final Counter rpcErrors; + private final Counter rpcRetries; + private final Distribution rpcLatency; + + public EnvoyRateLimiterFactory(RateLimiterOptions options) { + this(options, Sleeper.DEFAULT); + } + + @VisibleForTesting + EnvoyRateLimiterFactory(RateLimiterOptions options, Sleeper sleeper) { + this.options = options; + this.sleeper = sleeper; + String namespace = EnvoyRateLimiterFactory.class.getName(); + this.throttlingSignaler = new ThrottlingSignaler(namespace); + this.requestsTotal = Metrics.counter(namespace, "ratelimit-requests-total"); + this.requestsAllowed = Metrics.counter(namespace, "ratelimit-requests-allowed"); + this.requestsThrottled = Metrics.counter(namespace, "ratelimit-requests-throttled"); + this.rpcErrors = Metrics.counter(namespace, "ratelimit-rpc-errors"); + this.rpcRetries = Metrics.counter(namespace, "ratelimit-rpc-retries"); + this.rpcLatency = Metrics.distribution(namespace, "ratelimit-rpc-latency-ms"); + } + + @Override + public synchronized void close() { + if (clientCache != null) { + clientCache.release(); + clientCache = null; + } + stub = null; + } + + private void init() { + if (stub != null) { + return; + } + synchronized (this) { + if (stub == null) { + RateLimiterClientCache cache = RateLimiterClientCache.getOrCreate(options.getAddress()); + this.clientCache = cache; + stub = RateLimitServiceGrpc.newBlockingStub(cache.getChannel()); + } + } + } + + @VisibleForTesting + void setStub(RateLimitServiceGrpc.RateLimitServiceBlockingStub stub) { + this.stub = stub; + } + + @Override + public RateLimiter getLimiter(RateLimiterContext context) { + if (!(context instanceof EnvoyRateLimiterContext)) { + throw new IllegalArgumentException( + "EnvoyRateLimiterFactory requires EnvoyRateLimiterContext"); + } + return new EnvoyRateLimiter(this, (EnvoyRateLimiterContext) context); + } + + @Override + public boolean allow(RateLimiterContext context, int permits) + throws IOException, InterruptedException { + if (permits == 0) { + return true; + } + if (!(context instanceof EnvoyRateLimiterContext)) { + throw new IllegalArgumentException( + "EnvoyRateLimiterFactory requires EnvoyRateLimiterContext, got: " + + context.getClass().getName()); + } + checkArgument(permits >= 0, "Permits must be non-negative"); + EnvoyRateLimiterContext envoyContext = (EnvoyRateLimiterContext) context; + return fetchTokens(envoyContext, permits); + } + + private boolean fetchTokens(EnvoyRateLimiterContext context, int tokens) + throws IOException, InterruptedException { + + init(); + RateLimitServiceGrpc.RateLimitServiceBlockingStub currentStub = stub; + if (currentStub == null) { + throw new IllegalStateException("RateLimitServiceStub is null"); + } + + Map descriptors = context.getDescriptors(); + RateLimitDescriptor.Builder descriptorBuilder = RateLimitDescriptor.newBuilder(); + + for (Map.Entry entry : descriptors.entrySet()) { + descriptorBuilder.addEntries( + RateLimitDescriptor.Entry.newBuilder() + .setKey(entry.getKey()) + .setValue(entry.getValue()) + .build()); + } + + RateLimitRequest request = + RateLimitRequest.newBuilder() + .setDomain(context.getDomain()) + .setHitsAddend(tokens) + .addDescriptors(descriptorBuilder.build()) + .build(); + + Integer maxRetries = options.getMaxRetries(); + long timeoutMillis = options.getTimeout().toMillis(); + + requestsTotal.inc(); + int attempt = 0; + while (true) { + if (maxRetries != null && attempt > maxRetries) { + return false; + } + + // RPC Retry Loop + RateLimitResponse response = null; + long startTime = System.currentTimeMillis(); + for (int i = 0; i < RPC_RETRY_COUNT; i++) { + try { + response = + currentStub + .withDeadlineAfter(timeoutMillis, java.util.concurrent.TimeUnit.MILLISECONDS) + .shouldRateLimit(request); + long endTime = System.currentTimeMillis(); + rpcLatency.update(endTime - startTime); + break; + } catch (StatusRuntimeException e) { + rpcErrors.inc(); + if (i == RPC_RETRY_COUNT - 1) { + LOG.error("RateLimitService call failed after {} attempts", RPC_RETRY_COUNT, e); + throw new IOException("Failed to call Rate Limit Service", e); + } + rpcRetries.inc(); + LOG.warn("RateLimitService call failed, retrying", e); + if (sleeper != null) { + sleeper.sleep(RPC_RETRY_DELAY_MILLIS); + } + } + } + + if (response == null) { + throw new IOException("Failed to get response from Rate Limit Service"); + } + + if (response.getOverallCode() == RateLimitResponse.Code.OK) { + requestsAllowed.inc(); + return true; + } else if (response.getOverallCode() == RateLimitResponse.Code.OVER_LIMIT) { + long sleepMillis = 0; + for (RateLimitResponse.DescriptorStatus status : response.getStatusesList()) { + if (status.getCode() == RateLimitResponse.Code.OVER_LIMIT + && status.hasDurationUntilReset()) { + long durationMillis = + status.getDurationUntilReset().getSeconds() * 1000 + + status.getDurationUntilReset().getNanos() / 1_000_000; + if (durationMillis > sleepMillis) { + sleepMillis = durationMillis; + } + } + } + + if (sleepMillis == 0) { + sleepMillis = 1000; + } + + long jitter = + (long) + (java.util.concurrent.ThreadLocalRandom.current().nextDouble() + * (0.01 * sleepMillis)); + sleepMillis += jitter; + + LOG.warn("Throttled by RLS, sleeping for {} ms", sleepMillis); + if (sleeper != null) { + requestsThrottled.inc(); + if (throttlingSignaler != null) { + throttlingSignaler.signalThrottling(sleepMillis); + } + sleeper.sleep(sleepMillis); + } + attempt++; + } else { + throw new IOException( + "Rate Limit Service returned unknown code: " + response.getOverallCode()); + } + } + } +} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java new file mode 100644 index 000000000000..1e5fd6321a58 --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A static cache for {@link ManagedChannel}s to Rate Limit Service. + * + *

This class ensures that multiple DoFn instances (threads) in the same Worker sharing the same + * RLS address will share a single {@link ManagedChannel}. + * + *

It uses reference counting to close the channel when it is no longer in use by any RateLimiter + * instance. + */ +public class RateLimiterClientCache { + private static final Logger LOG = LoggerFactory.getLogger(RateLimiterClientCache.class); + private static final Map CACHE = new ConcurrentHashMap<>(); + + private final ManagedChannel channel; + private final String address; + private int refCount = 0; + + private RateLimiterClientCache(String address) { + this.address = address; + LOG.info("Creating new ManagedChannel for RLS at {}", address); + this.channel = ManagedChannelBuilder.forTarget(address).usePlaintext().build(); + } + + /** + * Gets or creates a cached client for the given address. Increments the reference count. + * Synchronized on the class to prevent race conditions when multiple instances call getOrCreate() + * simultaneously + */ + public static synchronized RateLimiterClientCache getOrCreate(String address) { + RateLimiterClientCache client = CACHE.get(address); + if (client == null) { + client = new RateLimiterClientCache(address); + CACHE.put(address, client); + } + client.refCount++; + LOG.debug("Referenced RLS Channel for {}. New RefCount: {}", address, client.refCount); + return client; + } + + public ManagedChannel getChannel() { + return channel; + } + + /** + * Releases the client. Decrements the reference count. If reference count reaches 0, the channel + * is shut down and removed from the cache. Synchronized on the class to prevent race conditions + * when multiple threads call release() simultaneously and to prevent race conditions between + * getOrCreate() and release() calls. + */ + public void release() { + synchronized (RateLimiterClientCache.class) { + refCount--; + LOG.debug("Released RLS Channel for {}. New RefCount: {}", address, refCount); + if (refCount <= 0) { + LOG.info("Closing ManagedChannel for RLS at {}", address); + CACHE.remove(address); + channel.shutdown(); + try { + channel.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.error("Couldn't gracefully close gRPC channel={}", channel, e); + } + channel.shutdownNow(); + } + } + } +} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java new file mode 100644 index 000000000000..acf49622eb96 --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.time.Duration; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; + +/** Configuration options for {@link RateLimiterFactory}. */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class RateLimiterOptions implements Serializable { + @SchemaFieldDescription("Address of the rate limiter") + public abstract String getAddress(); + + @Nullable + @SchemaFieldDescription("Maximum number of retries, defaults to infinite") + public abstract Integer getMaxRetries(); + + @SchemaFieldDescription("Timeout for rate limiter operations, defaults to 5 seconds") + public abstract Duration getTimeout(); + + public static Builder builder() { + return new AutoValue_RateLimiterOptions.Builder().setTimeout(Duration.ofSeconds(5)); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setAddress(String address); + + public abstract Builder setMaxRetries(Integer maxRetries); + + public abstract Builder setTimeout(Duration timeout); + + abstract RateLimiterOptions autoBuild(); + + public RateLimiterOptions build() { + RateLimiterOptions options = autoBuild(); + checkArgument(options.getTimeout().compareTo(Duration.ZERO) > 0, "Timeout must be positive"); + Integer maxRetries = options.getMaxRetries(); + if (maxRetries != null) { + checkArgument(maxRetries >= 0, "MaxRetries must be non-negative"); + } + return options; + } + } +} diff --git a/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterTest.java b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterTest.java new file mode 100644 index 000000000000..e94d0b42eb3c --- /dev/null +++ b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.verify; + +import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitRequest; +import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitResponse; +import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitServiceGrpc; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import org.apache.beam.sdk.util.Sleeper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link EnvoyRateLimiterFactory}. */ +@RunWith(JUnit4.class) +public class EnvoyRateLimiterTest { + @Mock private Sleeper sleeper; + + private EnvoyRateLimiterFactory factory; + private RateLimiterOptions options; + private EnvoyRateLimiterContext context; + + private Server server; + private ManagedChannel channel; + private TestRateLimitService service; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.openMocks(this); + options = + RateLimiterOptions.builder() + .setAddress("localhost:8081") + .setTimeout(java.time.Duration.ofSeconds(1)) + .build(); + + String serverName = InProcessServerBuilder.generateName(); + service = new TestRateLimitService(); + server = + InProcessServerBuilder.forName(serverName) + .directExecutor() + .addService(service) + .build() + .start(); + channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); + + factory = new EnvoyRateLimiterFactory(options, sleeper); + factory.setStub(RateLimitServiceGrpc.newBlockingStub(channel)); + + context = + EnvoyRateLimiterContext.builder() + .setDomain("test-domain") + .addDescriptor("key", "value") + .build(); + } + + @After + public void tearDown() { + if (channel != null) { + channel.shutdownNow(); + } + if (server != null) { + server.shutdownNow(); + } + } + + @Test + public void testAllow_OK() throws Exception { + service.responseToReturn = + RateLimitResponse.newBuilder().setOverallCode(RateLimitResponse.Code.OK).build(); + + assertTrue(factory.allow(context, 1)); + } + + @Test + public void testAllow_OverLimit() throws Exception { + service.responseToReturn = + RateLimitResponse.newBuilder() + .setOverallCode(RateLimitResponse.Code.OVER_LIMIT) + .addStatuses( + RateLimitResponse.DescriptorStatus.newBuilder() + .setCode(RateLimitResponse.Code.OVER_LIMIT) + .setDurationUntilReset( + com.google.protobuf.Duration.newBuilder().setSeconds(1).build()) + .build()) + .build(); + + factory = + new EnvoyRateLimiterFactory( + RateLimiterOptions.builder() + .setAddress("foo") + .setTimeout(java.time.Duration.ofSeconds(1)) + .setMaxRetries(1) + .build(), + sleeper); + factory.setStub(RateLimitServiceGrpc.newBlockingStub(channel)); + + assertFalse(factory.allow(context, 1)); + + // Verify sleep was called. + verify(sleeper, org.mockito.Mockito.atLeastOnce()).sleep(anyLong()); + } + + @Test + public void testAllow_RpcError() throws Exception { + service.errorToThrow = Status.UNAVAILABLE.asRuntimeException(); + assertThrows(IOException.class, () -> factory.allow(context, 1)); + } + + @Test + public void testInvalidContext() { + assertThrows( + IllegalArgumentException.class, () -> factory.allow(new RateLimiterContext() {}, 1)); + } + + static class TestRateLimitService extends RateLimitServiceGrpc.RateLimitServiceImplBase { + volatile RateLimitResponse responseToReturn; + volatile RuntimeException errorToThrow; + + @Override + public void shouldRateLimit( + RateLimitRequest request, StreamObserver responseObserver) { + if (errorToThrow != null) { + responseObserver.onError(errorToThrow); + return; + } + if (responseToReturn != null) { + responseObserver.onNext(responseToReturn); + responseObserver.onCompleted(); + } else { + // Default OK + responseObserver.onNext( + RateLimitResponse.newBuilder().setOverallCode(RateLimitResponse.Code.OK).build()); + responseObserver.onCompleted(); + } + } + } +} diff --git a/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCacheTest.java b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCacheTest.java new file mode 100644 index 000000000000..4eb61b279c34 --- /dev/null +++ b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCacheTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link RateLimiterClientCache}. */ +@RunWith(JUnit4.class) +public class RateLimiterClientCacheTest { + + @Test + public void testGetOrCreate_SameAddress() { + String address = "addr1"; + RateLimiterClientCache client1 = RateLimiterClientCache.getOrCreate(address); + RateLimiterClientCache client2 = RateLimiterClientCache.getOrCreate(address); + + assertSame(client1, client2); + assertFalse(client1.getChannel().isShutdown()); + + // cleanup + client1.release(); + // client2 is still using the same channel + assertFalse(client1.getChannel().isShutdown()); + client2.release(); + assertTrue(client1.getChannel().isShutdown()); + } + + @Test + public void testGetOrCreate_DifferentAddress_ReturnsDifferentInstances() { + RateLimiterClientCache client1 = RateLimiterClientCache.getOrCreate("addr1"); + RateLimiterClientCache client2 = RateLimiterClientCache.getOrCreate("addr2"); + + assertNotSame(client1, client2); + + assertFalse(client1.getChannel().isShutdown()); + assertFalse(client2.getChannel().isShutdown()); + client1.release(); + assertTrue(client1.getChannel().isShutdown()); + client2.release(); + assertTrue(client2.getChannel().isShutdown()); + } + + @Test + public void testConcurrency() throws InterruptedException, ExecutionException { + int threads = 10; + int iterations = 100; + String address = "concurrent-addr"; + ExecutorService pool = Executors.newFixedThreadPool(threads); + List> futures = new ArrayList<>(); + + for (int i = 0; i < threads; i++) { + futures.add( + pool.submit( + new Callable() { + @Override + public Boolean call() { + for (int j = 0; j < iterations; j++) { + RateLimiterClientCache client = RateLimiterClientCache.getOrCreate(address); + // do some tiny work + try { + Thread.sleep(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + client.release(); + } + return true; + } + })); + } + + for (Future f : futures) { + assertTrue(f.get()); + } + + pool.shutdown(); + pool.awaitTermination(5, TimeUnit.SECONDS); + + // After all threads are done, cache should be empty or create new one cleanly + RateLimiterClientCache client = RateLimiterClientCache.getOrCreate(address); + assertFalse(client.getChannel().isShutdown()); + client.release(); + assertTrue(client.getChannel().isShutdown()); + } +} diff --git a/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptionsTest.java b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptionsTest.java new file mode 100644 index 000000000000..cb8674b4e502 --- /dev/null +++ b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptionsTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import java.time.Duration; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link RateLimiterOptions}. */ +@RunWith(JUnit4.class) +public class RateLimiterOptionsTest { + + @Test + public void testValidOptions() { + RateLimiterOptions options = + RateLimiterOptions.builder() + .setAddress("localhost:8081") + .setTimeout(Duration.ofSeconds(1)) + .setMaxRetries(3) + .build(); + + assertEquals("localhost:8081", options.getAddress()); + assertEquals(Duration.ofSeconds(1), options.getTimeout()); + assertEquals(Integer.valueOf(3), options.getMaxRetries()); + } + + @Test + public void testNegativeTimeout() { + assertThrows( + IllegalArgumentException.class, + () -> + RateLimiterOptions.builder() + .setAddress("localhost:8081") + .setTimeout(Duration.ofSeconds(-1)) + .build()); + } + + @Test + public void testZeroTimeout() { + assertThrows( + IllegalArgumentException.class, + () -> + RateLimiterOptions.builder() + .setAddress("localhost:8081") + .setTimeout(Duration.ZERO) + .build()); + } + + @Test + public void testNegativeMaxRetries() { + assertThrows( + IllegalArgumentException.class, + () -> RateLimiterOptions.builder().setAddress("localhost:8081").setMaxRetries(-1).build()); + } + + @Test + public void testNullMaxRetriesIsAllowed() { + RateLimiterOptions options = + RateLimiterOptions.builder().setAddress("localhost:8081").setMaxRetries(null).build(); + assertEquals(null, options.getMaxRetries()); + } +} From 68a8209b1a5945aa2c7381f149a8ad40ed8c8459 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Wed, 11 Feb 2026 15:10:26 -0800 Subject: [PATCH 2/7] Add example --- examples/java/build.gradle | 1 + .../beam/examples/RateLimiterSimple.java | 131 ++++++++++++++++++ 2 files changed, 132 insertions(+) create mode 100644 examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java diff --git a/examples/java/build.gradle b/examples/java/build.gradle index 5334538cc09f..068c0d1b56fd 100644 --- a/examples/java/build.gradle +++ b/examples/java/build.gradle @@ -52,6 +52,7 @@ dependencies { implementation project(":sdks:java:extensions:avro") implementation project(":sdks:java:extensions:google-cloud-platform-core") implementation project(":sdks:java:extensions:python") + implementation project(":sdks:java:io:components") implementation project(":sdks:java:io:google-cloud-platform") implementation project(":sdks:java:io:kafka") implementation project(":sdks:java:extensions:ml") diff --git a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java new file mode 100644 index 000000000000..89e2d5d06802 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples; + +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterContext; +import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterFactory; +import org.apache.beam.sdk.io.components.ratelimiter.RateLimiter; +import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterContext; +import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterFactory; +import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterOptions; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A simple example demonstrating how to use the {@link RateLimiter} in a custom {@link DoFn}. + * + *

This pipeline creates a small set of elements and processes them using a DoFn that calls an + * external service (simulated). The processing is rate-limited using an Envoy Rate Limit Service. + * + *

To run this example, you need a running Envoy Rate Limit Service. + */ +public class RateLimiterSimple { + + public interface Options extends PipelineOptions { + @Description("Address of the Envoy Rate Limit Service(eg:localhost:8081)") + String getRateLimiterAddress(); + + void setRateLimiterAddress(String value); + + @Description("Domain for the Rate Limit Service(eg:mydomain)") + String getRateLimiterDomain(); + + void setRateLimiterDomain(String value); + } + + static class CallExternalServiceFn extends DoFn { + private final String rlsAddress; + private final String rlsDomain; + private transient @Nullable RateLimiter rateLimiter; + private static final Logger LOG = LoggerFactory.getLogger(CallExternalServiceFn.class); + + public CallExternalServiceFn(String rlsAddress, String rlsDomain) { + this.rlsAddress = rlsAddress; + this.rlsDomain = rlsDomain; + } + + @Setup + public void setup() { + // Create the RateLimiterOptions. + RateLimiterOptions options = RateLimiterOptions.builder().setAddress(rlsAddress).build(); + + // Static RateLimtier with pre-configured domain and descriptors + RateLimiterFactory factory = new EnvoyRateLimiterFactory(options); + RateLimiterContext context = + EnvoyRateLimiterContext.builder() + .setDomain(rlsDomain) + .addDescriptor("database", "users") + .build(); + this.rateLimiter = factory.getLimiter(context); + } + + @Teardown + public void teardown() { + if (rateLimiter != null) { + try { + rateLimiter.close(); + } catch (Exception e) { + throw new RuntimeException("Failed to close RateLimiter", e); + } + } + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + String element = c.element(); + try { + Preconditions.checkNotNull(rateLimiter).allow(1); + } catch (Exception e) { + throw new RuntimeException("Failed to acquire rate limit token", e); + } + + // Simulate external API call + LOG.info("Processing: " + element); + Thread.sleep(100); + c.output("Processed: " + element); + } + } + + public static void main(String[] args) { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline p = Pipeline.create(options); + + p.apply( + "CreateItems", + Create.of( + IntStream.range(0, 100).mapToObj(i -> "item" + i).collect(Collectors.toList()))) + .apply( + "CallExternalService", + ParDo.of( + new CallExternalServiceFn( + options.getRateLimiterAddress(), options.getRateLimiterDomain()))); + + p.run().waitUntilFinish(); + } +} From 80bb0350eb51e9867ed734f6c9a2b4e8e9cd1217 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Wed, 11 Feb 2026 16:52:27 -0800 Subject: [PATCH 3/7] fix style check --- .../src/main/resources/beam/checkstyle/suppressions.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml index f79bb6cf3bfa..1fa516918185 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml @@ -62,6 +62,7 @@ + From 7dae5fafdcc748e9c9929964c908ff7e3ebbd7c2 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 12 Feb 2026 14:12:47 -0800 Subject: [PATCH 4/7] simplify teardown Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../main/java/org/apache/beam/examples/RateLimiterSimple.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java index 89e2d5d06802..a33e99e4b239 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java +++ b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java @@ -91,7 +91,7 @@ public void teardown() { try { rateLimiter.close(); } catch (Exception e) { - throw new RuntimeException("Failed to close RateLimiter", e); + LOG.warn("Failed to close RateLimiter", e); } } } From b4ca25640e8328bd064225eb0c92e16fd2e9a640 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 12 Feb 2026 14:14:18 -0800 Subject: [PATCH 5/7] add more jitter Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java index e4a7adaa606b..9fe8fa8070d8 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java @@ -217,7 +217,7 @@ private boolean fetchTokens(EnvoyRateLimiterContext context, int tokens) long jitter = (long) (java.util.concurrent.ThreadLocalRandom.current().nextDouble() - * (0.01 * sleepMillis)); + * (0.1 * sleepMillis)); sleepMillis += jitter; LOG.warn("Throttled by RLS, sleeping for {} ms", sleepMillis); From ce5c50935e7d0e105e2d69c522f9abf756110f78 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 12 Feb 2026 14:15:22 -0800 Subject: [PATCH 6/7] handle thread interrupt Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../sdk/io/components/ratelimiter/RateLimiterClientCache.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java index 1e5fd6321a58..e94f31fa3275 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java @@ -86,6 +86,7 @@ public void release() { channel.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { LOG.error("Couldn't gracefully close gRPC channel={}", channel, e); + Thread.currentThread().interrupt(); } channel.shutdownNow(); } From 71633220137b2f7f02275ff6503f1fa8e4ee5b2e Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 12 Feb 2026 14:27:23 -0800 Subject: [PATCH 7/7] add connection keep-alive configs --- .../ratelimiter/EnvoyRateLimiterFactory.java | 4 ++-- .../components/ratelimiter/RateLimiterClientCache.java | 10 +++++++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java index 9fe8fa8070d8..fa512d38ab9d 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java @@ -39,8 +39,8 @@ /** A {@link RateLimiterFactory} for Envoy Rate Limit Service. */ public class EnvoyRateLimiterFactory implements RateLimiterFactory { private static final Logger LOG = LoggerFactory.getLogger(EnvoyRateLimiterFactory.class); - private static final int RPC_RETRY_COUNT = 3; - private static final long RPC_RETRY_DELAY_MILLIS = 5000; + private static final int RPC_RETRY_COUNT = 5; + private static final long RPC_RETRY_DELAY_MILLIS = 1000; private final RateLimiterOptions options; diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java index e94f31fa3275..5fde6fbe8981 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java @@ -37,6 +37,8 @@ public class RateLimiterClientCache { private static final Logger LOG = LoggerFactory.getLogger(RateLimiterClientCache.class); private static final Map CACHE = new ConcurrentHashMap<>(); + private static final int KEEP_ALIVE_TIME_SECONDS = 60; + private static final int KEEP_ALIVE_TIMEOUT_SECONDS = 15; private final ManagedChannel channel; private final String address; @@ -45,7 +47,13 @@ public class RateLimiterClientCache { private RateLimiterClientCache(String address) { this.address = address; LOG.info("Creating new ManagedChannel for RLS at {}", address); - this.channel = ManagedChannelBuilder.forTarget(address).usePlaintext().build(); + this.channel = + ManagedChannelBuilder.forTarget(address) + .usePlaintext() + .keepAliveTime(KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS) + .keepAliveTimeout(KEEP_ALIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS) + .keepAliveWithoutCalls(true) + .build(); } /**