From 23612848259b7d0de6847925c2f71463fcab8bd4 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 13 Jan 2026 19:53:28 +0100 Subject: [PATCH 1/6] initial opa integration test (no opa tls) --- tests/release.yaml | 2 + tests/templates/kuttl/opa/00-patch-ns.yaml.j2 | 9 +++ tests/templates/kuttl/opa/01-assert.yaml.j2 | 10 +++ ...vector-aggregator-discovery-configmap.yaml | 17 ++++ .../kuttl/opa/09-install-secretclass.yaml.j2 | 22 +++++ tests/templates/kuttl/opa/10-assert.yaml.j2 | 12 +++ .../templates/kuttl/opa/10-install-zk.yaml.j2 | 18 +++++ tests/templates/kuttl/opa/20-assert.yaml.j2 | 8 ++ .../kuttl/opa/20-install-opa.yaml.j2 | 56 +++++++++++++ tests/templates/kuttl/opa/25-assert.yaml | 8 ++ .../25-install-test-scripts-configmap.yaml | 8 ++ .../templates/kuttl/opa/25_test_client_tls.sh | 59 ++++++++++++++ tests/templates/kuttl/opa/30-assert.yaml.j2 | 32 ++++++++ .../kuttl/opa/30-install-kafka.yaml.j2 | 46 +++++++++++ tests/templates/kuttl/opa/40-assert.yaml.j2 | 5 ++ .../opa/vector-aggregator-values.yaml.j2 | 80 +++++++++++++++++++ tests/test-definition.yaml | 14 ++++ 17 files changed, 406 insertions(+) create mode 100644 tests/templates/kuttl/opa/00-patch-ns.yaml.j2 create mode 100644 tests/templates/kuttl/opa/01-assert.yaml.j2 create mode 100644 tests/templates/kuttl/opa/01-install-vector-aggregator-discovery-configmap.yaml create mode 100644 tests/templates/kuttl/opa/09-install-secretclass.yaml.j2 create mode 100644 tests/templates/kuttl/opa/10-assert.yaml.j2 create mode 100644 tests/templates/kuttl/opa/10-install-zk.yaml.j2 create mode 100644 tests/templates/kuttl/opa/20-assert.yaml.j2 create mode 100644 tests/templates/kuttl/opa/20-install-opa.yaml.j2 create mode 100644 tests/templates/kuttl/opa/25-assert.yaml create mode 100644 tests/templates/kuttl/opa/25-install-test-scripts-configmap.yaml create mode 100755 tests/templates/kuttl/opa/25_test_client_tls.sh create mode 100644 tests/templates/kuttl/opa/30-assert.yaml.j2 create mode 100644 tests/templates/kuttl/opa/30-install-kafka.yaml.j2 create mode 100644 tests/templates/kuttl/opa/40-assert.yaml.j2 create mode 100644 tests/templates/kuttl/opa/vector-aggregator-values.yaml.j2 diff --git a/tests/release.yaml b/tests/release.yaml index fa692d24..981efdf3 100644 --- a/tests/release.yaml +++ b/tests/release.yaml @@ -16,3 +16,5 @@ releases: operatorVersion: 0.0.0-dev kafka: operatorVersion: 0.0.0-dev + opa: + operatorVersion: 0.0.0-dev diff --git a/tests/templates/kuttl/opa/00-patch-ns.yaml.j2 b/tests/templates/kuttl/opa/00-patch-ns.yaml.j2 new file mode 100644 index 00000000..67185acf --- /dev/null +++ b/tests/templates/kuttl/opa/00-patch-ns.yaml.j2 @@ -0,0 +1,9 @@ +{% if test_scenario['values']['openshift'] == 'true' %} +# see https://github.com/stackabletech/issues/issues/566 +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl patch namespace $NAMESPACE -p '{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}' + timeout: 120 +{% endif %} diff --git a/tests/templates/kuttl/opa/01-assert.yaml.j2 b/tests/templates/kuttl/opa/01-assert.yaml.j2 new file mode 100644 index 00000000..50b1d4c3 --- /dev/null +++ b/tests/templates/kuttl/opa/01-assert.yaml.j2 @@ -0,0 +1,10 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +{% endif %} diff --git a/tests/templates/kuttl/opa/01-install-vector-aggregator-discovery-configmap.yaml b/tests/templates/kuttl/opa/01-install-vector-aggregator-discovery-configmap.yaml new file mode 100644 index 00000000..197db165 --- /dev/null +++ b/tests/templates/kuttl/opa/01-install-vector-aggregator-discovery-configmap.yaml @@ -0,0 +1,17 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + helm install vector-aggregator vector + --namespace $NAMESPACE + --version 0.45.0 + --repo https://helm.vector.dev + --values vector-aggregator-values.yaml +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +data: + ADDRESS: vector-aggregator:6123 diff --git a/tests/templates/kuttl/opa/09-install-secretclass.yaml.j2 b/tests/templates/kuttl/opa/09-install-secretclass.yaml.j2 new file mode 100644 index 00000000..8a09c433 --- /dev/null +++ b/tests/templates/kuttl/opa/09-install-secretclass.yaml.j2 @@ -0,0 +1,22 @@ +{% if test_scenario['values']['use-opa-tls'] == "true" %} +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: | + kubectl apply -n $NAMESPACE -f - << EOF + --- + apiVersion: secrets.stackable.tech/v1alpha2 + kind: SecretClass + metadata: + name: opa-tls-$NAMESPACE + spec: + backend: + autoTls: + ca: + autoGenerate: true + secret: + name: opa-tls-ca + namespace: $NAMESPACE + EOF +{% endif %} diff --git a/tests/templates/kuttl/opa/10-assert.yaml.j2 b/tests/templates/kuttl/opa/10-assert.yaml.j2 new file mode 100644 index 00000000..c9cfcf5c --- /dev/null +++ b/tests/templates/kuttl/opa/10-assert.yaml.j2 @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 300 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-zk-server-default +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/opa/10-install-zk.yaml.j2 b/tests/templates/kuttl/opa/10-install-zk.yaml.j2 new file mode 100644 index 00000000..7e6192e3 --- /dev/null +++ b/tests/templates/kuttl/opa/10-install-zk.yaml.j2 @@ -0,0 +1,18 @@ +--- +apiVersion: zookeeper.stackable.tech/v1alpha1 +kind: ZookeeperCluster +metadata: + name: test-zk +spec: + image: + productVersion: "{{ test_scenario['values']['zookeeper-latest'] }}" + pullPolicy: IfNotPresent + clusterConfig: + vectorAggregatorConfigMapName: vector-aggregator-discovery + servers: + config: + logging: + enableVectorAgent: true + roleGroups: + default: + replicas: 1 diff --git a/tests/templates/kuttl/opa/20-assert.yaml.j2 b/tests/templates/kuttl/opa/20-assert.yaml.j2 new file mode 100644 index 00000000..819abc51 --- /dev/null +++ b/tests/templates/kuttl/opa/20-assert.yaml.j2 @@ -0,0 +1,8 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: install-opa +timeout: 300 +commands: + - script: kubectl -n $NAMESPACE wait --for=condition=available opaclusters.opa.stackable.tech/test-opa --timeout 301s diff --git a/tests/templates/kuttl/opa/20-install-opa.yaml.j2 b/tests/templates/kuttl/opa/20-install-opa.yaml.j2 new file mode 100644 index 00000000..b26303fb --- /dev/null +++ b/tests/templates/kuttl/opa/20-install-opa.yaml.j2 @@ -0,0 +1,56 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: | + kubectl apply -n $NAMESPACE -f - < 0 %} + custom: "{{ test_scenario['values']['opa-latest'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['opa-latest'].split(',')[0] }}" +{% else %} + productVersion: "{{ test_scenario['values']['opa-latest'] }}" +{% endif %} + pullPolicy: IfNotPresent + clusterConfig: +{% if test_scenario['values']['use-opa-tls'] == "true" %} + tls: + serverSecretClass: opa-tls-$NAMESPACE +{% endif %} + vectorAggregatorConfigMapName: vector-aggregator-discovery + servers: + config: + logging: + enableVectorAgent: true + containers: + opa: + console: + level: INFO + file: + level: INFO + loggers: + decision: + level: INFO + roleGroups: + default: {} + EOF diff --git a/tests/templates/kuttl/opa/25-assert.yaml b/tests/templates/kuttl/opa/25-assert.yaml new file mode 100644 index 00000000..828b4be9 --- /dev/null +++ b/tests/templates/kuttl/opa/25-assert.yaml @@ -0,0 +1,8 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-scripts diff --git a/tests/templates/kuttl/opa/25-install-test-scripts-configmap.yaml b/tests/templates/kuttl/opa/25-install-test-scripts-configmap.yaml new file mode 100644 index 00000000..42c57426 --- /dev/null +++ b/tests/templates/kuttl/opa/25-install-test-scripts-configmap.yaml @@ -0,0 +1,8 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: | + kubectl create configmap test-scripts \ + --namespace $NAMESPACE \ + --from-file=test_client_tls.sh=25_test_client_tls.sh diff --git a/tests/templates/kuttl/opa/25_test_client_tls.sh b/tests/templates/kuttl/opa/25_test_client_tls.sh new file mode 100755 index 00000000..c317d66d --- /dev/null +++ b/tests/templates/kuttl/opa/25_test_client_tls.sh @@ -0,0 +1,59 @@ +#!/usr/bin/env bash +# Usage: test_client_tls.sh namespace + +# to be safe +unset TOPIC +unset BAD_TOPIC + +KAFKA="$(cat /stackable/listener-broker/default-address/address):$(cat /stackable/listener-broker/default-address/ports/kafka-tls)" + +echo "Connecting to bootstrap address $KAFKA" + +echo "Start client TLS testing..." +############################################################################ +# Test the secured connection +############################################################################ +# create random topics +TOPIC=$(tr -dc A-Za-z0-9 /dev/null +then + echo "[ERROR] Secure client topic created without certificates!" + exit 1 +else + echo "[SUCCESS] Secure client topic creation failed without certificates!" +fi + +############################################################################ +# Test the connection with bad host name +############################################################################ +if /stackable/kafka/bin/kafka-topics.sh --create --topic "$BAD_TOPIC" --bootstrap-server localhost:9093 --command-config /stackable/config/client.properties &> /dev/null +then + echo "[ERROR] Secure client topic created with bad host name!" + exit 1 +else + echo "[SUCCESS] Secure client topic creation failed with bad host name!" +fi + +echo "All client TLS tests successful!" +exit 0 diff --git a/tests/templates/kuttl/opa/30-assert.yaml.j2 b/tests/templates/kuttl/opa/30-assert.yaml.j2 new file mode 100644 index 00000000..50639b33 --- /dev/null +++ b/tests/templates/kuttl/opa/30-assert.yaml.j2 @@ -0,0 +1,32 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 300 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-kafka-broker-default +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: log-dirs-test-kafka-broker-default-0 +spec: + resources: + requests: + storage: 2Gi +status: + phase: Bound +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: test-kafka-broker +status: + expectedPods: 1 + currentHealthy: 1 + disruptionsAllowed: 1 diff --git a/tests/templates/kuttl/opa/30-install-kafka.yaml.j2 b/tests/templates/kuttl/opa/30-install-kafka.yaml.j2 new file mode 100644 index 00000000..c4257762 --- /dev/null +++ b/tests/templates/kuttl/opa/30-install-kafka.yaml.j2 @@ -0,0 +1,46 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +timeout: 300 +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: test-kafka +spec: + image: +{% if test_scenario['values']['kafka-latest'].find(",") > 0 %} + custom: "{{ test_scenario['values']['kafka-latest'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['kafka-latest'].split(',')[0] }}" +{% else %} + productVersion: "{{ test_scenario['values']['kafka-latest'] }}" +{% endif %} + pullPolicy: IfNotPresent + clusterConfig: + authorization: + opa: + configMapName: test-opa + package: authz + tls: + serverSecretClass: tls + vectorAggregatorConfigMapName: vector-aggregator-discovery + zookeeperConfigMapName: test-zk + brokers: + config: + logging: + enableVectorAgent: true + #requestedSecretLifetime: 7d + roleGroups: + default: + replicas: 1 + podOverrides: + spec: + volumes: + - name: test-scripts + configMap: + name: test-scripts + containers: + - name: kafka + volumeMounts: + - mountPath: /test-scripts + name: test-scripts diff --git a/tests/templates/kuttl/opa/40-assert.yaml.j2 b/tests/templates/kuttl/opa/40-assert.yaml.j2 new file mode 100644 index 00000000..b247c28a --- /dev/null +++ b/tests/templates/kuttl/opa/40-assert.yaml.j2 @@ -0,0 +1,5 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +commands: + - script: | + kubectl exec -n $NAMESPACE test-kafka-broker-default-0 -c kafka -- bash /test-scripts/test_client_tls.sh diff --git a/tests/templates/kuttl/opa/vector-aggregator-values.yaml.j2 b/tests/templates/kuttl/opa/vector-aggregator-values.yaml.j2 new file mode 100644 index 00000000..4bb9fdc4 --- /dev/null +++ b/tests/templates/kuttl/opa/vector-aggregator-values.yaml.j2 @@ -0,0 +1,80 @@ +--- +role: Aggregator +service: + ports: + - name: api + port: 8686 + protocol: TCP + targetPort: 8686 + - name: vector + port: 6123 + protocol: TCP + targetPort: 6000 +customConfig: + api: + address: 0.0.0.0:8686 + enabled: true + sources: + vector: + address: 0.0.0.0:6000 + type: vector + version: "2" + transforms: + validEvents: + type: filter + inputs: [vector] + condition: is_null(.errors) + filteredAutomaticLogConfigServerOpaDecision: + type: filter + inputs: [validEvents] + condition: >- + starts_with(string!(.pod), "test-opa-server-automatic-log-config") && + .container == "opa" && + .logger == "decision" + filteredAutomaticLogConfigServerOpa: + type: filter + inputs: [validEvents] + condition: >- + starts_with(string!(.pod), "test-opa-server-automatic-log-config") && + .container == "opa" && + .logger != "decision" + filteredAutomaticLogConfigServerBundleBuilder: + type: filter + inputs: [validEvents] + condition: >- + starts_with(string!(.pod), "test-opa-server-automatic-log-config") && + .container == "bundle-builder" + filteredAutomaticLogConfigServerVector: + type: filter + inputs: [validEvents] + condition: >- + starts_with(string!(.pod), "test-opa-server-automatic-log-config") && + .container == "vector" + filteredAutomaticLogConfigServerPrepare: + type: filter + inputs: [validEvents] + condition: >- + starts_with(string!(.pod), "test-opa-server-automatic-log-config") && + .container == "prepare" + filteredInvalidEvents: + type: filter + inputs: [vector] + condition: |- + .timestamp == from_unix_timestamp!(0) || + is_null(.level) || + is_null(.logger) || + is_null(.message) + sinks: + test: + inputs: [filtered*] + type: blackhole +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + aggregator: + inputs: [vector] + type: vector + address: {{ lookup('env', 'VECTOR_AGGREGATOR') }} + buffer: + # Avoid back pressure from VECTOR_AGGREGATOR. The test should + # not fail if the aggregator is not available. + when_full: drop_newest +{% endif %} diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index f31f68a2..4e237750 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -29,6 +29,9 @@ dimensions: - name: zookeeper-latest values: - 3.9.4 + - name: opa-latest + values: + - 1.8.0 - name: upgrade_old values: - 3.7.2 @@ -43,6 +46,10 @@ dimensions: values: - "true" - "false" + - name: use-opa-tls + values: + # - "true" + - "false" - name: openshift values: - "false" @@ -82,6 +89,13 @@ tests: - zookeeper - use-client-tls - openshift + - name: opa + dimensions: + - kafka-latest + - zookeeper-latest + - openshift + - use-opa-tls + - opa-latest - name: configuration dimensions: - kafka-latest From 6548596bf0dbf919221be1a207f8ed0fe42f1386 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Thu, 15 Jan 2026 16:15:18 +0100 Subject: [PATCH 2/6] add support for communicating with opa via tls --- rust/operator-binary/src/config/command.rs | 3 + rust/operator-binary/src/crd/authorization.rs | 68 ++++++++++++++++++- rust/operator-binary/src/crd/listener.rs | 4 ++ rust/operator-binary/src/crd/security.rs | 63 +++++++++++++++++ rust/operator-binary/src/kafka_controller.rs | 49 +++++++------ .../kuttl/opa/20-install-opa.yaml.j2 | 17 +++++ tests/test-definition.yaml | 2 +- 7 files changed, 182 insertions(+), 24 deletions(-) diff --git a/rust/operator-binary/src/config/command.rs b/rust/operator-binary/src/config/command.rs index b2f31e8a..252d28c0 100644 --- a/rust/operator-binary/src/config/command.rs +++ b/rust/operator-binary/src/config/command.rs @@ -31,6 +31,8 @@ pub fn broker_kafka_container_commands( containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop & {set_realm_env} + {import_opa_tls_cert} + {broker_start_command} wait_for_termination $! @@ -42,6 +44,7 @@ pub fn broker_kafka_container_commands( true => format!("export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {STACKABLE_KERBEROS_KRB5_PATH})"), false => "".to_string(), }, + import_opa_tls_cert = kafka_security.copy_opa_tls_cert_command(), broker_start_command = broker_start_command(kafka, cluster_id, controller_descriptors, product_version), } } diff --git a/rust/operator-binary/src/crd/authorization.rs b/rust/operator-binary/src/crd/authorization.rs index fef0cd8a..d5391304 100644 --- a/rust/operator-binary/src/crd/authorization.rs +++ b/rust/operator-binary/src/crd/authorization.rs @@ -1,12 +1,78 @@ use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::{ - commons::opa::OpaConfig, + client::Client, + commons::opa::{OpaApiVersion, OpaConfig}, + k8s_openapi::api::core::v1::ConfigMap, schemars::{self, JsonSchema}, }; +use crate::crd::v1alpha1; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("Failed to fetch OPA ConfigMap {configmap_name}"))] + FetchOpaConfigMap { + source: stackable_operator::client::Error, + configmap_name: String, + namespace: String, + }, + + #[snafu(display("invalid OpaConfig"))] + InvalidOpaConfig { + source: stackable_operator::commons::opa::Error, + }, + + #[snafu(display("object defines no namespace"))] + ObjectHasNoNamespace, +} + #[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] pub struct KafkaAuthorization { // no doc - docs in the OpaConfig struct. pub opa: Option, } + +pub struct KafkaAuthorizationConfig { + pub opa_connect: String, + pub secret_class: Option, +} + +impl KafkaAuthorization { + pub async fn get_opa_config( + self, + client: &Client, + kafka: &v1alpha1::KafkaCluster, + ) -> Result, Error> { + let auth_config = if let Some(opa) = self.opa { + let namespace = kafka + .metadata + .namespace + .as_deref() + .context(ObjectHasNoNamespaceSnafu)?; + // Resolve the secret class from the ConfigMap + let secret_class = client + .get::(&opa.config_map_name, namespace) + .await + .with_context(|_| FetchOpaConfigMapSnafu { + configmap_name: &opa.config_map_name, + namespace, + })? + .data + .and_then(|mut data| data.remove("OPA_SECRET_CLASS")); + let opa_connect = opa + .full_document_url_from_config_map(client, kafka, Some("allow"), OpaApiVersion::V1) + .await + .context(InvalidOpaConfigSnafu)?; + Some(KafkaAuthorizationConfig { + opa_connect, + secret_class, + }) + } else { + None + }; + + Ok(auth_config) + } +} diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index 97b15b85..1f0b8c56 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -403,6 +403,7 @@ mod tests { }]), "internalTls".to_string(), Some("tls".to_string()), + None, ); let cluster_info = default_cluster_info(); // "simple-kafka-broker-default" @@ -462,6 +463,7 @@ mod tests { ResolvedAuthenticationClasses::new(vec![]), "tls".to_string(), Some("tls".to_string()), + None, ); let config = get_kafka_listener_config(&kafka, &kafka_security, &rolegroup_ref, &cluster_info) @@ -518,6 +520,7 @@ mod tests { ResolvedAuthenticationClasses::new(vec![]), "".to_string(), None, + None, ); let config = @@ -605,6 +608,7 @@ mod tests { }]), "tls".to_string(), Some("tls".to_string()), + None, ); let cluster_info = default_cluster_info(); // "simple-kafka-broker-default" diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index b729386a..3aac5bae 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -52,6 +52,11 @@ pub enum Error { #[snafu(display("kerberos enablement requires TLS activation"))] KerberosRequiresTls, + + #[snafu(display("failed to build OPA TLS certificate volume"))] + OpaTlsCertSecretClassVolumeBuild { + source: stackable_operator::builder::pod::volume::SecretOperatorVolumeSourceBuilderError, + }, } /// Helper struct combining TLS settings for server and internal with the resolved AuthenticationClasses @@ -59,6 +64,7 @@ pub struct KafkaTlsSecurity { resolved_authentication_classes: ResolvedAuthenticationClasses, internal_secret_class: String, server_secret_class: Option, + opa_secret_class: Option, } impl KafkaTlsSecurity { @@ -75,6 +81,9 @@ impl KafkaTlsSecurity { pub const INTERNAL_PORT: u16 = 19092; // - TLS internal const INTER_BROKER_LISTENER_NAME: &'static str = "inter.broker.listener.name"; + const OPA_TLS_MOUNT_PATH: &str = "/stackable/tls-opa"; + // opa + const OPA_TLS_VOLUME_NAME: &str = "tls-opa"; pub const SECURE_BOOTSTRAP_PORT: u16 = 9095; pub const SECURE_CLIENT_PORT: u16 = 9093; pub const SECURE_CLIENT_PORT_NAME: &'static str = "kafka-tls"; @@ -94,11 +103,13 @@ impl KafkaTlsSecurity { resolved_authentication_classes: ResolvedAuthenticationClasses, internal_secret_class: String, server_secret_class: Option, + opa_secret_class: Option, ) -> Self { Self { resolved_authentication_classes, internal_secret_class, server_secret_class, + opa_secret_class, } } @@ -107,6 +118,7 @@ impl KafkaTlsSecurity { pub async fn new_from_kafka_cluster( client: &Client, kafka: &v1alpha1::KafkaCluster, + opa_secret_class: Option, ) -> Result { Ok(KafkaTlsSecurity { resolved_authentication_classes: ResolvedAuthenticationClasses::from_references( @@ -128,6 +140,7 @@ impl KafkaTlsSecurity { .tls .as_ref() .and_then(|tls| tls.server_secret_class.clone()), + opa_secret_class, }) } @@ -166,6 +179,22 @@ impl KafkaTlsSecurity { self.kerberos_secret_class().is_some() } + fn has_opa_tls_enabled(&self) -> bool { + self.opa_secret_class.is_some() + } + + pub fn copy_opa_tls_cert_command(&self) -> String { + match self.has_opa_tls_enabled() { + true => format!( + "keytool -importcert -file {opa_mount_path}/ca.crt -keystore {tls_dir}/truststore.p12 -storepass '{tls_password}' -alias opa-ca -noprompt", + opa_mount_path = Self::OPA_TLS_MOUNT_PATH, + tls_dir = Self::STACKABLE_TLS_KAFKA_INTERNAL_DIR, + tls_password = Self::SSL_STORE_PASSWORD, + ), + false => "".to_string(), + } + } + pub fn kerberos_secret_class(&self) -> Option { if let Some(kerberos) = self .resolved_authentication_classes @@ -486,6 +515,24 @@ impl KafkaTlsSecurity { .context(AddVolumeMountSnafu)?; } + if let Some(secret_class) = &self.opa_secret_class { + cb_kafka + .add_volume_mount(Self::OPA_TLS_VOLUME_NAME, Self::OPA_TLS_MOUNT_PATH) + .context(AddVolumeMountSnafu)?; + + pod_builder + .add_volume( + VolumeBuilder::new(Self::OPA_TLS_VOLUME_NAME) + .ephemeral( + SecretOperatorVolumeSourceBuilder::new(secret_class) + .build() + .context(OpaTlsCertSecretClassVolumeBuildSnafu)?, + ) + .build(), + ) + .context(AddVolumeSnafu)?; + } + Ok(()) } @@ -664,6 +711,22 @@ impl KafkaTlsSecurity { ); } + //OPA Tls + if self.opa_secret_class.is_some() { + config.insert( + "opa.authorizer.truststore.path".to_string(), + format!("{}/truststore.p12", Self::STACKABLE_TLS_KAFKA_INTERNAL_DIR), + ); + config.insert( + "opa.authorizer.truststore.password".to_string(), + Self::SSL_STORE_PASSWORD.to_string(), + ); + } + config.insert( + "opa.authorizer.truststore.type".to_string(), + "PKCS12".to_string(), + ); + // common config.insert( Self::INTER_BROKER_LISTENER_NAME.to_string(), diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index 5d5de6b5..a4dcdc21 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -8,7 +8,6 @@ use snafu::{ResultExt, Snafu}; use stackable_operator::{ cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, commons::{ - opa::OpaApiVersion, product_image_selection::{self}, rbac::build_rbac_resources, }, @@ -36,7 +35,7 @@ use strum::{EnumDiscriminants, IntoStaticStr}; use crate::{ crd::{ self, APP_NAME, DOCKER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, KafkaClusterStatus, - OPERATOR_NAME, + OPERATOR_NAME, authorization, listener::get_kafka_listener_config, role::{ AnyConfig, KafkaRole, broker::BROKER_PROPERTIES_FILE, @@ -122,11 +121,6 @@ pub enum Error { source: stackable_operator::cluster_resources::Error, }, - #[snafu(display("invalid OpaConfig"))] - InvalidOpaConfig { - source: stackable_operator::commons::opa::Error, - }, - #[snafu(display("failed to delete orphaned resources"))] DeleteOrphans { source: stackable_operator::cluster_resources::Error, @@ -212,6 +206,9 @@ pub enum Error { BuildListener { source: crate::resource::listener::Error, }, + + #[snafu(display("object defines no namespace"))] + GetOpaConfig { source: authorization::Error }, } type Result = std::result::Result; @@ -231,7 +228,6 @@ impl ReconcilerError for Error { Error::InvalidProductConfig { .. } => None, Error::BuildDiscoveryConfig { .. } => None, Error::ApplyDiscoveryConfig { .. } => None, - Error::InvalidOpaConfig { .. } => None, Error::DeleteOrphans { .. } => None, Error::FailedToInitializeSecurityContext { .. } => None, Error::CreateClusterResources { .. } => None, @@ -253,6 +249,7 @@ impl ReconcilerError for Error { Error::BuildListener { .. } => None, Error::InvalidKafkaListeners { .. } => None, Error::BuildPodDescriptors { .. } => None, + Error::GetOpaConfig { .. } => None, } } } @@ -298,7 +295,28 @@ pub async fn reconcile_kafka( &ctx.product_config, )?; - let kafka_security = KafkaTlsSecurity::new_from_kafka_cluster(client, kafka) + // Assemble the OPA connection string from the discovery and the given path if provided + // Will be passed as --override parameter in the cli in the stateful set + let opa_config = &kafka + .spec + .cluster_config + .authorization + .clone() + .get_opa_config(client, kafka) + .await + .context(GetOpaConfigSnafu)?; + + let opa_connect = opa_config + .as_ref() + .map(|auth_config| auth_config.opa_connect.clone()); + + let opa_secret_class = if let Some(opa_config) = opa_config.as_ref() { + opa_config.secret_class.to_owned() + } else { + None + }; + + let kafka_security = KafkaTlsSecurity::new_from_kafka_cluster(client, kafka, opa_secret_class) .await .context(FailedToInitializeSecurityContextSnafu)?; @@ -314,19 +332,6 @@ pub async fn reconcile_kafka( .validate_authentication_methods() .context(FailedToValidateAuthenticationMethodSnafu)?; - // Assemble the OPA connection string from the discovery and the given path if provided - // Will be passed as --override parameter in the cli in the state ful set - let opa_connect = if let Some(opa_spec) = &kafka.spec.cluster_config.authorization.opa { - Some( - opa_spec - .full_document_url_from_config_map(client, kafka, Some("allow"), OpaApiVersion::V1) - .await - .context(InvalidOpaConfigSnafu)?, - ) - } else { - None - }; - let mut ss_cond_builder = StatefulSetConditionBuilder::default(); let (rbac_sa, rbac_rolebinding) = build_rbac_resources( diff --git a/tests/templates/kuttl/opa/20-install-opa.yaml.j2 b/tests/templates/kuttl/opa/20-install-opa.yaml.j2 index b26303fb..22a6d8d1 100644 --- a/tests/templates/kuttl/opa/20-install-opa.yaml.j2 +++ b/tests/templates/kuttl/opa/20-install-opa.yaml.j2 @@ -15,6 +15,23 @@ commands: kafka.rego: | package authz + # A very simple rule: + # If set to false this will prevent the kafka cluster from deploying. + # If set to true the topc can be created and read with e.g. the + # following in the decision log: + # "input": { + # "action": { + # ... + # "operation": "CREATE", + # ... + # } + # }, + # ... + # }, + # "msg": "Decision Log", + # "path": "authz/allow", + # "result": true, + allow if { true } diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 4e237750..45845e7d 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -48,7 +48,7 @@ dimensions: - "false" - name: use-opa-tls values: - # - "true" + - "true" - "false" - name: openshift values: From 63e9dbf8bcc5ca39c52e7918a0fe8d35dd853d37 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Fri, 16 Jan 2026 09:20:48 +0100 Subject: [PATCH 3/6] make test kerberized --- tests/templates/kuttl/opa/00-rbac.yaml.j2 | 29 ++++ .../opa/{01-assert.yaml.j2 => 01-assert.yaml} | 10 +- .../kuttl/opa/01-install-krb5-kdc.yaml.j2 | 144 ++++++++++++++++++ .../02-create-kerberos-secretclass.yaml.j2 | 37 +++++ tests/templates/kuttl/opa/25-assert.yaml | 8 - .../25-install-test-scripts-configmap.yaml | 8 - .../templates/kuttl/opa/25_test_client_tls.sh | 59 ------- tests/templates/kuttl/opa/30-assert.yaml.j2 | 20 +-- .../kuttl/opa/30-install-kafka.yaml.j2 | 88 +++++------ .../kuttl/opa/40-access-kafka.txt.j2 | 118 ++++++++++++++ .../templates/kuttl/opa/40-access-kafka.yaml | 6 + tests/templates/kuttl/opa/40-assert.yaml | 11 ++ tests/templates/kuttl/opa/40-assert.yaml.j2 | 5 - tests/test-definition.yaml | 1 + 14 files changed, 409 insertions(+), 135 deletions(-) create mode 100644 tests/templates/kuttl/opa/00-rbac.yaml.j2 rename tests/templates/kuttl/opa/{01-assert.yaml.j2 => 01-assert.yaml} (54%) create mode 100644 tests/templates/kuttl/opa/01-install-krb5-kdc.yaml.j2 create mode 100644 tests/templates/kuttl/opa/02-create-kerberos-secretclass.yaml.j2 delete mode 100644 tests/templates/kuttl/opa/25-assert.yaml delete mode 100644 tests/templates/kuttl/opa/25-install-test-scripts-configmap.yaml delete mode 100755 tests/templates/kuttl/opa/25_test_client_tls.sh create mode 100644 tests/templates/kuttl/opa/40-access-kafka.txt.j2 create mode 100644 tests/templates/kuttl/opa/40-access-kafka.yaml create mode 100644 tests/templates/kuttl/opa/40-assert.yaml delete mode 100644 tests/templates/kuttl/opa/40-assert.yaml.j2 diff --git a/tests/templates/kuttl/opa/00-rbac.yaml.j2 b/tests/templates/kuttl/opa/00-rbac.yaml.j2 new file mode 100644 index 00000000..7ee61d23 --- /dev/null +++ b/tests/templates/kuttl/opa/00-rbac.yaml.j2 @@ -0,0 +1,29 @@ +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: test-role +rules: +{% if test_scenario['values']['openshift'] == "true" %} + - apiGroups: ["security.openshift.io"] + resources: ["securitycontextconstraints"] + resourceNames: ["privileged"] + verbs: ["use"] +{% endif %} +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: test-sa +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: test-rb +subjects: + - kind: ServiceAccount + name: test-sa +roleRef: + kind: Role + name: test-role + apiGroup: rbac.authorization.k8s.io diff --git a/tests/templates/kuttl/opa/01-assert.yaml.j2 b/tests/templates/kuttl/opa/01-assert.yaml similarity index 54% rename from tests/templates/kuttl/opa/01-assert.yaml.j2 rename to tests/templates/kuttl/opa/01-assert.yaml index 50b1d4c3..615b16ab 100644 --- a/tests/templates/kuttl/opa/01-assert.yaml.j2 +++ b/tests/templates/kuttl/opa/01-assert.yaml @@ -1,10 +1,16 @@ --- apiVersion: kuttl.dev/v1beta1 kind: TestAssert -{% if lookup('env', 'VECTOR_AGGREGATOR') %} --- apiVersion: v1 kind: ConfigMap metadata: name: vector-aggregator-discovery -{% endif %} +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: krb5-kdc +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/opa/01-install-krb5-kdc.yaml.j2 b/tests/templates/kuttl/opa/01-install-krb5-kdc.yaml.j2 new file mode 100644 index 00000000..b77a5fc2 --- /dev/null +++ b/tests/templates/kuttl/opa/01-install-krb5-kdc.yaml.j2 @@ -0,0 +1,144 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: krb5-kdc +spec: + selector: + matchLabels: + app: krb5-kdc + template: + metadata: + labels: + app: krb5-kdc + spec: + serviceAccountName: test-sa + initContainers: + - name: init + image: oci.stackable.tech/sdp/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev + args: + - sh + - -euo + - pipefail + - -c + - | + test -e /var/kerberos/krb5kdc/principal || kdb5_util create -s -P asdf + kadmin.local get_principal -terse root/admin || kadmin.local add_principal -pw asdf root/admin + # stackable-secret-operator principal must match the keytab specified in the SecretClass + kadmin.local get_principal -terse stackable-secret-operator || kadmin.local add_principal -e aes256-cts-hmac-sha384-192:normal -pw asdf stackable-secret-operator + env: + - name: KRB5_CONFIG + value: /stackable/config/krb5.conf + volumeMounts: + - mountPath: /stackable/config + name: config + - mountPath: /var/kerberos/krb5kdc + name: data + containers: + - name: kdc + image: oci.stackable.tech/sdp/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev + args: + - krb5kdc + - -n + env: + - name: KRB5_CONFIG + value: /stackable/config/krb5.conf + volumeMounts: + - mountPath: /stackable/config + name: config + - mountPath: /var/kerberos/krb5kdc + name: data +# Root permissions required on Openshift to bind to privileged port numbers +{% if test_scenario['values']['openshift'] == "true" %} + securityContext: + runAsUser: 0 +{% endif %} + - name: kadmind + image: oci.stackable.tech/sdp/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev + args: + - kadmind + - -nofork + env: + - name: KRB5_CONFIG + value: /stackable/config/krb5.conf + volumeMounts: + - mountPath: /stackable/config + name: config + - mountPath: /var/kerberos/krb5kdc + name: data +# Root permissions required on Openshift to bind to privileged port numbers +{% if test_scenario['values']['openshift'] == "true" %} + securityContext: + runAsUser: 0 +{% endif %} + - name: client + image: oci.stackable.tech/sdp/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev + tty: true + stdin: true + env: + - name: KRB5_CONFIG + value: /stackable/config/krb5.conf + volumeMounts: + - mountPath: /stackable/config + name: config + volumes: + - name: config + configMap: + name: krb5-kdc + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi +--- +apiVersion: v1 +kind: Service +metadata: + name: krb5-kdc +spec: + selector: + app: krb5-kdc + ports: + - name: kadmin + port: 749 + - name: kdc + port: 88 + - name: kdc-udp + port: 88 + protocol: UDP +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: krb5-kdc +data: + krb5.conf: | + [logging] + default = STDERR + kdc = STDERR + admin_server = STDERR + # default = FILE:/var/log/krb5libs.log + # kdc = FILE:/var/log/krb5kdc.log + # admin_server = FILE:/vaggr/log/kadmind.log + [libdefaults] + dns_lookup_realm = false + ticket_lifetime = 24h + renew_lifetime = 7d + forwardable = true + rdns = false + default_realm = CLUSTER.LOCAL + spake_preauth_groups = edwards25519 + [realms] + CLUSTER.LOCAL = { + acl_file = /stackable/config/kadm5.acl + disable_encrypted_timestamp = false + } + [domain_realm] + .cluster.local = CLUSTER.LOCAL + cluster.local = CLUSTER.LOCAL + kadm5.acl: | + root/admin *e + stackable-secret-operator *e diff --git a/tests/templates/kuttl/opa/02-create-kerberos-secretclass.yaml.j2 b/tests/templates/kuttl/opa/02-create-kerberos-secretclass.yaml.j2 new file mode 100644 index 00000000..061a316f --- /dev/null +++ b/tests/templates/kuttl/opa/02-create-kerberos-secretclass.yaml.j2 @@ -0,0 +1,37 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: | + kubectl apply -n $NAMESPACE -f - < /dev/null -then - echo "[ERROR] Secure client topic created without certificates!" - exit 1 -else - echo "[SUCCESS] Secure client topic creation failed without certificates!" -fi - -############################################################################ -# Test the connection with bad host name -############################################################################ -if /stackable/kafka/bin/kafka-topics.sh --create --topic "$BAD_TOPIC" --bootstrap-server localhost:9093 --command-config /stackable/config/client.properties &> /dev/null -then - echo "[ERROR] Secure client topic created with bad host name!" - exit 1 -else - echo "[SUCCESS] Secure client topic creation failed with bad host name!" -fi - -echo "All client TLS tests successful!" -exit 0 diff --git a/tests/templates/kuttl/opa/30-assert.yaml.j2 b/tests/templates/kuttl/opa/30-assert.yaml.j2 index 50639b33..450aa9fe 100644 --- a/tests/templates/kuttl/opa/30-assert.yaml.j2 +++ b/tests/templates/kuttl/opa/30-assert.yaml.j2 @@ -9,7 +9,7 @@ metadata: name: test-kafka-broker-default status: readyReplicas: 1 - replicas: 1 + replicas: 3 --- apiVersion: v1 kind: PersistentVolumeClaim @@ -21,12 +21,12 @@ spec: storage: 2Gi status: phase: Bound ---- -apiVersion: policy/v1 -kind: PodDisruptionBudget -metadata: - name: test-kafka-broker -status: - expectedPods: 1 - currentHealthy: 1 - disruptionsAllowed: 1 +#--- +#apiVersion: policy/v1 +#kind: PodDisruptionBudget +#metadata: +# name: test-kafka-broker +#status: +# expectedPods: 1 +# currentHealthy: 1 +# disruptionsAllowed: 1 diff --git a/tests/templates/kuttl/opa/30-install-kafka.yaml.j2 b/tests/templates/kuttl/opa/30-install-kafka.yaml.j2 index c4257762..8b9785fa 100644 --- a/tests/templates/kuttl/opa/30-install-kafka.yaml.j2 +++ b/tests/templates/kuttl/opa/30-install-kafka.yaml.j2 @@ -1,46 +1,48 @@ --- apiVersion: kuttl.dev/v1beta1 kind: TestStep -timeout: 300 ---- -apiVersion: kafka.stackable.tech/v1alpha1 -kind: KafkaCluster -metadata: - name: test-kafka -spec: - image: -{% if test_scenario['values']['kafka-latest'].find(",") > 0 %} - custom: "{{ test_scenario['values']['kafka-latest'].split(',')[1] }}" - productVersion: "{{ test_scenario['values']['kafka-latest'].split(',')[0] }}" -{% else %} - productVersion: "{{ test_scenario['values']['kafka-latest'] }}" -{% endif %} - pullPolicy: IfNotPresent - clusterConfig: - authorization: - opa: - configMapName: test-opa - package: authz - tls: - serverSecretClass: tls - vectorAggregatorConfigMapName: vector-aggregator-discovery - zookeeperConfigMapName: test-zk - brokers: - config: - logging: - enableVectorAgent: true - #requestedSecretLifetime: 7d - roleGroups: - default: - replicas: 1 - podOverrides: - spec: - volumes: - - name: test-scripts - configMap: - name: test-scripts - containers: - - name: kafka - volumeMounts: - - mountPath: /test-scripts - name: test-scripts +commands: + - script: | + kubectl apply -n $NAMESPACE -f - < 0 %} + custom: "{{ test_scenario['values']['kafka-latest'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['kafka-latest'].split(',')[0] }}" + {% else %} + productVersion: "{{ test_scenario['values']['kafka-latest'] }}" + {% endif %} + pullPolicy: IfNotPresent + clusterConfig: + authentication: + - authenticationClass: kerberos-auth-$NAMESPACE + authorization: + opa: + configMapName: test-opa + package: authz + tls: + serverSecretClass: tls + vectorAggregatorConfigMapName: vector-aggregator-discovery + zookeeperConfigMapName: test-zk + brokers: + config: + logging: + enableVectorAgent: true + #requestedSecretLifetime: 7d + roleGroups: + default: + replicas: 3 diff --git a/tests/templates/kuttl/opa/40-access-kafka.txt.j2 b/tests/templates/kuttl/opa/40-access-kafka.txt.j2 new file mode 100644 index 00000000..eaf7e385 --- /dev/null +++ b/tests/templates/kuttl/opa/40-access-kafka.txt.j2 @@ -0,0 +1,118 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: access-kafka +spec: + template: + spec: + serviceAccountName: test-sa + containers: + - name: access-kafka + image: oci.stackable.tech/sdp/kafka:{{ test_scenario['values']['kafka-latest'] }}-stackable0.0.0-dev + command: + - /bin/bash + - /tmp/script/script.sh + env: + - name: KRB5_CONFIG + value: /stackable/kerberos/krb5.conf + - name: KAFKA_OPTS + value: -Djava.security.krb5.conf=/stackable/kerberos/krb5.conf + - name: KAFKA + valueFrom: + configMapKeyRef: + name: test-kafka + key: KAFKA + volumeMounts: + - name: script + mountPath: /tmp/script + - mountPath: /stackable/tls-ca-cert-mount + name: tls-ca-cert-mount + - name: kerberos + mountPath: /stackable/kerberos + volumes: + - name: script + configMap: + name: access-kafka-script + - name: kerberos + ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: kerberos-$NAMESPACE + secrets.stackable.tech/scope: service=access-kafka + secrets.stackable.tech/kerberos.service.names: developer + spec: + storageClassName: secrets.stackable.tech + accessModes: + - ReadWriteOnce + resources: + requests: + storage: "1" + - name: tls-ca-cert-mount + ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: tls + secrets.stackable.tech/scope: pod + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: "1" + storageClassName: secrets.stackable.tech + volumeMode: Filesystem + securityContext: + fsGroup: 1000 + restartPolicy: OnFailure +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: access-kafka-script +data: + script.sh: | + set -euxo pipefail + + export KCAT_CONFIG=/stackable/kcat.conf + TOPIC=test-topic + CONSUMER_GROUP=test-consumer-group + + echo -e -n "\ + metadata.broker.list=$KAFKA\n\ + auto.offset.reset=beginning\n\ + security.protocol=SASL_SSL\n\ + ssl.ca.location=/stackable/tls-ca-cert-mount/ca.crt\n\ + sasl.kerberos.keytab=/stackable/kerberos/keytab\n\ + sasl.kerberos.service.name=kafka\n\ + sasl.kerberos.principal=developer/access-kafka.$NAMESPACE.svc.cluster.local@CLUSTER.LOCAL\n\ + sasl.mechanism=GSSAPI\n\ + " > $KCAT_CONFIG + + cat $KCAT_CONFIG + + sent_message="Hello Stackable!" + + echo $sent_message | kcat \ + -t $TOPIC \ + -P + + echo Sent message: \"$sent_message\" + + received_message=$(kcat \ + -G $CONSUMER_GROUP \ + -o stored \ + -e \ + $TOPIC) + + echo Received message: \"$received_message\" + + if [ "$received_message" = "$sent_message" ]; then + echo "Test passed" + exit 0 + else + echo "Test failed" + exit 1 + fi diff --git a/tests/templates/kuttl/opa/40-access-kafka.yaml b/tests/templates/kuttl/opa/40-access-kafka.yaml new file mode 100644 index 00000000..966e1ebc --- /dev/null +++ b/tests/templates/kuttl/opa/40-access-kafka.yaml @@ -0,0 +1,6 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + # We need to replace $NAMESPACE (by KUTTL) + - script: envsubst '$NAMESPACE' < 40-access-kafka.txt | kubectl apply -n $NAMESPACE -f - diff --git a/tests/templates/kuttl/opa/40-assert.yaml b/tests/templates/kuttl/opa/40-assert.yaml new file mode 100644 index 00000000..edc6c317 --- /dev/null +++ b/tests/templates/kuttl/opa/40-assert.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 600 +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: access-kafka +status: + succeeded: 1 diff --git a/tests/templates/kuttl/opa/40-assert.yaml.j2 b/tests/templates/kuttl/opa/40-assert.yaml.j2 deleted file mode 100644 index b247c28a..00000000 --- a/tests/templates/kuttl/opa/40-assert.yaml.j2 +++ /dev/null @@ -1,5 +0,0 @@ -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -commands: - - script: | - kubectl exec -n $NAMESPACE test-kafka-broker-default-0 -c kafka -- bash /test-scripts/test_client_tls.sh diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 45845e7d..29332ba0 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -96,6 +96,7 @@ tests: - openshift - use-opa-tls - opa-latest + - krb5 - name: configuration dimensions: - kafka-latest From dc4a58f1a7b0914155c425a8227361edbdf60680 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Fri, 16 Jan 2026 09:36:30 +0100 Subject: [PATCH 4/6] restore pdb and fix assert --- tests/templates/kuttl/opa/30-assert.yaml.j2 | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/templates/kuttl/opa/30-assert.yaml.j2 b/tests/templates/kuttl/opa/30-assert.yaml.j2 index 450aa9fe..26661705 100644 --- a/tests/templates/kuttl/opa/30-assert.yaml.j2 +++ b/tests/templates/kuttl/opa/30-assert.yaml.j2 @@ -8,7 +8,7 @@ kind: StatefulSet metadata: name: test-kafka-broker-default status: - readyReplicas: 1 + readyReplicas: 3 replicas: 3 --- apiVersion: v1 @@ -21,12 +21,12 @@ spec: storage: 2Gi status: phase: Bound -#--- -#apiVersion: policy/v1 -#kind: PodDisruptionBudget -#metadata: -# name: test-kafka-broker -#status: -# expectedPods: 1 -# currentHealthy: 1 -# disruptionsAllowed: 1 +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: test-kafka-broker +status: + expectedPods: 3 + currentHealthy: 3 + disruptionsAllowed: 1 From 0008d6a6baf8cab0fba1e0ef51ec285d99d3db1d Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Fri, 16 Jan 2026 09:40:30 +0100 Subject: [PATCH 5/6] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d65a799..bfccb60e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ All notable changes to this project will be documented in this file. See [objectOverrides concepts page](https://docs.stackable.tech/home/nightly/concepts/overrides/#object-overrides) for details ([#927]). - Added experimental support for `4.1.1` ([#929]) - Enable the [restart-controller](https://docs.stackable.tech/home/nightly/commons-operator/restarter/), so that the Pods are automatically restarted on config changes ([#930]). +- Added support for OPA/TLS ([#928]). ### Changed @@ -27,6 +28,7 @@ All notable changes to this project will be documented in this file. [#915]: https://github.com/stackabletech/kafka-operator/pull/915 [#925]: https://github.com/stackabletech/kafka-operator/pull/925 [#927]: https://github.com/stackabletech/kafka-operator/pull/927 +[#928]: https://github.com/stackabletech/kafka-operator/pull/928 [#929]: https://github.com/stackabletech/kafka-operator/pull/929 [#930]: https://github.com/stackabletech/kafka-operator/pull/930 From e70a0c1cf75216598a738c684147ed17379ee412 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Fri, 16 Jan 2026 14:51:09 +0100 Subject: [PATCH 6/6] refine rego rule --- .../kuttl/opa/20-install-opa.yaml.j2 | 59 +++++++++++++------ 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/tests/templates/kuttl/opa/20-install-opa.yaml.j2 b/tests/templates/kuttl/opa/20-install-opa.yaml.j2 index 22a6d8d1..47c2b65a 100644 --- a/tests/templates/kuttl/opa/20-install-opa.yaml.j2 +++ b/tests/templates/kuttl/opa/20-install-opa.yaml.j2 @@ -15,25 +15,50 @@ commands: kafka.rego: | package authz - # A very simple rule: - # If set to false this will prevent the kafka cluster from deploying. - # If set to true the topc can be created and read with e.g. the - # following in the decision log: - # "input": { - # "action": { - # ... - # "operation": "CREATE", - # ... - # } - # }, - # ... - # }, - # "msg": "Decision Log", - # "path": "authz/allow", - # "result": true, + default allow := false allow if { - true + input.requestContext.principal.name == "kafka" + } + + allow if { + input.requestContext.principal.name == "CN=generated certificate for pod" + } + + allow if { + input.requestContext.principal.name == "developer" + resource_is_allowed + } + + resource_is_allowed if { + input.action.resourcePattern.resourceType == "TOPIC" + action_is_allowed + } + + resource_is_allowed if { + input.action.resourcePattern.resourceType == "GROUP" + input.action.operation == "DESCRIBE" + } + + resource_is_allowed if { + input.action.resourcePattern.resourceType == "GROUP" + input.action.operation == "READ" + } + + action_is_allowed if { + input.action.operation == "CREATE" + } + + action_is_allowed if { + input.action.operation == "DESCRIBE" + } + + action_is_allowed if { + input.action.operation == "READ" + } + + action_is_allowed if { + input.action.operation == "WRITE" } --- apiVersion: opa.stackable.tech/v1alpha1