diff --git a/topic/pom.xml b/topic/pom.xml index 39ae43330..d7cb1405c 100644 --- a/topic/pom.xml +++ b/topic/pom.xml @@ -50,10 +50,38 @@ ydb-junit4-support test + + org.mockito + mockito-core + test + org.apache.logging.log4j log4j-slf4j-impl test + + + + jdk8-build + + 1.8 + + + + + 4.11.0 + + + + + org.mockito + mockito-inline + ${mockito.version} + test + + + + diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java b/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java index 133f0a857..f4deaa637 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java @@ -18,7 +18,6 @@ import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.operation.Operation; -import tech.ydb.core.settings.BaseRequestSettings; import tech.ydb.core.utils.ProtobufUtils; import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.TopicClient; @@ -50,6 +49,7 @@ import tech.ydb.topic.settings.PartitioningSettings; import tech.ydb.topic.settings.ReadEventHandlersSettings; import tech.ydb.topic.settings.ReaderSettings; +import tech.ydb.topic.settings.TopicClientOperationSettings; import tech.ydb.topic.settings.WriterSettings; import tech.ydb.topic.utils.ProtoUtils; import tech.ydb.topic.write.AsyncWriter; @@ -86,9 +86,10 @@ public static Builder newClient(TopicRpc rpc) { return new TopicClientBuilderImpl(rpc); } - private GrpcRequestSettings makeGrpcRequestSettings(BaseRequestSettings settings) { + private GrpcRequestSettings makeGrpcRequestSettings(TopicClientOperationSettings settings) { return GrpcRequestSettings.newBuilder() .withDeadline(settings.getRequestTimeout()) + .withPreferReadyChannel(settings.isPreferReadyChannel()) .build(); } @@ -401,10 +402,7 @@ public CompletableFuture commitOffset(String path, CommitOffsetSettings request.setReadSessionId(settings.getReadSessionId()); } - GrpcRequestSettings grpcRequestSettings = GrpcRequestSettings.newBuilder() - .withDeadline(settings.getRequestTimeout()) - .withPreferReadyChannel(true) - .build(); + final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings); return topicRpc.commitOffset(request.build(), grpcRequestSettings); } diff --git a/topic/src/main/java/tech/ydb/topic/settings/AlterTopicSettings.java b/topic/src/main/java/tech/ydb/topic/settings/AlterTopicSettings.java index 3d4f565ee..637edca7c 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/AlterTopicSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/AlterTopicSettings.java @@ -12,7 +12,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import tech.ydb.core.settings.OperationSettings; import tech.ydb.topic.description.Consumer; import tech.ydb.topic.description.MeteringMode; import tech.ydb.topic.description.SupportedCodecs; @@ -20,7 +19,7 @@ /** * @author Nikolay Perfilov */ -public class AlterTopicSettings extends OperationSettings { +public class AlterTopicSettings extends TopicClientOperationSettings { @Nullable private final AlterPartitioningSettings alterPartitioningSettings; @Nullable @@ -113,7 +112,7 @@ public MeteringMode getMeteringMode() { /** * BUILDER */ - public static class Builder extends OperationBuilder { + public static class Builder extends TopicClientOperationBuilder { private AlterPartitioningSettings alterPartitioningSettings = null; private Duration retentionPeriod = null; private Long retentionStorageMb = null; diff --git a/topic/src/main/java/tech/ydb/topic/settings/CommitOffsetSettings.java b/topic/src/main/java/tech/ydb/topic/settings/CommitOffsetSettings.java index 8b05de869..29d0a5e0f 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/CommitOffsetSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/CommitOffsetSettings.java @@ -1,11 +1,9 @@ package tech.ydb.topic.settings; -import tech.ydb.core.settings.OperationSettings; - /** * @author Nikolay Perfilov */ -public class CommitOffsetSettings extends OperationSettings { +public class CommitOffsetSettings extends TopicClientOperationSettings { private final long partitionId; private final String consumer; private final long offset; @@ -42,7 +40,7 @@ public String getReadSessionId() { /* * BUILDER */ - public static class Builder extends OperationBuilder { + public static class Builder extends TopicClientOperationBuilder { private long partitionId = -1; private String consumer = null; private long offset = 0; diff --git a/topic/src/main/java/tech/ydb/topic/settings/CreateTopicSettings.java b/topic/src/main/java/tech/ydb/topic/settings/CreateTopicSettings.java index c3710a93d..a2c78b69e 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/CreateTopicSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/CreateTopicSettings.java @@ -12,7 +12,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import tech.ydb.core.settings.OperationSettings; import tech.ydb.topic.description.Consumer; import tech.ydb.topic.description.MeteringMode; import tech.ydb.topic.description.SupportedCodecs; @@ -21,7 +20,7 @@ /** * @author Nikolay Perfilov */ -public class CreateTopicSettings extends OperationSettings { +public class CreateTopicSettings extends TopicClientOperationSettings { @Nullable private final PartitioningSettings partitioningSettings; @Nullable @@ -94,7 +93,7 @@ public MeteringMode getMeteringMode() { /** * BUILDER */ - public static class Builder extends OperationBuilder { + public static class Builder extends TopicClientOperationBuilder { private PartitioningSettings partitioningSettings = null; private Duration retentionPeriod = null; private long retentionStorageMb = 0; diff --git a/topic/src/main/java/tech/ydb/topic/settings/DescribeConsumerSettings.java b/topic/src/main/java/tech/ydb/topic/settings/DescribeConsumerSettings.java index 2831a98ac..89fcd1089 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/DescribeConsumerSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/DescribeConsumerSettings.java @@ -1,12 +1,10 @@ package tech.ydb.topic.settings; -import tech.ydb.core.settings.OperationSettings; - /** * * @author Aleksandr Gorshenin */ -public class DescribeConsumerSettings extends OperationSettings { +public class DescribeConsumerSettings extends TopicClientOperationSettings { private final boolean includeStats; private final boolean includeLocation; @@ -28,7 +26,7 @@ public static Builder newBuilder() { return new Builder(); } - public static class Builder extends OperationBuilder { + public static class Builder extends TopicClientOperationBuilder { private boolean includeStats = false; private boolean includeLocation = false; diff --git a/topic/src/main/java/tech/ydb/topic/settings/DescribeTopicSettings.java b/topic/src/main/java/tech/ydb/topic/settings/DescribeTopicSettings.java index 1fdf0d1f9..0ad28671d 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/DescribeTopicSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/DescribeTopicSettings.java @@ -1,11 +1,9 @@ package tech.ydb.topic.settings; -import tech.ydb.core.settings.OperationSettings; - /** * @author Nikolay Perfilov */ -public class DescribeTopicSettings extends OperationSettings { +public class DescribeTopicSettings extends TopicClientOperationSettings { private final boolean includeStats; private DescribeTopicSettings(Builder builder) { @@ -21,7 +19,7 @@ public static Builder newBuilder() { return new Builder(); } - public static class Builder extends OperationBuilder { + public static class Builder extends TopicClientOperationBuilder { private boolean includeStats = false; public Builder withIncludeStats(boolean includeStats) { diff --git a/topic/src/main/java/tech/ydb/topic/settings/DropTopicSettings.java b/topic/src/main/java/tech/ydb/topic/settings/DropTopicSettings.java index ddcb27773..d479c102a 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/DropTopicSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/DropTopicSettings.java @@ -1,11 +1,9 @@ package tech.ydb.topic.settings; -import tech.ydb.core.settings.OperationSettings; - /** * @author Nikolay Perfilov */ -public class DropTopicSettings extends OperationSettings { +public class DropTopicSettings extends TopicClientOperationSettings { private DropTopicSettings(Builder builder) { super(builder); } @@ -14,7 +12,7 @@ public static Builder newBuilder() { return new Builder(); } - public static class Builder extends OperationBuilder { + public static class Builder extends TopicClientOperationBuilder { @Override public DropTopicSettings build() { return new DropTopicSettings(this); diff --git a/topic/src/main/java/tech/ydb/topic/settings/TopicClientOperationSettings.java b/topic/src/main/java/tech/ydb/topic/settings/TopicClientOperationSettings.java new file mode 100644 index 000000000..a43527111 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/settings/TopicClientOperationSettings.java @@ -0,0 +1,31 @@ +package tech.ydb.topic.settings; + +import tech.ydb.core.settings.OperationSettings; + +public class TopicClientOperationSettings extends OperationSettings { + private final boolean preferReadyChannel; + + protected TopicClientOperationSettings(TopicClientOperationBuilder builder) { + super(builder); + this.preferReadyChannel = builder.preferReadyChannel; + } + + public boolean isPreferReadyChannel() { + return preferReadyChannel; + } + + public static class TopicClientOperationBuilder> + extends OperationBuilder { + private boolean preferReadyChannel = false; + + public Self withPreferReadyChannel(boolean value) { + this.preferReadyChannel = value; + return self(); + } + + @Override + public TopicClientOperationSettings build() { + return new TopicClientOperationSettings(this); + } + } +} diff --git a/topic/src/test/java/tech/ydb/topic/impl/TopicClientImplTest.java b/topic/src/test/java/tech/ydb/topic/impl/TopicClientImplTest.java new file mode 100644 index 000000000..ef1daff11 --- /dev/null +++ b/topic/src/test/java/tech/ydb/topic/impl/TopicClientImplTest.java @@ -0,0 +1,161 @@ +package tech.ydb.topic.impl; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.grpc.GrpcRequestSettings; +import tech.ydb.topic.TopicClient; +import tech.ydb.topic.TopicRpc; +import tech.ydb.topic.settings.AlterTopicSettings; +import tech.ydb.topic.settings.CommitOffsetSettings; +import tech.ydb.topic.settings.CreateTopicSettings; +import tech.ydb.topic.settings.DescribeConsumerSettings; +import tech.ydb.topic.settings.DescribeTopicSettings; +import tech.ydb.topic.settings.DropTopicSettings; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TopicClientImplTest { + + @Test + public void testEnablePreferReadyChannelSetting() { + TopicRpc mock = mock(TopicRpc.class); + when(mock.createTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS)); + when(mock.dropTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS)); + when(mock.alterTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS)); + when(mock.commitOffset(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS)); + when(mock.describeTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(mock(Result.class))); + when(mock.describeConsumer(any(), any())).thenReturn(CompletableFuture.completedFuture(mock(Result.class))); + + final String topic = "topic"; + final String consumer = "consumer"; + final String sessionId = "sessionId"; + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(GrpcRequestSettings.class); + TopicClient client = TopicClientImpl.newClient(mock).build(); + + // createTopic + client.createTopic(topic, CreateTopicSettings.newBuilder() + .withPreferReadyChannel(true) + .build()); + + verify(mock).createTopic(any(), requestCaptor.capture()); + assertTrue(requestCaptor.getValue().isPreferReadyChannel()); + + // dropTopic + client.dropTopic(topic, DropTopicSettings.newBuilder() + .withPreferReadyChannel(true) + .build()); + + verify(mock).dropTopic(any(), requestCaptor.capture()); + assertTrue(requestCaptor.getValue().isPreferReadyChannel()); + + // alterTopic + client.alterTopic(topic, AlterTopicSettings.newBuilder() + .withPreferReadyChannel(true) + .build()); + + verify(mock).alterTopic(any(), requestCaptor.capture()); + assertTrue(requestCaptor.getValue().isPreferReadyChannel()); + + // commitOffset + client.commitOffset(topic, CommitOffsetSettings.newBuilder() + .setReadSessionId(sessionId) + .withPreferReadyChannel(true) + .setConsumer(consumer) + .setPartitionId(0) + .build()); + + verify(mock).commitOffset(any(), requestCaptor.capture()); + assertTrue(requestCaptor.getValue().isPreferReadyChannel()); + + // describeTopic + client.describeTopic(topic, DescribeTopicSettings.newBuilder() + .withPreferReadyChannel(true) + .build()); + + verify(mock).describeTopic(any(), requestCaptor.capture()); + assertTrue(requestCaptor.getValue().isPreferReadyChannel()); + + // describeConsumer + client.describeConsumer(topic, consumer, DescribeConsumerSettings.newBuilder() + .withPreferReadyChannel(true) + .build()); + + verify(mock).describeConsumer(any(), requestCaptor.capture()); + assertTrue(requestCaptor.getValue().isPreferReadyChannel()); + } + + @Test + public void testDefaultPreferReadyChannelSetting() { + TopicRpc mock = mock(TopicRpc.class); + when(mock.createTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS)); + when(mock.dropTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS)); + when(mock.alterTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS)); + when(mock.commitOffset(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS)); + when(mock.describeTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(mock(Result.class))); + when(mock.describeConsumer(any(), any())).thenReturn(CompletableFuture.completedFuture(mock(Result.class))); + + final String topic = "topic"; + final String consumer = "consumer"; + final String sessionId = "sessionId"; + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(GrpcRequestSettings.class); + TopicClient client = TopicClientImpl.newClient(mock).build(); + + // createTopic + client.createTopic(topic, CreateTopicSettings.newBuilder() + .build()); + + verify(mock).createTopic(any(), requestCaptor.capture()); + assertFalse(requestCaptor.getValue().isPreferReadyChannel()); + + // dropTopic + client.dropTopic(topic, DropTopicSettings.newBuilder() + .build()); + + verify(mock).dropTopic(any(), requestCaptor.capture()); + assertFalse(requestCaptor.getValue().isPreferReadyChannel()); + + // alterTopic + client.alterTopic(topic, AlterTopicSettings.newBuilder() + .build()); + + verify(mock).alterTopic(any(), requestCaptor.capture()); + assertFalse(requestCaptor.getValue().isPreferReadyChannel()); + + // commitOffset + client.commitOffset(topic, CommitOffsetSettings.newBuilder() + .setReadSessionId(sessionId) + .setConsumer(consumer) + .setPartitionId(0) + .build()); + + verify(mock).commitOffset(any(), requestCaptor.capture()); + assertFalse(requestCaptor.getValue().isPreferReadyChannel()); + + // describeTopic + client.describeTopic(topic, DescribeTopicSettings.newBuilder() + .build()); + + verify(mock).describeTopic(any(), requestCaptor.capture()); + assertFalse(requestCaptor.getValue().isPreferReadyChannel()); + + // describeConsumer + client.describeConsumer(topic, consumer, DescribeConsumerSettings.newBuilder() + .build()); + + verify(mock).describeConsumer(any(), requestCaptor.capture()); + assertFalse(requestCaptor.getValue().isPreferReadyChannel()); + } +}