Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions behave_framework/src/minifi_test_framework/containers/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import shlex
import tempfile
import tarfile
import uuid

import docker
from docker.models.networks import Network
Expand Down Expand Up @@ -50,6 +51,23 @@ def __init__(self, image_name: str, container_name: str, network: Network, comma
def add_host_file(self, host_path: str, container_path: str, mode: str = "ro"):
self.host_files.append(HostFile(container_path, host_path, mode))

def add_file_to_running_container(self, content: str, path: str):
if not self.container:
logging.error("Container is not running. Cannot add file.")
raise RuntimeError("Container is not running. Cannot add file.")

mkdir_command = f"mkdir -p {shlex.quote(path)}"
exit_code, output = self.exec_run(mkdir_command)
if exit_code != 0:
logging.error(f"Error creating directory '{path}' in container: {output}")
raise RuntimeError(f"Error creating directory '{path}' in container: {output}")

file_name = str(uuid.uuid4())
exit_code, output = self.exec_run(f"sh -c \"printf %s '{content}' > {os.path.join(path, file_name)}\"")
if exit_code != 0:
logging.error(f"Error adding file to running container: {output}")
raise RuntimeError(f"Error adding file to running container: {output}")

def _write_content_to_file(self, filepath: str, permissions: int | None, content: str | bytes):
write_mode = "w"
if isinstance(content, bytes):
Expand All @@ -74,7 +92,13 @@ def _configure_volumes_of_container_dirs(self):
self._write_content_to_file(file_path, None, content)
self.volumes[temp_path] = {"bind": directory.path, "mode": directory.mode}

def is_deployed(self) -> bool:
return self.container is not None

def deploy(self) -> bool:
if self.is_deployed():
logging.info(f"Container '{self.container_name}' is already deployed.")
return True
self._temp_dir = tempfile.TemporaryDirectory()
self._configure_volumes_of_container_files()
self._configure_volumes_of_container_dirs()
Expand Down
14 changes: 10 additions & 4 deletions behave_framework/src/minifi_test_framework/steps/checking_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,18 @@ def step_impl(context: MinifiTestContext, content: str, path: str, duration: str
timeout_seconds=timeout_in_seconds, bail_condition=lambda: context.get_default_minifi_container().exited, context=context)


@then('a single file with the content "{content}" is placed in the "{directory}" directory in less than {duration}')
def step_impl(context: MinifiTestContext, content: str, directory: str, duration: str):
@then('in the "{container_name}" container a single file with the content "{content}" is placed in the "{directory}" directory in less than {duration}')
def step_impl(context: MinifiTestContext, container_name: str, content: str, directory: str, duration: str):
new_content = content.replace("\\n", "\n")
timeout_in_seconds = humanfriendly.parse_timespan(duration)
assert wait_for_condition(
condition=lambda: context.get_default_minifi_container().directory_has_single_file_with_content(directory, new_content),
timeout_seconds=timeout_in_seconds, bail_condition=lambda: context.get_default_minifi_container().exited, context=context)
condition=lambda: context.get_minifi_container(container_name).directory_has_single_file_with_content(directory, new_content),
timeout_seconds=timeout_in_seconds, bail_condition=lambda: context.get_minifi_container(container_name).exited, context=context)


@then('a single file with the content "{content}" is placed in the "{directory}" directory in less than {duration}')
def step_impl(context: MinifiTestContext, content: str, directory: str, duration: str):
context.execute_steps(f'then in the "{DEFAULT_MINIFI_CONTAINER_NAME}" container a single file with the content "{content}" is placed in the "{directory}" directory in less than {duration}')


