diff --git a/topic/pom.xml b/topic/pom.xml
index 39ae43330..d7cb1405c 100644
--- a/topic/pom.xml
+++ b/topic/pom.xml
@@ -50,10 +50,38 @@
ydb-junit4-support
test
+
+ org.mockito
+ mockito-core
+ test
+
org.apache.logging.log4j
log4j-slf4j-impl
test
+
+
+
+ jdk8-build
+
+ 1.8
+
+
+
+
+ 4.11.0
+
+
+
+
+ org.mockito
+ mockito-inline
+ ${mockito.version}
+ test
+
+
+
+
diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java b/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java
index 133f0a857..f4deaa637 100644
--- a/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java
+++ b/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java
@@ -18,7 +18,6 @@
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.core.operation.Operation;
-import tech.ydb.core.settings.BaseRequestSettings;
import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicClient;
@@ -50,6 +49,7 @@
import tech.ydb.topic.settings.PartitioningSettings;
import tech.ydb.topic.settings.ReadEventHandlersSettings;
import tech.ydb.topic.settings.ReaderSettings;
+import tech.ydb.topic.settings.TopicClientOperationSettings;
import tech.ydb.topic.settings.WriterSettings;
import tech.ydb.topic.utils.ProtoUtils;
import tech.ydb.topic.write.AsyncWriter;
@@ -86,9 +86,10 @@ public static Builder newClient(TopicRpc rpc) {
return new TopicClientBuilderImpl(rpc);
}
- private GrpcRequestSettings makeGrpcRequestSettings(BaseRequestSettings settings) {
+ private GrpcRequestSettings makeGrpcRequestSettings(TopicClientOperationSettings settings) {
return GrpcRequestSettings.newBuilder()
.withDeadline(settings.getRequestTimeout())
+ .withPreferReadyChannel(settings.isPreferReadyChannel())
.build();
}
@@ -401,10 +402,7 @@ public CompletableFuture commitOffset(String path, CommitOffsetSettings
request.setReadSessionId(settings.getReadSessionId());
}
- GrpcRequestSettings grpcRequestSettings = GrpcRequestSettings.newBuilder()
- .withDeadline(settings.getRequestTimeout())
- .withPreferReadyChannel(true)
- .build();
+ final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings);
return topicRpc.commitOffset(request.build(), grpcRequestSettings);
}
diff --git a/topic/src/main/java/tech/ydb/topic/settings/AlterTopicSettings.java b/topic/src/main/java/tech/ydb/topic/settings/AlterTopicSettings.java
index 3d4f565ee..637edca7c 100644
--- a/topic/src/main/java/tech/ydb/topic/settings/AlterTopicSettings.java
+++ b/topic/src/main/java/tech/ydb/topic/settings/AlterTopicSettings.java
@@ -12,7 +12,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import tech.ydb.core.settings.OperationSettings;
import tech.ydb.topic.description.Consumer;
import tech.ydb.topic.description.MeteringMode;
import tech.ydb.topic.description.SupportedCodecs;
@@ -20,7 +19,7 @@
/**
* @author Nikolay Perfilov
*/
-public class AlterTopicSettings extends OperationSettings {
+public class AlterTopicSettings extends TopicClientOperationSettings {
@Nullable
private final AlterPartitioningSettings alterPartitioningSettings;
@Nullable
@@ -113,7 +112,7 @@ public MeteringMode getMeteringMode() {
/**
* BUILDER
*/
- public static class Builder extends OperationBuilder {
+ public static class Builder extends TopicClientOperationBuilder {
private AlterPartitioningSettings alterPartitioningSettings = null;
private Duration retentionPeriod = null;
private Long retentionStorageMb = null;
diff --git a/topic/src/main/java/tech/ydb/topic/settings/CommitOffsetSettings.java b/topic/src/main/java/tech/ydb/topic/settings/CommitOffsetSettings.java
index 8b05de869..29d0a5e0f 100644
--- a/topic/src/main/java/tech/ydb/topic/settings/CommitOffsetSettings.java
+++ b/topic/src/main/java/tech/ydb/topic/settings/CommitOffsetSettings.java
@@ -1,11 +1,9 @@
package tech.ydb.topic.settings;
-import tech.ydb.core.settings.OperationSettings;
-
/**
* @author Nikolay Perfilov
*/
-public class CommitOffsetSettings extends OperationSettings {
+public class CommitOffsetSettings extends TopicClientOperationSettings {
private final long partitionId;
private final String consumer;
private final long offset;
@@ -42,7 +40,7 @@ public String getReadSessionId() {
/*
* BUILDER
*/
- public static class Builder extends OperationBuilder {
+ public static class Builder extends TopicClientOperationBuilder {
private long partitionId = -1;
private String consumer = null;
private long offset = 0;
diff --git a/topic/src/main/java/tech/ydb/topic/settings/CreateTopicSettings.java b/topic/src/main/java/tech/ydb/topic/settings/CreateTopicSettings.java
index c3710a93d..a2c78b69e 100644
--- a/topic/src/main/java/tech/ydb/topic/settings/CreateTopicSettings.java
+++ b/topic/src/main/java/tech/ydb/topic/settings/CreateTopicSettings.java
@@ -12,7 +12,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import tech.ydb.core.settings.OperationSettings;
import tech.ydb.topic.description.Consumer;
import tech.ydb.topic.description.MeteringMode;
import tech.ydb.topic.description.SupportedCodecs;
@@ -21,7 +20,7 @@
/**
* @author Nikolay Perfilov
*/
-public class CreateTopicSettings extends OperationSettings {
+public class CreateTopicSettings extends TopicClientOperationSettings {
@Nullable
private final PartitioningSettings partitioningSettings;
@Nullable
@@ -94,7 +93,7 @@ public MeteringMode getMeteringMode() {
/**
* BUILDER
*/
- public static class Builder extends OperationBuilder {
+ public static class Builder extends TopicClientOperationBuilder {
private PartitioningSettings partitioningSettings = null;
private Duration retentionPeriod = null;
private long retentionStorageMb = 0;
diff --git a/topic/src/main/java/tech/ydb/topic/settings/DescribeConsumerSettings.java b/topic/src/main/java/tech/ydb/topic/settings/DescribeConsumerSettings.java
index 2831a98ac..89fcd1089 100644
--- a/topic/src/main/java/tech/ydb/topic/settings/DescribeConsumerSettings.java
+++ b/topic/src/main/java/tech/ydb/topic/settings/DescribeConsumerSettings.java
@@ -1,12 +1,10 @@
package tech.ydb.topic.settings;
-import tech.ydb.core.settings.OperationSettings;
-
/**
*
* @author Aleksandr Gorshenin
*/
-public class DescribeConsumerSettings extends OperationSettings {
+public class DescribeConsumerSettings extends TopicClientOperationSettings {
private final boolean includeStats;
private final boolean includeLocation;
@@ -28,7 +26,7 @@ public static Builder newBuilder() {
return new Builder();
}
- public static class Builder extends OperationBuilder {
+ public static class Builder extends TopicClientOperationBuilder {
private boolean includeStats = false;
private boolean includeLocation = false;
diff --git a/topic/src/main/java/tech/ydb/topic/settings/DescribeTopicSettings.java b/topic/src/main/java/tech/ydb/topic/settings/DescribeTopicSettings.java
index 1fdf0d1f9..0ad28671d 100644
--- a/topic/src/main/java/tech/ydb/topic/settings/DescribeTopicSettings.java
+++ b/topic/src/main/java/tech/ydb/topic/settings/DescribeTopicSettings.java
@@ -1,11 +1,9 @@
package tech.ydb.topic.settings;
-import tech.ydb.core.settings.OperationSettings;
-
/**
* @author Nikolay Perfilov
*/
-public class DescribeTopicSettings extends OperationSettings {
+public class DescribeTopicSettings extends TopicClientOperationSettings {
private final boolean includeStats;
private DescribeTopicSettings(Builder builder) {
@@ -21,7 +19,7 @@ public static Builder newBuilder() {
return new Builder();
}
- public static class Builder extends OperationBuilder {
+ public static class Builder extends TopicClientOperationBuilder {
private boolean includeStats = false;
public Builder withIncludeStats(boolean includeStats) {
diff --git a/topic/src/main/java/tech/ydb/topic/settings/DropTopicSettings.java b/topic/src/main/java/tech/ydb/topic/settings/DropTopicSettings.java
index ddcb27773..d479c102a 100644
--- a/topic/src/main/java/tech/ydb/topic/settings/DropTopicSettings.java
+++ b/topic/src/main/java/tech/ydb/topic/settings/DropTopicSettings.java
@@ -1,11 +1,9 @@
package tech.ydb.topic.settings;
-import tech.ydb.core.settings.OperationSettings;
-
/**
* @author Nikolay Perfilov
*/
-public class DropTopicSettings extends OperationSettings {
+public class DropTopicSettings extends TopicClientOperationSettings {
private DropTopicSettings(Builder builder) {
super(builder);
}
@@ -14,7 +12,7 @@ public static Builder newBuilder() {
return new Builder();
}
- public static class Builder extends OperationBuilder {
+ public static class Builder extends TopicClientOperationBuilder {
@Override
public DropTopicSettings build() {
return new DropTopicSettings(this);
diff --git a/topic/src/main/java/tech/ydb/topic/settings/TopicClientOperationSettings.java b/topic/src/main/java/tech/ydb/topic/settings/TopicClientOperationSettings.java
new file mode 100644
index 000000000..a43527111
--- /dev/null
+++ b/topic/src/main/java/tech/ydb/topic/settings/TopicClientOperationSettings.java
@@ -0,0 +1,31 @@
+package tech.ydb.topic.settings;
+
+import tech.ydb.core.settings.OperationSettings;
+
+public class TopicClientOperationSettings extends OperationSettings {
+ private final boolean preferReadyChannel;
+
+ protected TopicClientOperationSettings(TopicClientOperationBuilder> builder) {
+ super(builder);
+ this.preferReadyChannel = builder.preferReadyChannel;
+ }
+
+ public boolean isPreferReadyChannel() {
+ return preferReadyChannel;
+ }
+
+ public static class TopicClientOperationBuilder>
+ extends OperationBuilder {
+ private boolean preferReadyChannel = false;
+
+ public Self withPreferReadyChannel(boolean value) {
+ this.preferReadyChannel = value;
+ return self();
+ }
+
+ @Override
+ public TopicClientOperationSettings build() {
+ return new TopicClientOperationSettings(this);
+ }
+ }
+}
diff --git a/topic/src/test/java/tech/ydb/topic/impl/TopicClientImplTest.java b/topic/src/test/java/tech/ydb/topic/impl/TopicClientImplTest.java
new file mode 100644
index 000000000..ef1daff11
--- /dev/null
+++ b/topic/src/test/java/tech/ydb/topic/impl/TopicClientImplTest.java
@@ -0,0 +1,161 @@
+package tech.ydb.topic.impl;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import tech.ydb.core.Result;
+import tech.ydb.core.Status;
+import tech.ydb.core.grpc.GrpcRequestSettings;
+import tech.ydb.topic.TopicClient;
+import tech.ydb.topic.TopicRpc;
+import tech.ydb.topic.settings.AlterTopicSettings;
+import tech.ydb.topic.settings.CommitOffsetSettings;
+import tech.ydb.topic.settings.CreateTopicSettings;
+import tech.ydb.topic.settings.DescribeConsumerSettings;
+import tech.ydb.topic.settings.DescribeTopicSettings;
+import tech.ydb.topic.settings.DropTopicSettings;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TopicClientImplTest {
+
+ @Test
+ public void testEnablePreferReadyChannelSetting() {
+ TopicRpc mock = mock(TopicRpc.class);
+ when(mock.createTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS));
+ when(mock.dropTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS));
+ when(mock.alterTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS));
+ when(mock.commitOffset(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS));
+ when(mock.describeTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(mock(Result.class)));
+ when(mock.describeConsumer(any(), any())).thenReturn(CompletableFuture.completedFuture(mock(Result.class)));
+
+ final String topic = "topic";
+ final String consumer = "consumer";
+ final String sessionId = "sessionId";
+
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(GrpcRequestSettings.class);
+ TopicClient client = TopicClientImpl.newClient(mock).build();
+
+ // createTopic
+ client.createTopic(topic, CreateTopicSettings.newBuilder()
+ .withPreferReadyChannel(true)
+ .build());
+
+ verify(mock).createTopic(any(), requestCaptor.capture());
+ assertTrue(requestCaptor.getValue().isPreferReadyChannel());
+
+ // dropTopic
+ client.dropTopic(topic, DropTopicSettings.newBuilder()
+ .withPreferReadyChannel(true)
+ .build());
+
+ verify(mock).dropTopic(any(), requestCaptor.capture());
+ assertTrue(requestCaptor.getValue().isPreferReadyChannel());
+
+ // alterTopic
+ client.alterTopic(topic, AlterTopicSettings.newBuilder()
+ .withPreferReadyChannel(true)
+ .build());
+
+ verify(mock).alterTopic(any(), requestCaptor.capture());
+ assertTrue(requestCaptor.getValue().isPreferReadyChannel());
+
+ // commitOffset
+ client.commitOffset(topic, CommitOffsetSettings.newBuilder()
+ .setReadSessionId(sessionId)
+ .withPreferReadyChannel(true)
+ .setConsumer(consumer)
+ .setPartitionId(0)
+ .build());
+
+ verify(mock).commitOffset(any(), requestCaptor.capture());
+ assertTrue(requestCaptor.getValue().isPreferReadyChannel());
+
+ // describeTopic
+ client.describeTopic(topic, DescribeTopicSettings.newBuilder()
+ .withPreferReadyChannel(true)
+ .build());
+
+ verify(mock).describeTopic(any(), requestCaptor.capture());
+ assertTrue(requestCaptor.getValue().isPreferReadyChannel());
+
+ // describeConsumer
+ client.describeConsumer(topic, consumer, DescribeConsumerSettings.newBuilder()
+ .withPreferReadyChannel(true)
+ .build());
+
+ verify(mock).describeConsumer(any(), requestCaptor.capture());
+ assertTrue(requestCaptor.getValue().isPreferReadyChannel());
+ }
+
+ @Test
+ public void testDefaultPreferReadyChannelSetting() {
+ TopicRpc mock = mock(TopicRpc.class);
+ when(mock.createTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS));
+ when(mock.dropTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS));
+ when(mock.alterTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS));
+ when(mock.commitOffset(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS));
+ when(mock.describeTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(mock(Result.class)));
+ when(mock.describeConsumer(any(), any())).thenReturn(CompletableFuture.completedFuture(mock(Result.class)));
+
+ final String topic = "topic";
+ final String consumer = "consumer";
+ final String sessionId = "sessionId";
+
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(GrpcRequestSettings.class);
+ TopicClient client = TopicClientImpl.newClient(mock).build();
+
+ // createTopic
+ client.createTopic(topic, CreateTopicSettings.newBuilder()
+ .build());
+
+ verify(mock).createTopic(any(), requestCaptor.capture());
+ assertFalse(requestCaptor.getValue().isPreferReadyChannel());
+
+ // dropTopic
+ client.dropTopic(topic, DropTopicSettings.newBuilder()
+ .build());
+
+ verify(mock).dropTopic(any(), requestCaptor.capture());
+ assertFalse(requestCaptor.getValue().isPreferReadyChannel());
+
+ // alterTopic
+ client.alterTopic(topic, AlterTopicSettings.newBuilder()
+ .build());
+
+ verify(mock).alterTopic(any(), requestCaptor.capture());
+ assertFalse(requestCaptor.getValue().isPreferReadyChannel());
+
+ // commitOffset
+ client.commitOffset(topic, CommitOffsetSettings.newBuilder()
+ .setReadSessionId(sessionId)
+ .setConsumer(consumer)
+ .setPartitionId(0)
+ .build());
+
+ verify(mock).commitOffset(any(), requestCaptor.capture());
+ assertFalse(requestCaptor.getValue().isPreferReadyChannel());
+
+ // describeTopic
+ client.describeTopic(topic, DescribeTopicSettings.newBuilder()
+ .build());
+
+ verify(mock).describeTopic(any(), requestCaptor.capture());
+ assertFalse(requestCaptor.getValue().isPreferReadyChannel());
+
+ // describeConsumer
+ client.describeConsumer(topic, consumer, DescribeConsumerSettings.newBuilder()
+ .build());
+
+ verify(mock).describeConsumer(any(), requestCaptor.capture());
+ assertFalse(requestCaptor.getValue().isPreferReadyChannel());
+ }
+}