From 9d6764a8208753341a51296c3b7e5f25665d1b54 Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Mon, 9 Feb 2026 23:29:48 +0500 Subject: [PATCH] Fix unhandled exception in KafkaIO SDF (#37449) Replace checkState() with LOG.warn() in GenerateKafkaSourceDescriptor and WatchForKafkaTopicPartitions to prevent IllegalStateException from causing infinite retries when Kafka returns null or empty partition info for a topic. Topics with missing partitions are now gracefully skipped with a warning log instead of throwing. --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 14 ++---- .../kafka/WatchForKafkaTopicPartitions.java | 17 +++++--- .../WatchForKafkaTopicPartitionsTest.java | 43 +++++++++++++++++++ 3 files changed, 57 insertions(+), 17 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index ad5535517646..32dd9b4cdc16 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -2127,18 +2127,12 @@ public void processElement(OutputReceiver receiver) { } else { for (String topic : topics) { List partitionInfoList = consumer.partitionsFor(topic); - if (logTopicVerification == null || !logTopicVerification) { - checkState( - partitionInfoList != null && !partitionInfoList.isEmpty(), - "Could not find any partitions info for topic " - + topic - + ". Please check Kafka configuration and make sure " - + "that provided topics exist."); - } else { + if (partitionInfoList == null || partitionInfoList.isEmpty()) { LOG.warn( - "Could not find any partitions info for topic {}. Please check Kafka configuration " - + "and make sure that the provided topics exist.", + "Could not find any partitions info for topic {}. Please check Kafka " + + "configuration and make sure that the provided topics exist.", topic); + continue; } for (PartitionInfo p : partitionInfoList) { diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java index 65ece98d618f..3184b18267b2 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.kafka; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import java.util.ArrayList; import java.util.List; @@ -45,6 +44,8 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A {@link PTransform} for continuously querying Kafka for new partitions, and emitting those @@ -57,6 +58,7 @@ */ class WatchForKafkaTopicPartitions extends PTransform> { + private static final Logger LOG = LoggerFactory.getLogger(WatchForKafkaTopicPartitions.class); private static final Duration DEFAULT_CHECK_DURATION = Duration.standardHours(1); private static final String COUNTER_NAMESPACE = "watch_kafka_topic_partition"; @@ -191,12 +193,13 @@ static List getAllTopicPartitions( if (topics != null && !topics.isEmpty()) { for (String topic : topics) { List partitionInfoList = kafkaConsumer.partitionsFor(topic); - checkState( - partitionInfoList != null && !partitionInfoList.isEmpty(), - "Could not find any partitions info for topic " - + topic - + ". Please check Kafka configuration and make sure " - + "that provided topics exist."); + if (partitionInfoList == null || partitionInfoList.isEmpty()) { + LOG.warn( + "Could not find any partitions info for topic {}. Please check Kafka " + + "configuration and make sure that the provided topics exist.", + topic); + continue; + } for (PartitionInfo partition : partitionInfoList) { current.add(new TopicPartition(topic, partition.partition())); } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitionsTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitionsTest.java index 595d040bf403..30ace6cd86d0 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitionsTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitionsTest.java @@ -18,10 +18,12 @@ package org.apache.beam.sdk.io.kafka; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.Collections; import java.util.Set; import java.util.regex.Pattern; import org.apache.beam.sdk.io.kafka.KafkaMocks.PartitionGrowthMockConsumer; @@ -108,6 +110,47 @@ public void testGetAllTopicPartitionsWithGivenTopics() throws Exception { (input) -> mockConsumer, null, givenTopics, null)); } + @Test + public void testGetAllTopicPartitionsWithNullPartitionInfo() throws Exception { + Set givenTopics = ImmutableSet.of("topic1"); + + Consumer mockConsumer = Mockito.mock(Consumer.class); + when(mockConsumer.partitionsFor("topic1")).thenReturn(null); + assertTrue( + WatchForKafkaTopicPartitions.getAllTopicPartitions( + (input) -> mockConsumer, null, givenTopics, null) + .isEmpty()); + } + + @Test + public void testGetAllTopicPartitionsWithEmptyPartitionInfo() throws Exception { + Set givenTopics = ImmutableSet.of("topic1"); + + Consumer mockConsumer = Mockito.mock(Consumer.class); + when(mockConsumer.partitionsFor("topic1")).thenReturn(Collections.emptyList()); + assertTrue( + WatchForKafkaTopicPartitions.getAllTopicPartitions( + (input) -> mockConsumer, null, givenTopics, null) + .isEmpty()); + } + + @Test + public void testGetAllTopicPartitionsSkipsMissingTopics() throws Exception { + Set givenTopics = ImmutableSet.of("topic1", "topic2"); + + Consumer mockConsumer = Mockito.mock(Consumer.class); + when(mockConsumer.partitionsFor("topic1")).thenReturn(null); + when(mockConsumer.partitionsFor("topic2")) + .thenReturn( + ImmutableList.of( + new PartitionInfo("topic2", 0, null, null, null), + new PartitionInfo("topic2", 1, null, null, null))); + assertEquals( + ImmutableList.of(new TopicPartition("topic2", 0), new TopicPartition("topic2", 1)), + WatchForKafkaTopicPartitions.getAllTopicPartitions( + (input) -> mockConsumer, null, givenTopics, null)); + } + @Test public void testGetAllTopicPartitionsWithGivenPattern() throws Exception { Consumer mockConsumer = Mockito.mock(Consumer.class);