@then('in the "{container_name}" container at least one file with the content "{content}" is placed in the "{directory}" directory in less than {duration}')
Expand Down Expand Up @@ -147,6 +152,7 @@ def step_impl(context: MinifiTestContext, directory: str, regex_str: str, durati


@then('files with contents "{content_one}" and "{content_two}" are placed in the "{directory}" directory in less than {timeout}')
@then("files with contents '{content_one}' and '{content_two}' are placed in the '{directory}' directory in less than {timeout}")
def step_impl(context: MinifiTestContext, directory: str, timeout: str, content_one: str, content_two: str):
timeout_seconds = humanfriendly.parse_timespan(timeout)
c1 = content_one.replace("\\n", "\n")
Expand Down
46 changes: 44 additions & 2 deletions behave_framework/src/minifi_test_framework/steps/core_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,27 @@ def step_impl(context: MinifiTestContext, file_name: str, content: str, path: st
context.get_or_create_default_minifi_container().files.append(File(os.path.join(path, file_name), new_content))


@step('a file with the content "{content}" is present in "{path}"')
@given('a file with the content "{content}" is present in "{path}" in the "{container_name}" flow')
def step_impl(context: MinifiTestContext, content: str, path: str, container_name: str):
new_content = content.replace("\\n", "\n")
context.get_or_create_minifi_container(container_name).files.append(File(os.path.join(path, str(uuid.uuid4())), new_content))


@given('a file with the content "{content}" is present in "{path}"')
@given("a file with the content '{content}' is present in '{path}'")
def step_impl(context: MinifiTestContext, content: str, path: str):
context.execute_steps(f"given a file with the content \"{content}\" is present in \"{path}\" in the \"{DEFAULT_MINIFI_CONTAINER_NAME}\" flow")


@when('a file with the content "{content}" is placed in "{path}" in the "{container_name}" flow')
def step_impl(context: MinifiTestContext, content: str, path: str, container_name: str):
new_content = content.replace("\\n", "\n")
context.get_or_create_default_minifi_container().files.append(File(os.path.join(path, str(uuid.uuid4())), new_content))
context.get_minifi_container(container_name).add_file_to_running_container(new_content, path)


@when('a file with the content "{content}" is placed in "{path}"')
def step_impl(context: MinifiTestContext, content: str, path: str):
context.execute_steps(f"when a file with the content \"{content}\" is placed in \"{path}\" in the \"{DEFAULT_MINIFI_CONTAINER_NAME}\" flow")


@given("an empty file is present in \"{path}\"")
Expand Down Expand Up @@ -95,11 +112,31 @@ def step_impl(context: MinifiTestContext):
context.get_or_create_default_minifi_container().stop()


@when("\"{container_name}\" flow is stopped")
def step_impl(context: MinifiTestContext, container_name: str):
context.get_or_create_minifi_container(container_name).stop()


@when("MiNiFi is restarted")
def step_impl(context: MinifiTestContext):
context.get_or_create_default_minifi_container().restart()


@when("\"{container_name}\" flow is restarted")
def step_impl(context: MinifiTestContext, container_name: str):
context.get_or_create_minifi_container(container_name).restart()


@when("\"{container_name}\" flow is started")
def step_impl(context: MinifiTestContext, container_name: str):
context.get_or_create_minifi_container(container_name).start()


@when("\"{container_name}\" flow is killed")
def step_impl(context: MinifiTestContext, container_name: str):
context.get_or_create_minifi_container(container_name).kill()


@given("OpenSSL FIPS mode is enabled in MiNiFi")
def step_impl(context: MinifiTestContext):
context.get_or_create_default_minifi_container().enable_openssl_fips_mode()
Expand Down Expand Up @@ -145,3 +182,8 @@ def step_impl(context: MinifiTestContext):
def step_impl(context: MinifiTestContext):
context.containers["minifi-c2-server"] = MinifiC2Server(context)
assert context.containers["minifi-c2-server"].deploy()


@step("{duration} later")
def step_impl(context: MinifiTestContext, duration: str):
time.sleep(humanfriendly.parse_timespan(duration))
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,16 @@ def step_impl(context: MinifiTestContext, processor_type: str, property_name: st
context.get_or_create_minifi_container(minifi_container_name).flow_definition.add_processor(processor)


@given('a {processor_type} processor with the name "{processor_name}" in the "{minifi_container_name}" flow')
def step_impl(context: MinifiTestContext, processor_type: str, processor_name: str, minifi_container_name: str):
processor = Processor(processor_type, processor_name)
context.get_or_create_minifi_container(minifi_container_name).flow_definition.add_processor(processor)


@given('a {processor_type} processor with the name "{processor_name}"')
def step_impl(context: MinifiTestContext, processor_type: str, processor_name: str):
processor = Processor(processor_type, processor_name)
context.get_or_create_default_minifi_container().flow_definition.add_processor(processor)
context.execute_steps(
f'given a {processor_type} processor with the name "{processor_name}" in the "{DEFAULT_MINIFI_CONTAINER_NAME}" flow')


@given("a {processor_type} processor in the \"{minifi_container_name}\" flow")
Expand Down
3 changes: 2 additions & 1 deletion docker/RunBehaveTests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,5 @@ exec \
"${docker_dir}/../extensions/gcp/tests/features" \
"${docker_dir}/../extensions/grafana-loki/tests/features" \
"${docker_dir}/../extensions/lua/tests/features/" \
"${docker_dir}/../extensions/civetweb/tests/features/"
"${docker_dir}/../extensions/civetweb/tests/features/" \
"${docker_dir}/../extensions/mqtt/tests/features/"
1 change: 0 additions & 1 deletion docker/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,3 @@ azure-storage-blob==12.24.1
prometheus-api-client==0.5.5
humanfriendly==10.0
requests<2.29 # https://github.com/docker/docker-py/issues/3113
paho-mqtt==2.1.0
9 changes: 0 additions & 9 deletions docker/test/integration/cluster/ContainerStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from .containers.AzureStorageServerContainer import AzureStorageServerContainer
from .containers.HttpProxyContainer import HttpProxyContainer
from .containers.PostgreSQLServerContainer import PostgreSQLServerContainer
from .containers.MqttBrokerContainer import MqttBrokerContainer
from .containers.SyslogUdpClientContainer import SyslogUdpClientContainer
from .containers.SyslogTcpClientContainer import SyslogTcpClientContainer
from .containers.MinifiAsPodInKubernetesCluster import MinifiAsPodInKubernetesCluster
Expand Down Expand Up @@ -128,14 +127,6 @@ def acquire_container(self, context, container_name: str, engine='minifi-cpp', c
network=self.network,
image_store=self.image_store,
command=command))
elif engine == 'mqtt-broker':
return self.containers.setdefault(container_name,
MqttBrokerContainer(feature_context=feature_context,
name=container_name,
vols=self.vols,
network=self.network,
image_store=self.image_store,
command=command))
elif engine == "syslog-udp-client":
return self.containers.setdefault(container_name,
SyslogUdpClientContainer(
Expand Down
5 changes: 0 additions & 5 deletions docker/test/integration/cluster/DockerTestCluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from .checkers.PostgresChecker import PostgresChecker
from .checkers.PrometheusChecker import PrometheusChecker
from .checkers.ModbusChecker import ModbusChecker
from .checkers.MqttHelper import MqttHelper
from utils import get_peak_memory_usage, get_minifi_pid, get_memory_usage


Expand All @@ -37,7 +36,6 @@ def __init__(self, context, feature_id):
self.postgres_checker = PostgresChecker(self.container_communicator)
self.prometheus_checker = PrometheusChecker()
self.modbus_checker = ModbusChecker(self.container_communicator)
self.mqtt_helper = MqttHelper()

def cleanup(self):
self.container_store.cleanup()
Expand Down Expand Up @@ -278,6 +276,3 @@ def set_value_on_plc_with_modbus(self, container_name, modbus_cmd):

def enable_ssl_in_nifi(self):
self.container_store.enable_ssl_in_nifi()

def publish_test_mqtt_message(self, topic: str, message: str):
self.mqtt_helper.publish_test_mqtt_message(topic, message)
11 changes: 0 additions & 11 deletions docker/test/integration/cluster/ImageStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ def get_image(self, container_engine):
image = self.__build_http_proxy_image()
elif container_engine == "postgresql-server":
image = self.__build_postgresql_server_image()
elif container_engine == "mqtt-broker":
image = self.__build_mqtt_broker_image()
else:
raise Exception("There is no associated image for " + container_engine)

Expand Down Expand Up @@ -277,15 +275,6 @@ def __build_postgresql_server_image(self):
""".format(base_image='postgres:17.4'))
return self.__build_image(dockerfile)

def __build_mqtt_broker_image(self):
dockerfile = dedent("""\
FROM {base_image}
RUN echo 'log_dest stderr' >> /mosquitto-no-auth.conf
CMD ["/usr/sbin/mosquitto", "--verbose", "--config-file", "/mosquitto-no-auth.conf"]
""".format(base_image='eclipse-mosquitto:2.0.14'))

return self.__build_image(dockerfile)

def __build_image(self, dockerfile, context_files=[]):
conf_dockerfile_buffer = BytesIO()
docker_context_buffer = BytesIO()
Expand Down
40 changes: 0 additions & 40 deletions docker/test/integration/cluster/containers/MqttBrokerContainer.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,3 @@ def set_value_on_plc_with_modbus(self, container_name, modbus_cmd):

def enable_ssl_in_nifi(self):
self.cluster.enable_ssl_in_nifi()

def publish_test_mqtt_message(self, topic, message):
self.cluster.publish_test_mqtt_message(topic, message)
27 changes: 0 additions & 27 deletions docker/test/integration/features/steps/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ def step_impl(context, processor_type, minifi_container_name):

@given("a {processor_type} processor")
@given("a {processor_type} processor set up to communicate with an Azure blob storage")
@given("a {processor_type} processor set up to communicate with an MQTT broker instance")
def step_impl(context, processor_type):
__create_processor(context, processor_type, processor_type, None, None, "minifi-cpp-flow")

Expand Down Expand Up @@ -486,16 +485,6 @@ def step_impl(context, processor_name, service_property_name, property_name, pro
__set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, {property_name: property_value})


# MQTT setup
@when("an MQTT broker is set up in correspondence with the PublishMQTT")
@given("an MQTT broker is set up in correspondence with the PublishMQTT")
@given("an MQTT broker is set up in correspondence with the ConsumeMQTT")
@given("an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT")
def step_impl(context):
context.test.acquire_container(context=context, name="mqtt-broker", engine="mqtt-broker")
context.test.start('mqtt-broker')


# azure storage setup
@given("an Azure storage server is set up")
def step_impl(context):
Expand Down Expand Up @@ -779,22 +768,6 @@ def step_impl(context, regex, duration):
context.test.check_minifi_log_matches_regex(regex, humanfriendly.parse_timespan(duration))


# MQTT
@then("the MQTT broker has a log line matching \"{log_pattern}\"")
def step_impl(context, log_pattern):
context.test.check_container_log_matches_regex('mqtt-broker', log_pattern, 60, count=1)


@then("the MQTT broker has {log_count} log lines matching \"{log_pattern}\"")
def step_impl(context, log_count, log_pattern):
context.test.check_container_log_matches_regex('mqtt-broker', log_pattern, 60, count=int(log_count))


@when("a test message \"{message}\" is published to the MQTT broker on topic \"{topic}\"")
def step_impl(context, message, topic):
context.test.publish_test_mqtt_message(topic, message)


@then("the \"{minifi_container_name}\" flow has a log line matching \"{log_pattern}\" in less than {duration}")
def step_impl(context, minifi_container_name, log_pattern, duration):
context.test.check_container_log_matches_regex(minifi_container_name, log_pattern, humanfriendly.parse_timespan(duration), count=1)
Expand Down
30 changes: 0 additions & 30 deletions docker/test/integration/minifi/processors/ConsumeMQTT.py

This file was deleted.

Loading
Loading