diff --git a/behave_framework/src/minifi_test_framework/containers/container.py b/behave_framework/src/minifi_test_framework/containers/container.py index 8c59586f5d..528398e26d 100644 --- a/behave_framework/src/minifi_test_framework/containers/container.py +++ b/behave_framework/src/minifi_test_framework/containers/container.py @@ -20,6 +20,7 @@ import shlex import tempfile import tarfile +import uuid import docker from docker.models.networks import Network @@ -50,6 +51,23 @@ def __init__(self, image_name: str, container_name: str, network: Network, comma def add_host_file(self, host_path: str, container_path: str, mode: str = "ro"): self.host_files.append(HostFile(container_path, host_path, mode)) + def add_file_to_running_container(self, content: str, path: str): + if not self.container: + logging.error("Container is not running. Cannot add file.") + raise RuntimeError("Container is not running. Cannot add file.") + + mkdir_command = f"mkdir -p {shlex.quote(path)}" + exit_code, output = self.exec_run(mkdir_command) + if exit_code != 0: + logging.error(f"Error creating directory '{path}' in container: {output}") + raise RuntimeError(f"Error creating directory '{path}' in container: {output}") + + file_name = str(uuid.uuid4()) + exit_code, output = self.exec_run(f"sh -c \"printf %s '{content}' > {os.path.join(path, file_name)}\"") + if exit_code != 0: + logging.error(f"Error adding file to running container: {output}") + raise RuntimeError(f"Error adding file to running container: {output}") + def _write_content_to_file(self, filepath: str, permissions: int | None, content: str | bytes): write_mode = "w" if isinstance(content, bytes): @@ -74,7 +92,13 @@ def _configure_volumes_of_container_dirs(self): self._write_content_to_file(file_path, None, content) self.volumes[temp_path] = {"bind": directory.path, "mode": directory.mode} + def is_deployed(self) -> bool: + return self.container is not None + def deploy(self) -> bool: + if self.is_deployed(): + logging.info(f"Container '{self.container_name}' is already deployed.") + return True self._temp_dir = tempfile.TemporaryDirectory() self._configure_volumes_of_container_files() self._configure_volumes_of_container_dirs() diff --git a/behave_framework/src/minifi_test_framework/steps/checking_steps.py b/behave_framework/src/minifi_test_framework/steps/checking_steps.py index 7aad06a0fd..9cde71f781 100644 --- a/behave_framework/src/minifi_test_framework/steps/checking_steps.py +++ b/behave_framework/src/minifi_test_framework/steps/checking_steps.py @@ -36,13 +36,18 @@ def step_impl(context: MinifiTestContext, content: str, path: str, duration: str timeout_seconds=timeout_in_seconds, bail_condition=lambda: context.get_default_minifi_container().exited, context=context) -@then('a single file with the content "{content}" is placed in the "{directory}" directory in less than {duration}') -def step_impl(context: MinifiTestContext, content: str, directory: str, duration: str): +@then('in the "{container_name}" container a single file with the content "{content}" is placed in the "{directory}" directory in less than {duration}') +def step_impl(context: MinifiTestContext, container_name: str, content: str, directory: str, duration: str): new_content = content.replace("\\n", "\n") timeout_in_seconds = humanfriendly.parse_timespan(duration) assert wait_for_condition( - condition=lambda: context.get_default_minifi_container().directory_has_single_file_with_content(directory, new_content), - timeout_seconds=timeout_in_seconds, bail_condition=lambda: context.get_default_minifi_container().exited, context=context) + condition=lambda: context.get_minifi_container(container_name).directory_has_single_file_with_content(directory, new_content), + timeout_seconds=timeout_in_seconds, bail_condition=lambda: context.get_minifi_container(container_name).exited, context=context) + + +@then('a single file with the content "{content}" is placed in the "{directory}" directory in less than {duration}') +def step_impl(context: MinifiTestContext, content: str, directory: str, duration: str): + context.execute_steps(f'then in the "{DEFAULT_MINIFI_CONTAINER_NAME}" container a single file with the content "{content}" is placed in the "{directory}" directory in less than {duration}') @then('in the "{container_name}" container at least one file with the content "{content}" is placed in the "{directory}" directory in less than {duration}') @@ -147,6 +152,7 @@ def step_impl(context: MinifiTestContext, directory: str, regex_str: str, durati @then('files with contents "{content_one}" and "{content_two}" are placed in the "{directory}" directory in less than {timeout}') +@then("files with contents '{content_one}' and '{content_two}' are placed in the '{directory}' directory in less than {timeout}") def step_impl(context: MinifiTestContext, directory: str, timeout: str, content_one: str, content_two: str): timeout_seconds = humanfriendly.parse_timespan(timeout) c1 = content_one.replace("\\n", "\n") diff --git a/behave_framework/src/minifi_test_framework/steps/core_steps.py b/behave_framework/src/minifi_test_framework/steps/core_steps.py index a797f954e1..968689e8fa 100644 --- a/behave_framework/src/minifi_test_framework/steps/core_steps.py +++ b/behave_framework/src/minifi_test_framework/steps/core_steps.py @@ -62,10 +62,27 @@ def step_impl(context: MinifiTestContext, file_name: str, content: str, path: st context.get_or_create_default_minifi_container().files.append(File(os.path.join(path, file_name), new_content)) -@step('a file with the content "{content}" is present in "{path}"') +@given('a file with the content "{content}" is present in "{path}" in the "{container_name}" flow') +def step_impl(context: MinifiTestContext, content: str, path: str, container_name: str): + new_content = content.replace("\\n", "\n") + context.get_or_create_minifi_container(container_name).files.append(File(os.path.join(path, str(uuid.uuid4())), new_content)) + + +@given('a file with the content "{content}" is present in "{path}"') +@given("a file with the content '{content}' is present in '{path}'") def step_impl(context: MinifiTestContext, content: str, path: str): + context.execute_steps(f"given a file with the content \"{content}\" is present in \"{path}\" in the \"{DEFAULT_MINIFI_CONTAINER_NAME}\" flow") + + +@when('a file with the content "{content}" is placed in "{path}" in the "{container_name}" flow') +def step_impl(context: MinifiTestContext, content: str, path: str, container_name: str): new_content = content.replace("\\n", "\n") - context.get_or_create_default_minifi_container().files.append(File(os.path.join(path, str(uuid.uuid4())), new_content)) + context.get_minifi_container(container_name).add_file_to_running_container(new_content, path) + + +@when('a file with the content "{content}" is placed in "{path}"') +def step_impl(context: MinifiTestContext, content: str, path: str): + context.execute_steps(f"when a file with the content \"{content}\" is placed in \"{path}\" in the \"{DEFAULT_MINIFI_CONTAINER_NAME}\" flow") @given("an empty file is present in \"{path}\"") @@ -95,11 +112,31 @@ def step_impl(context: MinifiTestContext): context.get_or_create_default_minifi_container().stop() +@when("\"{container_name}\" flow is stopped") +def step_impl(context: MinifiTestContext, container_name: str): + context.get_or_create_minifi_container(container_name).stop() + + @when("MiNiFi is restarted") def step_impl(context: MinifiTestContext): context.get_or_create_default_minifi_container().restart() +@when("\"{container_name}\" flow is restarted") +def step_impl(context: MinifiTestContext, container_name: str): + context.get_or_create_minifi_container(container_name).restart() + + +@when("\"{container_name}\" flow is started") +def step_impl(context: MinifiTestContext, container_name: str): + context.get_or_create_minifi_container(container_name).start() + + +@when("\"{container_name}\" flow is killed") +def step_impl(context: MinifiTestContext, container_name: str): + context.get_or_create_minifi_container(container_name).kill() + + @given("OpenSSL FIPS mode is enabled in MiNiFi") def step_impl(context: MinifiTestContext): context.get_or_create_default_minifi_container().enable_openssl_fips_mode() @@ -145,3 +182,8 @@ def step_impl(context: MinifiTestContext): def step_impl(context: MinifiTestContext): context.containers["minifi-c2-server"] = MinifiC2Server(context) assert context.containers["minifi-c2-server"].deploy() + + +@step("{duration} later") +def step_impl(context: MinifiTestContext, duration: str): + time.sleep(humanfriendly.parse_timespan(duration)) diff --git a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py index 23db9824e8..1984586b90 100644 --- a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py +++ b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py @@ -66,10 +66,16 @@ def step_impl(context: MinifiTestContext, processor_type: str, property_name: st context.get_or_create_minifi_container(minifi_container_name).flow_definition.add_processor(processor) +@given('a {processor_type} processor with the name "{processor_name}" in the "{minifi_container_name}" flow') +def step_impl(context: MinifiTestContext, processor_type: str, processor_name: str, minifi_container_name: str): + processor = Processor(processor_type, processor_name) + context.get_or_create_minifi_container(minifi_container_name).flow_definition.add_processor(processor) + + @given('a {processor_type} processor with the name "{processor_name}"') def step_impl(context: MinifiTestContext, processor_type: str, processor_name: str): - processor = Processor(processor_type, processor_name) - context.get_or_create_default_minifi_container().flow_definition.add_processor(processor) + context.execute_steps( + f'given a {processor_type} processor with the name "{processor_name}" in the "{DEFAULT_MINIFI_CONTAINER_NAME}" flow') @given("a {processor_type} processor in the \"{minifi_container_name}\" flow") diff --git a/docker/RunBehaveTests.sh b/docker/RunBehaveTests.sh index 9d90bbf8e5..303af91609 100755 --- a/docker/RunBehaveTests.sh +++ b/docker/RunBehaveTests.sh @@ -207,4 +207,5 @@ exec \ "${docker_dir}/../extensions/gcp/tests/features" \ "${docker_dir}/../extensions/grafana-loki/tests/features" \ "${docker_dir}/../extensions/lua/tests/features/" \ - "${docker_dir}/../extensions/civetweb/tests/features/" + "${docker_dir}/../extensions/civetweb/tests/features/" \ + "${docker_dir}/../extensions/mqtt/tests/features/" diff --git a/docker/requirements.txt b/docker/requirements.txt index 77c4c2fdc6..27cc27bf52 100644 --- a/docker/requirements.txt +++ b/docker/requirements.txt @@ -10,4 +10,3 @@ azure-storage-blob==12.24.1 prometheus-api-client==0.5.5 humanfriendly==10.0 requests<2.29 # https://github.com/docker/docker-py/issues/3113 -paho-mqtt==2.1.0 diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py index f139acd21a..4b1b69a294 100644 --- a/docker/test/integration/cluster/ContainerStore.py +++ b/docker/test/integration/cluster/ContainerStore.py @@ -21,7 +21,6 @@ from .containers.AzureStorageServerContainer import AzureStorageServerContainer from .containers.HttpProxyContainer import HttpProxyContainer from .containers.PostgreSQLServerContainer import PostgreSQLServerContainer -from .containers.MqttBrokerContainer import MqttBrokerContainer from .containers.SyslogUdpClientContainer import SyslogUdpClientContainer from .containers.SyslogTcpClientContainer import SyslogTcpClientContainer from .containers.MinifiAsPodInKubernetesCluster import MinifiAsPodInKubernetesCluster @@ -128,14 +127,6 @@ def acquire_container(self, context, container_name: str, engine='minifi-cpp', c network=self.network, image_store=self.image_store, command=command)) - elif engine == 'mqtt-broker': - return self.containers.setdefault(container_name, - MqttBrokerContainer(feature_context=feature_context, - name=container_name, - vols=self.vols, - network=self.network, - image_store=self.image_store, - command=command)) elif engine == "syslog-udp-client": return self.containers.setdefault(container_name, SyslogUdpClientContainer( diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py index 458e0bd887..006aa97c56 100644 --- a/docker/test/integration/cluster/DockerTestCluster.py +++ b/docker/test/integration/cluster/DockerTestCluster.py @@ -23,7 +23,6 @@ from .checkers.PostgresChecker import PostgresChecker from .checkers.PrometheusChecker import PrometheusChecker from .checkers.ModbusChecker import ModbusChecker -from .checkers.MqttHelper import MqttHelper from utils import get_peak_memory_usage, get_minifi_pid, get_memory_usage @@ -37,7 +36,6 @@ def __init__(self, context, feature_id): self.postgres_checker = PostgresChecker(self.container_communicator) self.prometheus_checker = PrometheusChecker() self.modbus_checker = ModbusChecker(self.container_communicator) - self.mqtt_helper = MqttHelper() def cleanup(self): self.container_store.cleanup() @@ -278,6 +276,3 @@ def set_value_on_plc_with_modbus(self, container_name, modbus_cmd): def enable_ssl_in_nifi(self): self.container_store.enable_ssl_in_nifi() - - def publish_test_mqtt_message(self, topic: str, message: str): - self.mqtt_helper.publish_test_mqtt_message(topic, message) diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py index 50025e37ab..a729cd0302 100644 --- a/docker/test/integration/cluster/ImageStore.py +++ b/docker/test/integration/cluster/ImageStore.py @@ -63,8 +63,6 @@ def get_image(self, container_engine): image = self.__build_http_proxy_image() elif container_engine == "postgresql-server": image = self.__build_postgresql_server_image() - elif container_engine == "mqtt-broker": - image = self.__build_mqtt_broker_image() else: raise Exception("There is no associated image for " + container_engine) @@ -277,15 +275,6 @@ def __build_postgresql_server_image(self): """.format(base_image='postgres:17.4')) return self.__build_image(dockerfile) - def __build_mqtt_broker_image(self): - dockerfile = dedent("""\ - FROM {base_image} - RUN echo 'log_dest stderr' >> /mosquitto-no-auth.conf - CMD ["/usr/sbin/mosquitto", "--verbose", "--config-file", "/mosquitto-no-auth.conf"] - """.format(base_image='eclipse-mosquitto:2.0.14')) - - return self.__build_image(dockerfile) - def __build_image(self, dockerfile, context_files=[]): conf_dockerfile_buffer = BytesIO() docker_context_buffer = BytesIO() diff --git a/docker/test/integration/cluster/containers/MqttBrokerContainer.py b/docker/test/integration/cluster/containers/MqttBrokerContainer.py deleted file mode 100644 index faa168f311..0000000000 --- a/docker/test/integration/cluster/containers/MqttBrokerContainer.py +++ /dev/null @@ -1,40 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import logging -from .Container import Container - - -class MqttBrokerContainer(Container): - def __init__(self, feature_context, name, vols, network, image_store, command=None): - super().__init__(feature_context, name, 'mqtt-broker', vols, network, image_store, command) - - def get_startup_finished_log_entry(self): - return "mosquitto version [0-9\\.]+ running" - - def deploy(self): - if not self.set_deployed(): - return - - logging.info('Creating and running MQTT broker docker container...') - self.client.containers.run( - self.image_store.get_image(self.get_engine()), - detach=True, - name=self.name, - ports={'1883/tcp': 1883}, - network=self.network.name, - entrypoint=self.command) - logging.info('Added container \'%s\'', self.name) diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py index 7a36309cdd..a85c48bb74 100644 --- a/docker/test/integration/features/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py @@ -389,6 +389,3 @@ def set_value_on_plc_with_modbus(self, container_name, modbus_cmd): def enable_ssl_in_nifi(self): self.cluster.enable_ssl_in_nifi() - - def publish_test_mqtt_message(self, topic, message): - self.cluster.publish_test_mqtt_message(topic, message) diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index ce222141b2..521679e6fd 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -121,7 +121,6 @@ def step_impl(context, processor_type, minifi_container_name): @given("a {processor_type} processor") @given("a {processor_type} processor set up to communicate with an Azure blob storage") -@given("a {processor_type} processor set up to communicate with an MQTT broker instance") def step_impl(context, processor_type): __create_processor(context, processor_type, processor_type, None, None, "minifi-cpp-flow") @@ -486,16 +485,6 @@ def step_impl(context, processor_name, service_property_name, property_name, pro __set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, {property_name: property_value}) -# MQTT setup -@when("an MQTT broker is set up in correspondence with the PublishMQTT") -@given("an MQTT broker is set up in correspondence with the PublishMQTT") -@given("an MQTT broker is set up in correspondence with the ConsumeMQTT") -@given("an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT") -def step_impl(context): - context.test.acquire_container(context=context, name="mqtt-broker", engine="mqtt-broker") - context.test.start('mqtt-broker') - - # azure storage setup @given("an Azure storage server is set up") def step_impl(context): @@ -779,22 +768,6 @@ def step_impl(context, regex, duration): context.test.check_minifi_log_matches_regex(regex, humanfriendly.parse_timespan(duration)) -# MQTT -@then("the MQTT broker has a log line matching \"{log_pattern}\"") -def step_impl(context, log_pattern): - context.test.check_container_log_matches_regex('mqtt-broker', log_pattern, 60, count=1) - - -@then("the MQTT broker has {log_count} log lines matching \"{log_pattern}\"") -def step_impl(context, log_count, log_pattern): - context.test.check_container_log_matches_regex('mqtt-broker', log_pattern, 60, count=int(log_count)) - - -@when("a test message \"{message}\" is published to the MQTT broker on topic \"{topic}\"") -def step_impl(context, message, topic): - context.test.publish_test_mqtt_message(topic, message) - - @then("the \"{minifi_container_name}\" flow has a log line matching \"{log_pattern}\" in less than {duration}") def step_impl(context, minifi_container_name, log_pattern, duration): context.test.check_container_log_matches_regex(minifi_container_name, log_pattern, humanfriendly.parse_timespan(duration), count=1) diff --git a/docker/test/integration/minifi/processors/ConsumeMQTT.py b/docker/test/integration/minifi/processors/ConsumeMQTT.py deleted file mode 100644 index 8c64be6a3d..0000000000 --- a/docker/test/integration/minifi/processors/ConsumeMQTT.py +++ /dev/null @@ -1,30 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ..core.Processor import Processor - - -class ConsumeMQTT(Processor): - def __init__(self, context, schedule={'scheduling strategy': 'TIMER_DRIVEN'}): - super(ConsumeMQTT, self).__init__( - context=context, - clazz='ConsumeMQTT', - properties={ - 'Broker URI': f'mqtt-broker-{context.feature_id}:1883', - 'Topic': 'testtopic', - 'Client ID': 'consumer-client'}, - auto_terminate=['success'], - schedule=schedule) diff --git a/docker/test/integration/minifi/processors/PublishMQTT.py b/docker/test/integration/minifi/processors/PublishMQTT.py deleted file mode 100644 index 41387ce761..0000000000 --- a/docker/test/integration/minifi/processors/PublishMQTT.py +++ /dev/null @@ -1,30 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ..core.Processor import Processor - - -class PublishMQTT(Processor): - def __init__(self, context, schedule={'scheduling strategy': 'EVENT_DRIVEN'}): - super(PublishMQTT, self).__init__( - context=context, - clazz='PublishMQTT', - properties={ - 'Broker URI': f'mqtt-broker-{context.feature_id}:1883', - 'Topic': 'testtopic', - 'Client ID': 'publisher-client'}, - auto_terminate=['success', 'failure'], - schedule=schedule) diff --git a/extensions/couchbase/tests/features/steps/couchbase_server_container.py b/extensions/couchbase/tests/features/steps/couchbase_server_container.py index 96cddf821d..893ba9de9b 100644 --- a/extensions/couchbase/tests/features/steps/couchbase_server_container.py +++ b/extensions/couchbase/tests/features/steps/couchbase_server_container.py @@ -82,8 +82,8 @@ def _run_python_in_couchbase_helper_docker(self, command: str): self.client.containers.run("minifi-couchbase-helper:latest", ["python", "-c", command], remove=True, stdout=True, stderr=True, network=self.network.name) return True except ContainerError as e: - stdout = e.stdout.decode("utf-8", errors="replace") if e.stdout else "" - stderr = e.stderr.decode("utf-8", errors="replace") if e.stderr else "" + stdout = e.stdout.decode("utf-8", errors="replace") if hasattr(e, "stdout") and e.stdout else "" + stderr = e.stderr.decode("utf-8", errors="replace") if hasattr(e, "stderr") and e.stderr else "" logging.error(f"Python command '{command}' failed in couchbase helper docker with error: '{e}', stdout: '{stdout}', stderr: '{stderr}'") return False except Exception as e: diff --git a/extensions/grafana-loki/tests/features/steps/grafana_loki_container.py b/extensions/grafana-loki/tests/features/steps/grafana_loki_container.py index 46fc84b5af..3d7e497aa8 100644 --- a/extensions/grafana-loki/tests/features/steps/grafana_loki_container.py +++ b/extensions/grafana-loki/tests/features/steps/grafana_loki_container.py @@ -142,8 +142,8 @@ def are_lines_present(self, lines: str, timeout: int, ssl: bool, tenant_id: str remove=True, stdout=True, stderr=True, network=self.network.name) return True except ContainerError as e: - stdout = e.stdout.decode("utf-8", errors="replace") if e.stdout else "" - stderr = e.stderr.decode("utf-8", errors="replace") if e.stderr else "" + stdout = e.stdout.decode("utf-8", errors="replace") if hasattr(e, "stdout") and e.stdout else "" + stderr = e.stderr.decode("utf-8", errors="replace") if hasattr(e, "stderr") and e.stderr else "" logging.error(f"Failed to run python command in grafana loki helper docker with error: '{e}', stdout: '{stdout}', stderr: '{stderr}'") return False except Exception as e: diff --git a/extensions/mqtt/tests/features/environment.py b/extensions/mqtt/tests/features/environment.py new file mode 100644 index 0000000000..b3bbe9df02 --- /dev/null +++ b/extensions/mqtt/tests/features/environment.py @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +from minifi_test_framework.containers.docker_image_builder import DockerImageBuilder +from minifi_test_framework.core.hooks import common_before_scenario +from minifi_test_framework.core.hooks import common_after_scenario + + +def before_all(context): + check_log_lines_path = Path(__file__).resolve().parent / "resources" / "publish_mqtt_message.py" + check_log_lines_content = None + with open(check_log_lines_path, "rb") as f: + check_log_lines_content = f.read() + dockerfile = """ +FROM python:3.13-slim-bookworm +RUN pip install paho-mqtt==2.1.0 +COPY publish_mqtt_message.py /scripts/publish_mqtt_message.py""" + mqtt_helper_builder = DockerImageBuilder( + image_tag="minifi-mqtt-helper:latest", + dockerfile_content=dockerfile, + files_on_context={"publish_mqtt_message.py": check_log_lines_content} + ) + mqtt_helper_builder.build() + + +def before_scenario(context, scenario): + common_before_scenario(context, scenario) + + +def after_scenario(context, scenario): + common_after_scenario(context, scenario) diff --git a/docker/test/integration/features/mqtt.feature b/extensions/mqtt/tests/features/mqtt.feature similarity index 60% rename from docker/test/integration/features/mqtt.feature rename to extensions/mqtt/tests/features/mqtt.feature index 64ef62f6b4..3fcc0d59b1 100644 --- a/docker/test/integration/features/mqtt.feature +++ b/extensions/mqtt/tests/features/mqtt.feature @@ -19,22 +19,21 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT As a user of MiNiFi I need to have PublishMQTT and ConsumeMQTT processors - Background: - Given the content of "/tmp/output" is monitored - Scenario Outline: A MiNiFi instance transfers data to an MQTT broker Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And a file with the content "test" is present in "/tmp/input" And a PublishMQTT processor set up to communicate with an MQTT broker instance And the "MQTT Version" property of the PublishMQTT processor is set to "" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And the "success" relationship of the GetFile processor is connected to the PublishMQTT And the "success" relationship of the PublishMQTT processor is connected to the PutFile + And PutFile's success relationship is auto-terminated - And an MQTT broker is set up in correspondence with the PublishMQTT + And an MQTT broker is started - When both instances start up - Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + When the MiNiFi instance starts up + Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 60 seconds And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)" Examples: MQTT versions @@ -48,12 +47,14 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT And a PublishMQTT processor set up to communicate with an MQTT broker instance And the "MQTT Version" property of the PublishMQTT processor is set to "" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And the "success" relationship of the GetFile processor is connected to the PublishMQTT And the "success" relationship of the PublishMQTT processor is connected to the PutFile And the "failure" relationship of the PublishMQTT processor is connected to the PutFile + And PutFile's success relationship is auto-terminated When the MiNiFi instance starts up - Then no files are placed in the monitored directory in 30 seconds of running time + Then no files are placed in the "/tmp/output" directory in 30 seconds of running time Examples: MQTT versions | version | @@ -66,14 +67,16 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT And a PublishMQTT processor set up to communicate with an MQTT broker instance And the "MQTT Version" property of the PublishMQTT processor is set to "" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And the "success" relationship of the GetFile processor is connected to the PublishMQTT And the "success" relationship of the PublishMQTT processor is connected to the PutFile + And PutFile's success relationship is auto-terminated When the MiNiFi instance starts up - Then no files are placed in the monitored directory in 10 seconds of running time + Then no files are placed in the "/tmp/output" directory in 10 seconds of running time - When an MQTT broker is set up in correspondence with the PublishMQTT - Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + When an MQTT broker is started + Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 60 seconds And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)" Examples: MQTT versions @@ -84,24 +87,28 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT Scenario Outline: A MiNiFi instance publishes and consumes data to/from an MQTT broker Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And a PublishMQTT processor set up to communicate with an MQTT broker instance + And PublishMQTT is EVENT_DRIVEN And the "MQTT Version" property of the PublishMQTT processor is set to "" And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And PublishMQTT's success relationship is auto-terminated And a ConsumeMQTT processor set up to communicate with an MQTT broker instance And the "MQTT Version" property of the ConsumeMQTT processor is set to "" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And a LogAttribute processor - And "ConsumeMQTT" processor is a start node + And LogAttribute is EVENT_DRIVEN And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile And the "success" relationship of the PutFile processor is connected to the LogAttribute + And LogAttribute's success relationship is auto-terminated - And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT + And an MQTT broker is started - When both instances start up + When the MiNiFi instance starts up Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" And the MQTT broker has a log line matching "New client connected from .* as publisher-client" - And a file with the content "test" is placed in "/tmp/input" - And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + When a file with the content "test" is placed in "/tmp/input" + Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 60 seconds And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)" And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 60 seconds And the Minifi logs contain the following message: "key:mqtt.topic value:testtopic" in less than 1 seconds @@ -120,24 +127,27 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT Scenario Outline: Subscription to topics with wildcards Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And a PublishMQTT processor set up to communicate with an MQTT broker instance + And PublishMQTT is EVENT_DRIVEN And the "Topic" property of the PublishMQTT processor is set to "test/my/topic" And the "MQTT Version" property of the PublishMQTT processor is set to "" And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And PublishMQTT's success relationship is auto-terminated And a ConsumeMQTT processor set up to communicate with an MQTT broker instance And the "Topic" property of the ConsumeMQTT processor is set to "" And the "MQTT Version" property of the ConsumeMQTT processor is set to "" And a PutFile processor with the "Directory" property set to "/tmp/output" - And "ConsumeMQTT" processor is a start node + And PutFile is EVENT_DRIVEN And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile + And PutFile's success relationship is auto-terminated - And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT + And an MQTT broker is started - When both instances start up + When the MiNiFi instance starts up Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" And the MQTT broker has a log line matching "New client connected from .* as publisher-client" - And a file with the content "test" is placed in "/tmp/input" - And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + When a file with the content "test" is placed in "/tmp/input" + Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 60 seconds And the MQTT broker has a log line matching "Received PUBLISH from .*test/my/topic.*\(4 bytes\)" Examples: Topic wildcard patterns @@ -150,33 +160,37 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT Scenario Outline: Subscription and publishing with disconnecting clients in persistent sessions # publishing MQTT client Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow - And a PublishMQTT processor in the "publisher-client" flow - And the "MQTT Version" property of the PublishMQTT processor is set to "" - And the "Quality of Service" property of the PublishMQTT processor is set to "1" - And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And a PublishMQTT processor set up to communicate with an MQTT broker instance in the "publisher-client" flow + And PublishMQTT is EVENT_DRIVEN in the "publisher-client" flow + And the "MQTT Version" property of the PublishMQTT processor is set to "" in the "publisher-client" flow + And the "Quality of Service" property of the PublishMQTT processor is set to "1" in the "publisher-client" flow + And in the "publisher-client" flow the "success" relationship of the GetFile processor is connected to the PublishMQTT + And PublishMQTT's success relationship is auto-terminated in the "publisher-client" flow # consuming MQTT client - And a ConsumeMQTT processor in the "consumer-client" flow - And the "MQTT Version" property of the ConsumeMQTT processor is set to "" - And the "Quality of Service" property of the ConsumeMQTT processor is set to "1" - And the "" property of the ConsumeMQTT processor is set to "" - And the "" property of the ConsumeMQTT processor is set to "" + And a ConsumeMQTT processor set up to communicate with an MQTT broker instance in the "consumer-client" flow + And the "MQTT Version" property of the ConsumeMQTT processor is set to "" in the "consumer-client" flow + And the "Quality of Service" property of the ConsumeMQTT processor is set to "1" in the "consumer-client" flow + And the "" property of the ConsumeMQTT processor is set to "" in the "consumer-client" flow + And the "" property of the ConsumeMQTT processor is set to "" in the "consumer-client" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow - And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile + And PutFile is EVENT_DRIVEN in the "consumer-client" flow + And in the "consumer-client" flow the "success" relationship of the ConsumeMQTT processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "consumer-client" flow - And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT + And an MQTT broker is started When all instances start up Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" - And "consumer-client" flow is stopped - And the MQTT broker has a log line matching "Received DISCONNECT from consumer-client" + When "consumer-client" flow is stopped + Then the MQTT broker has a log line matching "Received DISCONNECT from consumer-client" - And a file with the content "test" is placed in "/tmp/input" - And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)" + When a file with the content "test" is placed in "/tmp/input" in the "publisher-client" flow + Then the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)" - And "consumer-client" flow is restarted - And the MQTT broker has 2 log lines matching "New client connected from .* as consumer-client" - And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + When "consumer-client" flow is restarted + Then the MQTT broker has 2 log lines matching "New client connected from .* as consumer-client" + And in the "consumer-client" container a single file with the content "test" is placed in the "/tmp/output" directory in less than 60 seconds Examples: MQTT versions | version | persistent_session_property_1 | persistent_session_property_1_value | persistent_session_property_2 | persistent_session_property_2_value | @@ -186,24 +200,27 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT Scenario Outline: UTF-8 topics and messages Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And a PublishMQTT processor set up to communicate with an MQTT broker instance + And PublishMQTT is EVENT_DRIVEN And the "MQTT Version" property of the PublishMQTT processor is set to "" And the "Topic" property of the PublishMQTT processor is set to "" And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And PublishMQTT's success relationship is auto-terminated And a ConsumeMQTT processor set up to communicate with an MQTT broker instance And the "MQTT Version" property of the ConsumeMQTT processor is set to "" And the "Topic" property of the ConsumeMQTT processor is set to "" And a PutFile processor with the "Directory" property set to "/tmp/output" - And "ConsumeMQTT" processor is a start node + And PutFile is EVENT_DRIVEN And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile + And PutFile's success relationship is auto-terminated - And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT + And an MQTT broker is started - When both instances start up + When the MiNiFi instance starts up Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" And the MQTT broker has a log line matching "New client connected from .* as publisher-client" - And a file with the content "" is placed in "/tmp/input" - And a flowfile with the content "" is placed in the monitored directory in less than 60 seconds + When a file with the content "" is placed in "/tmp/input" + Then a single file with the content "" is placed in the "/tmp/output" directory in less than 60 seconds And the MQTT broker has a log line matching "Received PUBLISH from .*" Examples: Topics, messages and version @@ -214,28 +231,30 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT | محمد بن موسی خوارزمی | ٱلْجَبْر | 3.x AUTO | | תַּלְמוּד | תּוֹרָה | 3.x AUTO | - Scenario Outline: QoS 0 message flow is correct Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And a PublishMQTT processor set up to communicate with an MQTT broker instance + And PublishMQTT is EVENT_DRIVEN And the "MQTT Version" property of the PublishMQTT processor is set to "" And the "Quality of Service" property of the PublishMQTT processor is set to "0" And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And PublishMQTT's success relationship is auto-terminated And a ConsumeMQTT processor set up to communicate with an MQTT broker instance And the "MQTT Version" property of the ConsumeMQTT processor is set to "" And the "Quality of Service" property of the ConsumeMQTT processor is set to "0" And a PutFile processor with the "Directory" property set to "/tmp/output" - And "ConsumeMQTT" processor is a start node + And PutFile is EVENT_DRIVEN And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile + And PutFile's success relationship is auto-terminated - And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT + And an MQTT broker is started - When both instances start up + When the MiNiFi instance starts up Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" And the MQTT broker has a log line matching "New client connected from .* as publisher-client" - And a file with the content "test" is placed in "/tmp/input" - And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + When a file with the content "test" is placed in "/tmp/input" + Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 60 seconds And the MQTT broker has a log line matching "Received PUBLISH from publisher-client \(d0, q0, r0, m0, 'testtopic'.*\(4 bytes\)" And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client \(d0, q0, r0, m0, 'testtopic',.*\(4 bytes\)\)" @@ -247,24 +266,27 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT Scenario Outline: QoS 1 Subscriber sends PUBACK on a PUBLISH message, with correct packet ID Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And a PublishMQTT processor set up to communicate with an MQTT broker instance + And PublishMQTT is EVENT_DRIVEN And the "MQTT Version" property of the PublishMQTT processor is set to "" And the "Quality of Service" property of the PublishMQTT processor is set to "1" And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And PublishMQTT's success relationship is auto-terminated And a ConsumeMQTT processor set up to communicate with an MQTT broker instance And the "MQTT Version" property of the ConsumeMQTT processor is set to "" And the "Quality of Service" property of the ConsumeMQTT processor is set to "1" And a PutFile processor with the "Directory" property set to "/tmp/output" - And "ConsumeMQTT" processor is a start node + And PutFile is EVENT_DRIVEN And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile + And PutFile's success relationship is auto-terminated - And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT + And an MQTT broker is started - When both instances start up + When the MiNiFi instance starts up Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" And the MQTT broker has a log line matching "New client connected from .* as publisher-client" - And a file with the content "test" is placed in "/tmp/input" - And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + When a file with the content "test" is placed in "/tmp/input" + Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 60 seconds And the MQTT broker has a log line matching "Received PUBLISH from publisher-client.*m1, 'testtopic'.*\(4 bytes\)" And the MQTT broker has a log line matching "Sending PUBACK to publisher-client \(m1, rc0\)" And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client \(d0, q1, r0, m1, 'testtopic',.*\(4 bytes\)\)" @@ -278,24 +300,27 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT Scenario Outline: QoS 2 message flow is correct Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And a PublishMQTT processor set up to communicate with an MQTT broker instance + And PublishMQTT is EVENT_DRIVEN And the "MQTT Version" property of the PublishMQTT processor is set to "" And the "Quality of Service" property of the PublishMQTT processor is set to "2" And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And PublishMQTT's success relationship is auto-terminated And a ConsumeMQTT processor set up to communicate with an MQTT broker instance And the "MQTT Version" property of the ConsumeMQTT processor is set to "" And the "Quality of Service" property of the ConsumeMQTT processor is set to "2" And a PutFile processor with the "Directory" property set to "/tmp/output" - And "ConsumeMQTT" processor is a start node + And PutFile is EVENT_DRIVEN And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile + And PutFile's success relationship is auto-terminated - And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT + And an MQTT broker is started - When both instances start up + When the MiNiFi instance starts up Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" And the MQTT broker has a log line matching "New client connected from .* as publisher-client" - And a file with the content "test" is placed in "/tmp/input" - And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + When a file with the content "test" is placed in "/tmp/input" + Then a single file with the content "test" is placed in the "/tmp/output" directory in less than 60 seconds And the MQTT broker has a log line matching "Received PUBLISH from publisher-client.*m1, 'testtopic'.*\(4 bytes\)" And the MQTT broker has a log line matching "Sending PUBREC to publisher-client \(m1, rc0\)" And the MQTT broker has a log line matching "Received PUBREL from publisher-client \(Mid: 1\)" @@ -313,28 +338,33 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT Scenario Outline: Retained message # publishing MQTT client Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow - And a file with the content "test" is present in "/tmp/input" - And a PublishMQTT processor in the "publisher-client" flow - And the "MQTT Version" property of the PublishMQTT processor is set to "" - And the "Retain" property of the PublishMQTT processor is set to "true" - And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And the scheduling period of the GetFile processor is set to "120 seconds" in the "publisher-client" flow + And a file with the content "test" is present in "/tmp/input" in the "publisher-client" flow + And a PublishMQTT processor set up to communicate with an MQTT broker instance in the "publisher-client" flow + And PublishMQTT is EVENT_DRIVEN in the "publisher-client" flow + And the "MQTT Version" property of the PublishMQTT processor is set to "" in the "publisher-client" flow + And the "Retain" property of the PublishMQTT processor is set to "true" in the "publisher-client" flow + And in the "publisher-client" flow the "success" relationship of the GetFile processor is connected to the PublishMQTT + And PublishMQTT's success relationship is auto-terminated in the "publisher-client" flow # consuming MQTT client - And a ConsumeMQTT processor in the "consumer-client" flow - And the "MQTT Version" property of the ConsumeMQTT processor is set to "" + And a ConsumeMQTT processor set up to communicate with an MQTT broker instance in the "consumer-client" flow + And the "MQTT Version" property of the ConsumeMQTT processor is set to "" in the "consumer-client" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow - And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile + And PutFile is EVENT_DRIVEN in the "consumer-client" flow + And in the "consumer-client" flow the "success" relationship of the ConsumeMQTT processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "consumer-client" flow - And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT + And an MQTT broker is started - When both instances start up + When all instances start up Then the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)" # consumer is joining late, but it will still see the retained message - And "consumer-client" flow is started - And the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" + When "consumer-client" flow is started + Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" - And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + And in the "consumer-client" container a single file with the content "test" is placed in the "/tmp/output" directory in less than 60 seconds Examples: MQTT versions | version | @@ -344,27 +374,31 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT Scenario Outline: Last will # publishing MQTT client with last will Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow - And a PublishMQTT processor in the "publisher-client" flow - And the "MQTT Version" property of the PublishMQTT processor is set to "" - And the "Last Will Topic" property of the PublishMQTT processor is set to "last_will_topic" - And the "Last Will Message" property of the PublishMQTT processor is set to "last_will_message" - And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And a PublishMQTT processor set up to communicate with an MQTT broker instance in the "publisher-client" flow + And PublishMQTT is EVENT_DRIVEN in the "publisher-client" flow + And the "MQTT Version" property of the PublishMQTT processor is set to "" in the "publisher-client" flow + And the "Last Will Topic" property of the PublishMQTT processor is set to "last_will_topic" in the "publisher-client" flow + And the "Last Will Message" property of the PublishMQTT processor is set to "last_will_message" in the "publisher-client" flow + And in the "publisher-client" flow the "success" relationship of the GetFile processor is connected to the PublishMQTT + And PublishMQTT's success relationship is auto-terminated in the "publisher-client" flow # consuming MQTT client set to consume last will topic - And a ConsumeMQTT processor in the "consumer-client" flow - And the "MQTT Version" property of the ConsumeMQTT processor is set to "" - And the "Topic" property of the ConsumeMQTT processor is set to "last_will_topic" + And a ConsumeMQTT processor set up to communicate with an MQTT broker instance in the "consumer-client" flow + And the "MQTT Version" property of the ConsumeMQTT processor is set to "" in the "consumer-client" flow + And the "Topic" property of the ConsumeMQTT processor is set to "last_will_topic" in the "consumer-client" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow - And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile + And PutFile is EVENT_DRIVEN in the "consumer-client" flow + And in the "consumer-client" flow the "success" relationship of the ConsumeMQTT processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "consumer-client" flow - And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT + And an MQTT broker is started When all instances start up Then the MQTT broker has a log line matching "Sending CONNACK to publisher-client" And the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" - And "publisher-client" flow is killed - And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client" - And a flowfile with the content "last_will_message" is placed in the monitored directory in less than 60 seconds + When "publisher-client" flow is killed + Then the MQTT broker has a log line matching "Sending PUBLISH to consumer-client" + And in the "consumer-client" container a single file with the content "last_will_message" is placed in the "/tmp/output" directory in less than 60 seconds Examples: MQTT versions | version | @@ -376,9 +410,9 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT And the "MQTT Version" property of the ConsumeMQTT processor is set to "" And the "Keep Alive Interval" property of the ConsumeMQTT processor is set to "1 sec" - And an MQTT broker is set up in correspondence with the ConsumeMQTT + And an MQTT broker is started - When both instances start up + When the MiNiFi instance starts up Then the MQTT broker has a log line matching "Received PINGREQ from consumer-client" Then the MQTT broker has a log line matching "Sending PINGRESP to consumer-client" @@ -390,171 +424,201 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT Scenario Outline: Message Expiry Interval - MQTT 5 # publishing MQTT client Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow - And a PublishMQTT processor in the "publisher-client" flow - And the "MQTT Version" property of the PublishMQTT processor is set to "5.0" - And the "Message Expiry Interval" property of the PublishMQTT processor is set to "" - And the "Quality of Service" property of the PublishMQTT processor is set to "1" - And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And a PublishMQTT processor set up to communicate with an MQTT broker instance in the "publisher-client" flow + And PublishMQTT is EVENT_DRIVEN in the "publisher-client" flow + And the "MQTT Version" property of the PublishMQTT processor is set to "5.0" in the "publisher-client" flow + And the "Message Expiry Interval" property of the PublishMQTT processor is set to "" in the "publisher-client" flow + And the "Quality of Service" property of the PublishMQTT processor is set to "1" in the "publisher-client" flow + And in the "publisher-client" flow the "success" relationship of the GetFile processor is connected to the PublishMQTT + And PublishMQTT's success relationship is auto-terminated in the "publisher-client" flow # consuming MQTT client - And a ConsumeMQTT processor in the "consumer-client" flow - And the "MQTT Version" property of the ConsumeMQTT processor is set to "5.0" - And the "Quality of Service" property of the ConsumeMQTT processor is set to "1" - And the "Clean Start" property of the ConsumeMQTT processor is set to "false" - And the "Session Expiry Interval" property of the ConsumeMQTT processor is set to "1 h" + And a ConsumeMQTT processor set up to communicate with an MQTT broker instance in the "consumer-client" flow + And the "MQTT Version" property of the ConsumeMQTT processor is set to "5.0" in the "consumer-client" flow + And the "Quality of Service" property of the ConsumeMQTT processor is set to "1" in the "consumer-client" flow + And the "Clean Start" property of the ConsumeMQTT processor is set to "false" in the "consumer-client" flow + And the "Session Expiry Interval" property of the ConsumeMQTT processor is set to "1 h" in the "consumer-client" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow - And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile + And PutFile is EVENT_DRIVEN in the "consumer-client" flow + And in the "consumer-client" flow the "success" relationship of the ConsumeMQTT processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "consumer-client" flow - And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT + And an MQTT broker is started When all instances start up Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" - And "consumer-client" flow is stopped - And the MQTT broker has a log line matching "Received DISCONNECT from consumer-client" + When "consumer-client" flow is stopped + Then the MQTT broker has a log line matching "Received DISCONNECT from consumer-client" - And a file with the content "test" is placed in "/tmp/input" - And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)" + When a file with the content "test" is placed in "/tmp/input" in the "publisher-client" flow + Then the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)" And 2 seconds later - And "consumer-client" flow is restarted - And the MQTT broker has 2 log lines matching "New client connected from .* as consumer-client" - And placed in the monitored directory in + When "consumer-client" flow is restarted + Then the MQTT broker has 2 log lines matching "New client connected from .* as consumer-client" + And in the "consumer-client" container placed in the "/tmp/output" directory in Examples: Message Expiry Intervals - | message_expiry_interval | expectation_num_files | expectation_time_limit | - | 1 h | a flowfile with the content "test" is | less than 60 seconds | - | 1 s | no files are | 30 seconds of running time | + | message_expiry_interval | expectation_num_files | expectation_time_limit | + | 1 h | a single file with the content "test" is | less than 60 seconds | + | 1 s | no files are | 30 seconds of running time | Scenario: User properties - MQTT 5 # publishing MQTT client: GetFile -> UpdateAttribute (my_attr1:true, my_attr2:true) -> PublishMQTT Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow And a UpdateAttribute processor in the "publisher-client" flow - And the "my_attr1" property of the UpdateAttribute processor is set to "true" - And the "my_attr2" property of the UpdateAttribute processor is set to "true" - And a PublishMQTT processor in the "publisher-client" flow - And the "MQTT Version" property of the PublishMQTT processor is set to "5.0" - And the "success" relationship of the GetFile processor is connected to the UpdateAttribute - And the "success" relationship of the UpdateAttribute processor is connected to the PublishMQTT + And the "my_attr1" property of the UpdateAttribute processor is set to "true" in the "publisher-client" flow + And the "my_attr2" property of the UpdateAttribute processor is set to "true" in the "publisher-client" flow + And a PublishMQTT processor set up to communicate with an MQTT broker instance in the "publisher-client" flow + And PublishMQTT is EVENT_DRIVEN in the "publisher-client" flow + And the "MQTT Version" property of the PublishMQTT processor is set to "5.0" in the "publisher-client" flow + And in the "publisher-client" flow the "success" relationship of the GetFile processor is connected to the UpdateAttribute + And in the "publisher-client" flow the "success" relationship of the UpdateAttribute processor is connected to the PublishMQTT + And PublishMQTT's success relationship is auto-terminated in the "publisher-client" flow # consuming MQTT client: ConsumeMQTT -> RouteOnAttribute (my_attr1) -> RouteOnAttribute (my_attr2) -> PutFile - And a ConsumeMQTT processor in the "consumer-client" flow - And the "MQTT Version" property of the ConsumeMQTT processor is set to "5.0" + And a ConsumeMQTT processor set up to communicate with an MQTT broker instance in the "consumer-client" flow + And the "MQTT Version" property of the ConsumeMQTT processor is set to "5.0" in the "consumer-client" flow And a RouteOnAttribute processor with the name "RouteAttr1" in the "consumer-client" flow - And the "matched_my_attr1" property of the RouteAttr1 processor is set to "${my_attr1}" + And RouteAttr1 is EVENT_DRIVEN in the "consumer-client" flow + And the "matched_my_attr1" property of the RouteAttr1 processor is set to "${my_attr1}" in the "consumer-client" flow And a RouteOnAttribute processor with the name "RouteAttr2" in the "consumer-client" flow - And the "matched_my_attr2" property of the RouteAttr2 processor is set to "${my_attr2}" + And RouteAttr2 is EVENT_DRIVEN in the "consumer-client" flow + And the "matched_my_attr2" property of the RouteAttr2 processor is set to "${my_attr2}" in the "consumer-client" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow - And the "success" relationship of the ConsumeMQTT processor is connected to the RouteAttr1 - And the "matched_my_attr1" relationship of the RouteAttr1 processor is connected to the RouteAttr2 - And the "matched_my_attr2" relationship of the RouteAttr2 processor is connected to the PutFile + And PutFile is EVENT_DRIVEN in the "consumer-client" flow + And in the "consumer-client" flow the "success" relationship of the ConsumeMQTT processor is connected to the RouteAttr1 + And in the "consumer-client" flow the "matched_my_attr1" relationship of the RouteAttr1 processor is connected to the RouteAttr2 + And in the "consumer-client" flow the "matched_my_attr2" relationship of the RouteAttr2 processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "consumer-client" flow - And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT + And an MQTT broker is started - When both instances start up + When all instances start up Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" And the MQTT broker has a log line matching "New client connected from .* as publisher-client" - And a file with the content "test" is placed in "/tmp/input" - And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + When a file with the content "test" is placed in "/tmp/input" in the "publisher-client" flow + Then in the "consumer-client" container a single file with the content "test" is placed in the "/tmp/output" directory in less than 60 seconds And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)" Scenario: Content type - MQTT 5 # publishing MQTT client: GetFile -> PublishMQTT (Content Type: text/plain) Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow - And a PublishMQTT processor in the "publisher-client" flow - And the "MQTT Version" property of the PublishMQTT processor is set to "5.0" - And the "Content Type" property of the PublishMQTT processor is set to "text/plain" - And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And a PublishMQTT processor set up to communicate with an MQTT broker instance in the "publisher-client" flow + And PublishMQTT is EVENT_DRIVEN in the "publisher-client" flow + And the "MQTT Version" property of the PublishMQTT processor is set to "5.0" in the "publisher-client" flow + And the "Content Type" property of the PublishMQTT processor is set to "text/plain" in the "publisher-client" flow + And in the "publisher-client" flow the "success" relationship of the GetFile processor is connected to the PublishMQTT + And PublishMQTT's success relationship is auto-terminated in the "publisher-client" flow # consuming MQTT client: ConsumeMQTT (Attribute From Content Type: content_type) -> RouteOnAttribute (content_type = text/plain) -> PutFile - And a ConsumeMQTT processor in the "consumer-client" flow - And the "MQTT Version" property of the ConsumeMQTT processor is set to "5.0" - And the "Attribute From Content Type" property of the ConsumeMQTT processor is set to "content_type" + And a ConsumeMQTT processor set up to communicate with an MQTT broker instance in the "consumer-client" flow + And the "MQTT Version" property of the ConsumeMQTT processor is set to "5.0" in the "consumer-client" flow + And the "Attribute From Content Type" property of the ConsumeMQTT processor is set to "content_type" in the "consumer-client" flow And a RouteOnAttribute processor in the "consumer-client" flow - And the "matched_content_type" property of the RouteOnAttribute processor is set to "${content_type:equals('text/plain')}" + And RouteOnAttribute is EVENT_DRIVEN in the "consumer-client" flow + And the "matched_content_type" property of the RouteOnAttribute processor is set to "${content_type:equals('text/plain')}" in the "consumer-client" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow - And the "success" relationship of the ConsumeMQTT processor is connected to the RouteOnAttribute - And the "matched_content_type" relationship of the RouteOnAttribute processor is connected to the PutFile + And PutFile is EVENT_DRIVEN in the "consumer-client" flow + And in the "consumer-client" flow the "success" relationship of the ConsumeMQTT processor is connected to the RouteOnAttribute + And in the "consumer-client" flow the "matched_content_type" relationship of the RouteOnAttribute processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "consumer-client" flow - And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT + And an MQTT broker is started - When both instances start up + When all instances start up Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" And the MQTT broker has a log line matching "New client connected from .* as publisher-client" - And a file with the content "test" is placed in "/tmp/input" - And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + When a file with the content "test" is placed in "/tmp/input" in the "publisher-client" flow + Then in the "consumer-client" container a single file with the content "test" is placed in the "/tmp/output" directory in less than 60 seconds And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)" Scenario: Will content type - MQTT 5 # publishing MQTT client: GetFile -> PublishMQTT (Last Will Content Type: text/plain) Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow - And a PublishMQTT processor in the "publisher-client" flow - And the "MQTT Version" property of the PublishMQTT processor is set to "5.0" - And the "Last Will Topic" property of the PublishMQTT processor is set to "last_will_topic" - And the "Last Will Message" property of the PublishMQTT processor is set to "last_will_message" - And the "Last Will Content Type" property of the PublishMQTT processor is set to "text/plain" - And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And a PublishMQTT processor set up to communicate with an MQTT broker instance in the "publisher-client" flow + And PublishMQTT is EVENT_DRIVEN in the "publisher-client" flow + And the "MQTT Version" property of the PublishMQTT processor is set to "5.0" in the "publisher-client" flow + And the "Last Will Topic" property of the PublishMQTT processor is set to "last_will_topic" in the "publisher-client" flow + And the "Last Will Message" property of the PublishMQTT processor is set to "last_will_message" in the "publisher-client" flow + And the "Last Will Content Type" property of the PublishMQTT processor is set to "text/plain" in the "publisher-client" flow + And in the "publisher-client" flow the "success" relationship of the GetFile processor is connected to the PublishMQTT + And PublishMQTT's success relationship is auto-terminated in the "publisher-client" flow # consuming MQTT client: ConsumeMQTT (Attribute From Content Type: content_type, Topic: last_will_topic) -> RouteOnAttribute (content_type = text/plain) -> PutFile - And a ConsumeMQTT processor in the "consumer-client" flow - And the "MQTT Version" property of the ConsumeMQTT processor is set to "5.0" - And the "Topic" property of the ConsumeMQTT processor is set to "last_will_topic" - And the "Attribute From Content Type" property of the ConsumeMQTT processor is set to "content_type" + And a ConsumeMQTT processor set up to communicate with an MQTT broker instance in the "consumer-client" flow + And the "MQTT Version" property of the ConsumeMQTT processor is set to "5.0" in the "consumer-client" flow + And the "Topic" property of the ConsumeMQTT processor is set to "last_will_topic" in the "consumer-client" flow + And the "Attribute From Content Type" property of the ConsumeMQTT processor is set to "content_type" in the "consumer-client" flow And a RouteOnAttribute processor in the "consumer-client" flow - And the "matched_content_type" property of the RouteOnAttribute processor is set to "${content_type:equals('text/plain')}" + And RouteOnAttribute is EVENT_DRIVEN in the "consumer-client" flow + And the "matched_content_type" property of the RouteOnAttribute processor is set to "${content_type:equals('text/plain')}" in the "consumer-client" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow - And the "success" relationship of the ConsumeMQTT processor is connected to the RouteOnAttribute - And the "matched_content_type" relationship of the RouteOnAttribute processor is connected to the PutFile + And PutFile is EVENT_DRIVEN in the "consumer-client" flow + And in the "consumer-client" flow the "success" relationship of the ConsumeMQTT processor is connected to the RouteOnAttribute + And in the "consumer-client" flow the "matched_content_type" relationship of the RouteOnAttribute processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "consumer-client" flow - And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT + And an MQTT broker is started When all instances start up Then the MQTT broker has a log line matching "Sending CONNACK to publisher-client" And the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" - And "publisher-client" flow is killed - And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client" - And a flowfile with the content "last_will_message" is placed in the monitored directory in less than 60 seconds + When "publisher-client" flow is killed + Then the MQTT broker has a log line matching "Sending PUBLISH to consumer-client" + And in the "consumer-client" container a single file with the content "last_will_message" is placed in the "/tmp/output" directory in less than 60 seconds Scenario: A MiNiFi instance uses record reader and writer to convert consumed message from an MQTT broker Given a XMLReader controller service is set up - And a JsonRecordSetWriter controller service is set up with "Array" output grouping - And a ConsumeMQTT processor with the "Topic" property set to "test/my/topic" + And a JsonRecordSetWriter controller service is set up and the "Output Grouping" property set to "Array" + And a ConsumeMQTT processor set up to communicate with an MQTT broker instance + And the "Topic" property of the ConsumeMQTT processor is set to "test/my/topic" And the "MQTT Version" property of the ConsumeMQTT processor is set to "3.x AUTO" And the "Record Reader" property of the ConsumeMQTT processor is set to "XMLReader" And the "Record Writer" property of the ConsumeMQTT processor is set to "JsonRecordSetWriter" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And a LogAttribute processor + And LogAttribute is EVENT_DRIVEN And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile And the "success" relationship of the PutFile processor is connected to the LogAttribute - And an MQTT broker is set up in correspondence with the ConsumeMQTT + And PutFile's success relationship is auto-terminated + + And an MQTT broker is started - When both instances start up + When the MiNiFi instance starts up And a test message "test" is published to the MQTT broker on topic "test/my/topic" Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" - And a flowfile with the JSON content '[{"_isRetained": false, "_isDuplicate": false, "_qos": 0, "_topicSegments": ["test", "my", "topic"], "_topic": "test/my/topic", "element": "test"}]' is placed in the monitored directory in less than 60 seconds + And a file with the JSON content '[{"_isRetained": false, "_isDuplicate": false, "_qos": 0, "_topicSegments": ["test", "my", "topic"], "_topic": "test/my/topic", "element": "test"}]' is placed in the '/tmp/output' directory in less than 60 seconds And the Minifi logs contain the following message: "key:record.count value:1" in less than 60 seconds And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 1 seconds Scenario: A MiNiFi instance uses record reader and writer to convert and publish records to an MQTT broker Given a JsonTreeReader controller service is set up And a XMLRecordSetWriter controller service is set up - And the "Name of Record Tag" property of the XMLRecordSetWriter controller is set to "record" - And the "Name of Root Tag" property of the XMLRecordSetWriter controller is set to "root" + And the "Name of Record Tag" property of the XMLRecordSetWriter controller service is set to "record" + And the "Name of Root Tag" property of the XMLRecordSetWriter controller service is set to "root" And a GetFile processor with the "Input Directory" property set to "/tmp/input" And a file with the content '[{"string": "test"}, {"int": 42}]' is present in '/tmp/input' And a PublishMQTT processor set up to communicate with an MQTT broker instance + And PublishMQTT is EVENT_DRIVEN And the "MQTT Version" property of the PublishMQTT processor is set to "3.x AUTO" And the "Record Reader" property of the PublishMQTT processor is set to "JsonTreeReader" And the "Record Writer" property of the PublishMQTT processor is set to "XMLRecordSetWriter" And a UpdateAttribute processor with the "filename" property set to "${UUID()}.xml" + And UpdateAttribute is EVENT_DRIVEN And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And the "success" relationship of the GetFile processor is connected to the PublishMQTT And the "success" relationship of the PublishMQTT processor is connected to the UpdateAttribute And the "success" relationship of the UpdateAttribute processor is connected to the PutFile - And an MQTT broker is set up in correspondence with the PublishMQTT + And PutFile's success relationship is auto-terminated - When both instances start up + And an MQTT broker is started + + When the MiNiFi instance starts up - Then two flowfiles with the contents 'test' and '42' are placed in the monitored directory in less than 60 seconds + Then files with contents 'test' and '42' are placed in the '/tmp/output' directory in less than 60 seconds And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(72 bytes\)" And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(64 bytes\)" diff --git a/docker/test/integration/cluster/checkers/MqttHelper.py b/extensions/mqtt/tests/features/resources/publish_mqtt_message.py similarity index 62% rename from docker/test/integration/cluster/checkers/MqttHelper.py rename to extensions/mqtt/tests/features/resources/publish_mqtt_message.py index 719911c970..6fe709acf5 100644 --- a/docker/test/integration/cluster/checkers/MqttHelper.py +++ b/extensions/mqtt/tests/features/resources/publish_mqtt_message.py @@ -13,11 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. import paho.mqtt.client as mqtt +import sys -class MqttHelper: - def publish_test_mqtt_message(self, topic: str, message: str): - client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, "docker_test_client_id") - client.connect("localhost", 1883, 60) - client.publish(topic, message) - client.disconnect() +def publish_test_mqtt_message(host: str, topic: str, message: str): + client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, "docker_test_client_id") + client.connect(host, 1883, 60) + client.publish(topic, message) + client.disconnect() + + +if __name__ == "__main__": + if sys.argv.__len__() != 4: + print("Usage: publish_mqtt_message.py ") + sys.exit(1) + + publish_test_mqtt_message(sys.argv[1], sys.argv[2], sys.argv[3]) diff --git a/extensions/mqtt/tests/features/steps/mqtt_broker_container.py b/extensions/mqtt/tests/features/steps/mqtt_broker_container.py new file mode 100644 index 0000000000..b5b739dce3 --- /dev/null +++ b/extensions/mqtt/tests/features/steps/mqtt_broker_container.py @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import re +from textwrap import dedent + +from minifi_test_framework.containers.container import Container +from minifi_test_framework.core.helpers import wait_for_condition +from minifi_test_framework.core.minifi_test_context import MinifiTestContext +from minifi_test_framework.containers.docker_image_builder import DockerImageBuilder +from docker.errors import ContainerError + + +class MqttBrokerContainer(Container): + def __init__(self, test_context: MinifiTestContext): + dockerfile = dedent("""\ + FROM {base_image} + RUN echo 'log_dest stderr' >> /mosquitto-no-auth.conf + CMD ["/usr/sbin/mosquitto", "--verbose", "--config-file", "/mosquitto-no-auth.conf"] + """.format(base_image='eclipse-mosquitto:2.0.14')) + + builder = DockerImageBuilder( + image_tag="minifi-mqtt-broker:latest", + dockerfile_content=dockerfile + ) + builder.build() + + super().__init__("minifi-mqtt-broker:latest", f"mqtt-broker-{test_context.scenario_id}", test_context.network) + + def deploy(self): + super().deploy() + finished_str = "mosquitto version [0-9\\.]+ running" + return wait_for_condition( + condition=lambda: re.search(finished_str, self.get_logs()), + timeout_seconds=60, + bail_condition=lambda: self.exited, + context=None) + + def publish_mqtt_message(self, topic: str, message: str): + try: + self.client.containers.run("minifi-mqtt-helper:latest", ["python", "/scripts/publish_mqtt_message.py", self.container_name, topic, message], remove=True, stdout=True, stderr=True, network=self.network.name) + return True + except ContainerError as e: + stdout = e.stdout.decode("utf-8", errors="replace") if hasattr(e, "stdout") and e.stdout else "" + stderr = e.stderr.decode("utf-8", errors="replace") if hasattr(e, "stderr") and e.stderr else "" + logging.error(f"Failed to publish mqtt message in mqtt helper docker with error: '{e}', stdout: '{stdout}', stderr: '{stderr}'") + return False + except Exception as e: + logging.error(f"Unexpected error while publishing mqtt message in mqtt helper docker: '{e}'") + return False diff --git a/extensions/mqtt/tests/features/steps/steps.py b/extensions/mqtt/tests/features/steps/steps.py new file mode 100644 index 0000000000..4bff1b2a17 --- /dev/null +++ b/extensions/mqtt/tests/features/steps/steps.py @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re +from behave import given, step, then, when + +from minifi_test_framework.steps import checking_steps # noqa: F401 +from minifi_test_framework.steps import configuration_steps # noqa: F401 +from minifi_test_framework.steps import core_steps # noqa: F401 +from minifi_test_framework.steps import flow_building_steps # noqa: F401 +from minifi_test_framework.core.minifi_test_context import DEFAULT_MINIFI_CONTAINER_NAME, MinifiTestContext +from minifi_test_framework.minifi.processor import Processor +from minifi_test_framework.core.helpers import wait_for_condition + +from mqtt_broker_container import MqttBrokerContainer + + +@given("a {processor_name} processor set up to communicate with an MQTT broker instance in the \"{container_name}\" flow") +def step_impl(context: MinifiTestContext, processor_name: str, container_name: str): + processor = Processor(processor_name, processor_name) + processor.add_property('Broker URI', f'mqtt-broker-{context.scenario_id}:1883') + processor.add_property('Topic', 'testtopic') + if processor_name == 'PublishMQTT': + processor.add_property('Client ID', 'publisher-client') + elif processor_name == 'ConsumeMQTT': + processor.add_property('Client ID', 'consumer-client') + else: + raise ValueError(f"Unknown processor to communicate with MQTT broker: {processor_name}") + + context.get_or_create_minifi_container(container_name).flow_definition.add_processor(processor) + + +@given("a {processor_name} processor set up to communicate with an MQTT broker instance") +def step_impl(context: MinifiTestContext, processor_name: str): + context.execute_steps(f'given a {processor_name} processor set up to communicate with an MQTT broker instance in the "{DEFAULT_MINIFI_CONTAINER_NAME}" flow') + + +@step("an MQTT broker is started") +def step_impl(context: MinifiTestContext): + context.containers["mqtt-broker"] = MqttBrokerContainer(context) + assert context.containers["mqtt-broker"].deploy() + + +@then('the MQTT broker has a log line matching "{log_line_regex}"') +def step_impl(context: MinifiTestContext, log_line_regex: str): + assert wait_for_condition( + condition=lambda: re.search(log_line_regex, context.containers["mqtt-broker"].get_logs()), + timeout_seconds=60, + bail_condition=lambda: context.containers["mqtt-broker"].exited, + context=None) + + +@then('the MQTT broker has {log_count:d} log lines matching "{log_line_regex}"') +def step_impl(context: MinifiTestContext, log_count: int, log_line_regex: str): + assert wait_for_condition( + condition=lambda: len(re.findall(log_line_regex, context.containers["mqtt-broker"].get_logs())) == log_count, + timeout_seconds=60, + bail_condition=lambda: context.containers["mqtt-broker"].exited, + context=None) + + +@when("a test message \"{message}\" is published to the MQTT broker on topic \"{topic}\"") +def step_impl(context: MinifiTestContext, message: str, topic: str): + assert context.containers["mqtt-broker"].publish_mqtt_message(topic, message)