From 0c59a0f729fe2c1c76a1258605f5b35b26fea704 Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Wed, 4 Feb 2026 21:12:25 +0000 Subject: [PATCH] feat(kafka): allow configurable listener name and security protocol --- modules/kafka/testcontainers/kafka/__init__.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/modules/kafka/testcontainers/kafka/__init__.py b/modules/kafka/testcontainers/kafka/__init__.py index 8eb4f718..315c16ce 100644 --- a/modules/kafka/testcontainers/kafka/__init__.py +++ b/modules/kafka/testcontainers/kafka/__init__.py @@ -60,17 +60,21 @@ def __init__( image: str = "confluentinc/cp-kafka:7.6.0", port: int = 9093, wait_strategy_check_string: str = r".*\[KafkaServer id=\d+\] started.*", + listener_name: str = "PLAINTEXT", + security_protocol: str = "PLAINTEXT", **kwargs, ) -> None: raise_for_deprecated_parameter(kwargs, "port_to_expose", "port") super().__init__(image, **kwargs) self.port = port + self.listener_name = listener_name + self.security_protocol = security_protocol self.kraft_enabled = False self.wait_for: re.Pattern[str] = re.compile(wait_strategy_check_string) self.boot_command = "" self.cluster_id = "MkU3OEVBNTcwNTJENDM2Qk" - self.listeners = f"PLAINTEXT://0.0.0.0:{self.port},BROKER://0.0.0.0:9092" - self.security_protocol_map = "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT" + self.listeners = f"{listener_name}://0.0.0.0:{self.port},BROKER://0.0.0.0:9092" + self.security_protocol_map = f"BROKER:PLAINTEXT,{listener_name}:{security_protocol}" self.with_exposed_ports(self.port) self.with_env("KAFKA_LISTENERS", self.listeners) @@ -160,9 +164,9 @@ def tc_start(self) -> None: host = self.get_container_host_ip() port = self.get_exposed_port(self.port) if kafka_config.limit_broker_to_first_host: - listeners = f"PLAINTEXT://{host}:{port},BROKER://$(hostname -i | cut -d' ' -f1):9092" + listeners = f"{self.listener_name}://{host}:{port},BROKER://$(hostname -i | cut -d' ' -f1):9092" else: - listeners = f"PLAINTEXT://{host}:{port},BROKER://$(hostname -i):9092" + listeners = f"{self.listener_name}://{host}:{port},BROKER://$(hostname -i):9092" data = ( dedent( f"""