diff --git a/CHANGELOG.md b/CHANGELOG.md index f31dfd8e..c5591128 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,8 @@ All notable changes to this project will be documented in this file. - Support objectOverrides using `.spec.objectOverrides`. See [objectOverrides concepts page](https://docs.stackable.tech/home/nightly/concepts/overrides/#object-overrides) for details ([#927]). +- Added support for OPA/TLS ([#928]). - Added experimental support for `4.1.1` ([#929]) - Enable the [restart-controller](https://docs.stackable.tech/home/nightly/commons-operator/restarter/), so that the Pods are automatically restarted on config changes ([#930], [#932]). ### Changed @@ -27,6 +28,7 @@ All notable changes to this project will be documented in this file. [#915]: https://github.com/stackabletech/kafka-operator/pull/915 [#925]: https://github.com/stackabletech/kafka-operator/pull/925 [#927]: https://github.com/stackabletech/kafka-operator/pull/927 +[#928]: https://github.com/stackabletech/kafka-operator/pull/928 [#929]: https://github.com/stackabletech/kafka-operator/pull/929 [#930]: https://github.com/stackabletech/kafka-operator/pull/930 [#932]: https://github.com/stackabletech/kafka-operator/pull/932 diff --git a/rust/operator-binary/src/config/command.rs b/rust/operator-binary/src/config/command.rs index b2f31e8a..252d28c0 100644 --- a/rust/operator-binary/src/config/command.rs +++ b/rust/operator-binary/src/config/command.rs @@ -31,6 +31,8 @@ pub fn broker_kafka_container_commands( containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop & {set_realm_env} + {import_opa_tls_cert} + {broker_start_command} wait_for_termination $!
@@ -42,6 +44,7 @@ pub fn broker_kafka_container_commands( true => format!("export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {STACKABLE_KERBEROS_KRB5_PATH})"), false => "".to_string(), }, + import_opa_tls_cert = kafka_security.copy_opa_tls_cert_command(), broker_start_command = broker_start_command(kafka, cluster_id, controller_descriptors, product_version), } } diff --git a/rust/operator-binary/src/crd/authorization.rs b/rust/operator-binary/src/crd/authorization.rs index fef0cd8a..d5391304 100644 --- a/rust/operator-binary/src/crd/authorization.rs +++ b/rust/operator-binary/src/crd/authorization.rs @@ -1,12 +1,78 @@ use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::{ - commons::opa::OpaConfig, + client::Client, + commons::opa::{OpaApiVersion, OpaConfig}, + k8s_openapi::api::core::v1::ConfigMap, schemars::{self, JsonSchema}, }; +use crate::crd::v1alpha1; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("Failed to fetch OPA ConfigMap {configmap_name}"))] + FetchOpaConfigMap { + source: stackable_operator::client::Error, + configmap_name: String, + namespace: String, + }, + + #[snafu(display("invalid OpaConfig"))] + InvalidOpaConfig { + source: stackable_operator::commons::opa::Error, + }, + + #[snafu(display("object defines no namespace"))] + ObjectHasNoNamespace, +} + #[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] pub struct KafkaAuthorization { // no doc - docs in the OpaConfig struct. 
pub opa: Option, } + +pub struct KafkaAuthorizationConfig { + pub opa_connect: String, + pub secret_class: Option, +} + +impl KafkaAuthorization { + pub async fn get_opa_config( + self, + client: &Client, + kafka: &v1alpha1::KafkaCluster, + ) -> Result, Error> { + let auth_config = if let Some(opa) = self.opa { + let namespace = kafka + .metadata + .namespace + .as_deref() + .context(ObjectHasNoNamespaceSnafu)?; + // Resolve the secret class from the ConfigMap + let secret_class = client + .get::(&opa.config_map_name, namespace) + .await + .with_context(|_| FetchOpaConfigMapSnafu { + configmap_name: &opa.config_map_name, + namespace, + })? + .data + .and_then(|mut data| data.remove("OPA_SECRET_CLASS")); + let opa_connect = opa + .full_document_url_from_config_map(client, kafka, Some("allow"), OpaApiVersion::V1) + .await + .context(InvalidOpaConfigSnafu)?; + Some(KafkaAuthorizationConfig { + opa_connect, + secret_class, + }) + } else { + None + }; + + Ok(auth_config) + } +} diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index 97b15b85..1f0b8c56 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -403,6 +403,7 @@ mod tests { }]), "internalTls".to_string(), Some("tls".to_string()), + None, ); let cluster_info = default_cluster_info(); // "simple-kafka-broker-default" @@ -462,6 +463,7 @@ mod tests { ResolvedAuthenticationClasses::new(vec![]), "tls".to_string(), Some("tls".to_string()), + None, ); let config = get_kafka_listener_config(&kafka, &kafka_security, &rolegroup_ref, &cluster_info) @@ -518,6 +520,7 @@ mod tests { ResolvedAuthenticationClasses::new(vec![]), "".to_string(), None, + None, ); let config = @@ -605,6 +608,7 @@ mod tests { }]), "tls".to_string(), Some("tls".to_string()), + None, ); let cluster_info = default_cluster_info(); // "simple-kafka-broker-default" diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs 
index b729386a..3aac5bae 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -52,6 +52,11 @@ pub enum Error { #[snafu(display("kerberos enablement requires TLS activation"))] KerberosRequiresTls, + + #[snafu(display("failed to build OPA TLS certificate volume"))] + OpaTlsCertSecretClassVolumeBuild { + source: stackable_operator::builder::pod::volume::SecretOperatorVolumeSourceBuilderError, + }, } /// Helper struct combining TLS settings for server and internal with the resolved AuthenticationClasses @@ -59,6 +64,7 @@ pub struct KafkaTlsSecurity { resolved_authentication_classes: ResolvedAuthenticationClasses, internal_secret_class: String, server_secret_class: Option, + opa_secret_class: Option, } impl KafkaTlsSecurity { @@ -75,6 +81,9 @@ impl KafkaTlsSecurity { pub const INTERNAL_PORT: u16 = 19092; // - TLS internal const INTER_BROKER_LISTENER_NAME: &'static str = "inter.broker.listener.name"; + const OPA_TLS_MOUNT_PATH: &str = "/stackable/tls-opa"; + // opa + const OPA_TLS_VOLUME_NAME: &str = "tls-opa"; pub const SECURE_BOOTSTRAP_PORT: u16 = 9095; pub const SECURE_CLIENT_PORT: u16 = 9093; pub const SECURE_CLIENT_PORT_NAME: &'static str = "kafka-tls"; @@ -94,11 +103,13 @@ impl KafkaTlsSecurity { resolved_authentication_classes: ResolvedAuthenticationClasses, internal_secret_class: String, server_secret_class: Option, + opa_secret_class: Option, ) -> Self { Self { resolved_authentication_classes, internal_secret_class, server_secret_class, + opa_secret_class, } } @@ -107,6 +118,7 @@ impl KafkaTlsSecurity { pub async fn new_from_kafka_cluster( client: &Client, kafka: &v1alpha1::KafkaCluster, + opa_secret_class: Option, ) -> Result { Ok(KafkaTlsSecurity { resolved_authentication_classes: ResolvedAuthenticationClasses::from_references( @@ -128,6 +140,7 @@ impl KafkaTlsSecurity { .tls .as_ref() .and_then(|tls| tls.server_secret_class.clone()), + opa_secret_class, }) } @@ -166,6 +179,22 @@ impl KafkaTlsSecurity { 
self.kerberos_secret_class().is_some() } + fn has_opa_tls_enabled(&self) -> bool { + self.opa_secret_class.is_some() + } + + pub fn copy_opa_tls_cert_command(&self) -> String { + match self.has_opa_tls_enabled() { + true => format!( + "keytool -importcert -file {opa_mount_path}/ca.crt -keystore {tls_dir}/truststore.p12 -storepass '{tls_password}' -alias opa-ca -noprompt", + opa_mount_path = Self::OPA_TLS_MOUNT_PATH, + tls_dir = Self::STACKABLE_TLS_KAFKA_INTERNAL_DIR, + tls_password = Self::SSL_STORE_PASSWORD, + ), + false => "".to_string(), + } + } + pub fn kerberos_secret_class(&self) -> Option { if let Some(kerberos) = self .resolved_authentication_classes @@ -486,6 +515,24 @@ impl KafkaTlsSecurity { .context(AddVolumeMountSnafu)?; } + if let Some(secret_class) = &self.opa_secret_class { + cb_kafka + .add_volume_mount(Self::OPA_TLS_VOLUME_NAME, Self::OPA_TLS_MOUNT_PATH) + .context(AddVolumeMountSnafu)?; + + pod_builder + .add_volume( + VolumeBuilder::new(Self::OPA_TLS_VOLUME_NAME) + .ephemeral( + SecretOperatorVolumeSourceBuilder::new(secret_class) + .build() + .context(OpaTlsCertSecretClassVolumeBuildSnafu)?, + ) + .build(), + ) + .context(AddVolumeSnafu)?; + } + Ok(()) } @@ -664,6 +711,22 @@ impl KafkaTlsSecurity { ); } + //OPA Tls + if self.opa_secret_class.is_some() { + config.insert( + "opa.authorizer.truststore.path".to_string(), + format!("{}/truststore.p12", Self::STACKABLE_TLS_KAFKA_INTERNAL_DIR), + ); + config.insert( + "opa.authorizer.truststore.password".to_string(), + Self::SSL_STORE_PASSWORD.to_string(), + ); + config.insert( + "opa.authorizer.truststore.type".to_string(), + "PKCS12".to_string(), + ); + } + + // common config.insert( Self::INTER_BROKER_LISTENER_NAME.to_string(), diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index 1e83abfa..4a4ac599 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -8,7 +8,6 @@ use snafu::{ResultExt,
Snafu}; use stackable_operator::{ cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, commons::{ - opa::OpaApiVersion, product_image_selection::{self}, rbac::build_rbac_resources, }, @@ -36,7 +35,7 @@ use strum::{EnumDiscriminants, IntoStaticStr}; use crate::{ crd::{ self, APP_NAME, DOCKER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, KafkaClusterStatus, - OPERATOR_NAME, + OPERATOR_NAME, authorization, listener::get_kafka_listener_config, role::{ AnyConfig, KafkaRole, broker::BROKER_PROPERTIES_FILE, @@ -122,11 +121,6 @@ pub enum Error { source: stackable_operator::cluster_resources::Error, }, - #[snafu(display("invalid OpaConfig"))] - InvalidOpaConfig { - source: stackable_operator::commons::opa::Error, - }, - #[snafu(display("failed to delete orphaned resources"))] DeleteOrphans { source: stackable_operator::cluster_resources::Error, }, @@ -212,6 +206,9 @@ pub enum Error { BuildListener { source: crate::resource::listener::Error, }, + + #[snafu(display("failed to get OPA authorization config"))] + GetOpaConfig { source: authorization::Error }, } type Result = std::result::Result; @@ -231,7 +228,6 @@ impl ReconcilerError for Error { Error::InvalidProductConfig { .. } => None, Error::BuildDiscoveryConfig { .. } => None, Error::ApplyDiscoveryConfig { .. } => None, - Error::InvalidOpaConfig { .. } => None, Error::DeleteOrphans { .. } => None, Error::FailedToInitializeSecurityContext { .. } => None, Error::CreateClusterResources { .. } => None, @@ -253,6 +249,7 @@ impl ReconcilerError for Error { Error::BuildListener { .. } => None, Error::InvalidKafkaListeners { .. } => None, Error::BuildPodDescriptors { .. } => None, + Error::GetOpaConfig { ..
} => None, } } } @@ -298,7 +295,28 @@ pub async fn reconcile_kafka( &ctx.product_config, )?; - let kafka_security = KafkaTlsSecurity::new_from_kafka_cluster(client, kafka) + // Assemble the OPA connection string from the discovery and the given path if provided + // Will be passed as --override parameter in the cli in the stateful set + let opa_config = &kafka + .spec + .cluster_config + .authorization + .clone() + .get_opa_config(client, kafka) + .await + .context(GetOpaConfigSnafu)?; + + let opa_connect = opa_config + .as_ref() + .map(|auth_config| auth_config.opa_connect.clone()); + + let opa_secret_class = if let Some(opa_config) = opa_config.as_ref() { + opa_config.secret_class.to_owned() + } else { + None + }; + + let kafka_security = KafkaTlsSecurity::new_from_kafka_cluster(client, kafka, opa_secret_class) .await .context(FailedToInitializeSecurityContextSnafu)?; @@ -314,19 +332,6 @@ pub async fn reconcile_kafka( .validate_authentication_methods() .context(FailedToValidateAuthenticationMethodSnafu)?; - // Assemble the OPA connection string from the discovery and the given path if provided - // Will be passed as --override parameter in the cli in the state ful set - let opa_connect = if let Some(opa_spec) = &kafka.spec.cluster_config.authorization.opa { - Some( - opa_spec - .full_document_url_from_config_map(client, kafka, Some("allow"), OpaApiVersion::V1) - .await - .context(InvalidOpaConfigSnafu)?, - ) - } else { - None - }; - let mut ss_cond_builder = StatefulSetConditionBuilder::default(); let (rbac_sa, rbac_rolebinding) = build_rbac_resources( diff --git a/tests/release.yaml b/tests/release.yaml index fa692d24..981efdf3 100644 --- a/tests/release.yaml +++ b/tests/release.yaml @@ -16,3 +16,5 @@ releases: operatorVersion: 0.0.0-dev kafka: operatorVersion: 0.0.0-dev + opa: + operatorVersion: 0.0.0-dev diff --git a/tests/templates/kuttl/opa/00-patch-ns.yaml.j2 b/tests/templates/kuttl/opa/00-patch-ns.yaml.j2 new file mode 100644 index 00000000..67185acf --- 
/dev/null +++ b/tests/templates/kuttl/opa/00-patch-ns.yaml.j2 @@ -0,0 +1,9 @@ +{% if test_scenario['values']['openshift'] == 'true' %} +# see https://github.com/stackabletech/issues/issues/566 +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl patch namespace $NAMESPACE -p '{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}' + timeout: 120 +{% endif %} diff --git a/tests/templates/kuttl/opa/00-rbac.yaml.j2 b/tests/templates/kuttl/opa/00-rbac.yaml.j2 new file mode 100644 index 00000000..7ee61d23 --- /dev/null +++ b/tests/templates/kuttl/opa/00-rbac.yaml.j2 @@ -0,0 +1,29 @@ +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: test-role +rules: +{% if test_scenario['values']['openshift'] == "true" %} + - apiGroups: ["security.openshift.io"] + resources: ["securitycontextconstraints"] + resourceNames: ["privileged"] + verbs: ["use"] +{% endif %} +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: test-sa +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: test-rb +subjects: + - kind: ServiceAccount + name: test-sa +roleRef: + kind: Role + name: test-role + apiGroup: rbac.authorization.k8s.io diff --git a/tests/templates/kuttl/opa/01-assert.yaml b/tests/templates/kuttl/opa/01-assert.yaml new file mode 100644 index 00000000..615b16ab --- /dev/null +++ b/tests/templates/kuttl/opa/01-assert.yaml @@ -0,0 +1,16 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: krb5-kdc +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/opa/01-install-krb5-kdc.yaml.j2 b/tests/templates/kuttl/opa/01-install-krb5-kdc.yaml.j2 new file mode 100644 index 00000000..b77a5fc2 --- /dev/null +++ b/tests/templates/kuttl/opa/01-install-krb5-kdc.yaml.j2 @@ -0,0 +1,144 @@ +apiVersion: apps/v1 +kind: 
StatefulSet +metadata: + name: krb5-kdc +spec: + selector: + matchLabels: + app: krb5-kdc + template: + metadata: + labels: + app: krb5-kdc + spec: + serviceAccountName: test-sa + initContainers: + - name: init + image: oci.stackable.tech/sdp/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev + args: + - sh + - -euo + - pipefail + - -c + - | + test -e /var/kerberos/krb5kdc/principal || kdb5_util create -s -P asdf + kadmin.local get_principal -terse root/admin || kadmin.local add_principal -pw asdf root/admin + # stackable-secret-operator principal must match the keytab specified in the SecretClass + kadmin.local get_principal -terse stackable-secret-operator || kadmin.local add_principal -e aes256-cts-hmac-sha384-192:normal -pw asdf stackable-secret-operator + env: + - name: KRB5_CONFIG + value: /stackable/config/krb5.conf + volumeMounts: + - mountPath: /stackable/config + name: config + - mountPath: /var/kerberos/krb5kdc + name: data + containers: + - name: kdc + image: oci.stackable.tech/sdp/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev + args: + - krb5kdc + - -n + env: + - name: KRB5_CONFIG + value: /stackable/config/krb5.conf + volumeMounts: + - mountPath: /stackable/config + name: config + - mountPath: /var/kerberos/krb5kdc + name: data +# Root permissions required on Openshift to bind to privileged port numbers +{% if test_scenario['values']['openshift'] == "true" %} + securityContext: + runAsUser: 0 +{% endif %} + - name: kadmind + image: oci.stackable.tech/sdp/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev + args: + - kadmind + - -nofork + env: + - name: KRB5_CONFIG + value: /stackable/config/krb5.conf + volumeMounts: + - mountPath: /stackable/config + name: config + - mountPath: /var/kerberos/krb5kdc + name: data +# Root permissions required on Openshift to bind to privileged port numbers +{% if test_scenario['values']['openshift'] == "true" %} + securityContext: + runAsUser: 0 +{% endif %} + - name: client + 
image: oci.stackable.tech/sdp/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev + tty: true + stdin: true + env: + - name: KRB5_CONFIG + value: /stackable/config/krb5.conf + volumeMounts: + - mountPath: /stackable/config + name: config + volumes: + - name: config + configMap: + name: krb5-kdc + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi +--- +apiVersion: v1 +kind: Service +metadata: + name: krb5-kdc +spec: + selector: + app: krb5-kdc + ports: + - name: kadmin + port: 749 + - name: kdc + port: 88 + - name: kdc-udp + port: 88 + protocol: UDP +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: krb5-kdc +data: + krb5.conf: | + [logging] + default = STDERR + kdc = STDERR + admin_server = STDERR + # default = FILE:/var/log/krb5libs.log + # kdc = FILE:/var/log/krb5kdc.log + # admin_server = FILE:/vaggr/log/kadmind.log + [libdefaults] + dns_lookup_realm = false + ticket_lifetime = 24h + renew_lifetime = 7d + forwardable = true + rdns = false + default_realm = CLUSTER.LOCAL + spake_preauth_groups = edwards25519 + [realms] + CLUSTER.LOCAL = { + acl_file = /stackable/config/kadm5.acl + disable_encrypted_timestamp = false + } + [domain_realm] + .cluster.local = CLUSTER.LOCAL + cluster.local = CLUSTER.LOCAL + kadm5.acl: | + root/admin *e + stackable-secret-operator *e diff --git a/tests/templates/kuttl/opa/01-install-vector-aggregator-discovery-configmap.yaml b/tests/templates/kuttl/opa/01-install-vector-aggregator-discovery-configmap.yaml new file mode 100644 index 00000000..197db165 --- /dev/null +++ b/tests/templates/kuttl/opa/01-install-vector-aggregator-discovery-configmap.yaml @@ -0,0 +1,17 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + helm install vector-aggregator vector + --namespace $NAMESPACE + --version 0.45.0 + --repo https://helm.vector.dev + --values vector-aggregator-values.yaml +--- +apiVersion: v1 +kind: ConfigMap 
+metadata: + name: vector-aggregator-discovery +data: + ADDRESS: vector-aggregator:6123 diff --git a/tests/templates/kuttl/opa/02-create-kerberos-secretclass.yaml.j2 b/tests/templates/kuttl/opa/02-create-kerberos-secretclass.yaml.j2 new file mode 100644 index 00000000..061a316f --- /dev/null +++ b/tests/templates/kuttl/opa/02-create-kerberos-secretclass.yaml.j2 @@ -0,0 +1,37 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: | + kubectl apply -n $NAMESPACE -f - < 0 %} + custom: "{{ test_scenario['values']['opa-latest'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['opa-latest'].split(',')[0] }}" +{% else %} + productVersion: "{{ test_scenario['values']['opa-latest'] }}" +{% endif %} + pullPolicy: IfNotPresent + clusterConfig: +{% if test_scenario['values']['use-opa-tls'] == "true" %} + tls: + serverSecretClass: opa-tls-$NAMESPACE +{% endif %} + vectorAggregatorConfigMapName: vector-aggregator-discovery + servers: + config: + logging: + enableVectorAgent: true + containers: + opa: + console: + level: INFO + file: + level: INFO + loggers: + decision: + level: INFO + roleGroups: + default: {} + EOF diff --git a/tests/templates/kuttl/opa/30-assert.yaml.j2 b/tests/templates/kuttl/opa/30-assert.yaml.j2 new file mode 100644 index 00000000..26661705 --- /dev/null +++ b/tests/templates/kuttl/opa/30-assert.yaml.j2 @@ -0,0 +1,32 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 300 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-kafka-broker-default +status: + readyReplicas: 3 + replicas: 3 +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: log-dirs-test-kafka-broker-default-0 +spec: + resources: + requests: + storage: 2Gi +status: + phase: Bound +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: test-kafka-broker +status: + expectedPods: 3 + currentHealthy: 3 + disruptionsAllowed: 1 diff --git a/tests/templates/kuttl/opa/30-install-kafka.yaml.j2 
b/tests/templates/kuttl/opa/30-install-kafka.yaml.j2 new file mode 100644 index 00000000..8b9785fa --- /dev/null +++ b/tests/templates/kuttl/opa/30-install-kafka.yaml.j2 @@ -0,0 +1,48 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: | + kubectl apply -n $NAMESPACE -f - < 0 %} + custom: "{{ test_scenario['values']['kafka-latest'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['kafka-latest'].split(',')[0] }}" + {% else %} + productVersion: "{{ test_scenario['values']['kafka-latest'] }}" + {% endif %} + pullPolicy: IfNotPresent + clusterConfig: + authentication: + - authenticationClass: kerberos-auth-$NAMESPACE + authorization: + opa: + configMapName: test-opa + package: authz + tls: + serverSecretClass: tls + vectorAggregatorConfigMapName: vector-aggregator-discovery + zookeeperConfigMapName: test-zk + brokers: + config: + logging: + enableVectorAgent: true + #requestedSecretLifetime: 7d + roleGroups: + default: + replicas: 3 diff --git a/tests/templates/kuttl/opa/40-access-kafka.txt.j2 b/tests/templates/kuttl/opa/40-access-kafka.txt.j2 new file mode 100644 index 00000000..eaf7e385 --- /dev/null +++ b/tests/templates/kuttl/opa/40-access-kafka.txt.j2 @@ -0,0 +1,118 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: access-kafka +spec: + template: + spec: + serviceAccountName: test-sa + containers: + - name: access-kafka + image: oci.stackable.tech/sdp/kafka:{{ test_scenario['values']['kafka-latest'] }}-stackable0.0.0-dev + command: + - /bin/bash + - /tmp/script/script.sh + env: + - name: KRB5_CONFIG + value: /stackable/kerberos/krb5.conf + - name: KAFKA_OPTS + value: -Djava.security.krb5.conf=/stackable/kerberos/krb5.conf + - name: KAFKA + valueFrom: + configMapKeyRef: + name: test-kafka + key: KAFKA + volumeMounts: + - name: script + mountPath: /tmp/script + - mountPath: /stackable/tls-ca-cert-mount + name: tls-ca-cert-mount + - name: kerberos + mountPath: /stackable/kerberos + volumes: + - name: script + 
configMap: + name: access-kafka-script + - name: kerberos + ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: kerberos-$NAMESPACE + secrets.stackable.tech/scope: service=access-kafka + secrets.stackable.tech/kerberos.service.names: developer + spec: + storageClassName: secrets.stackable.tech + accessModes: + - ReadWriteOnce + resources: + requests: + storage: "1" + - name: tls-ca-cert-mount + ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: tls + secrets.stackable.tech/scope: pod + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: "1" + storageClassName: secrets.stackable.tech + volumeMode: Filesystem + securityContext: + fsGroup: 1000 + restartPolicy: OnFailure +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: access-kafka-script +data: + script.sh: | + set -euxo pipefail + + export KCAT_CONFIG=/stackable/kcat.conf + TOPIC=test-topic + CONSUMER_GROUP=test-consumer-group + + echo -e -n "\ + metadata.broker.list=$KAFKA\n\ + auto.offset.reset=beginning\n\ + security.protocol=SASL_SSL\n\ + ssl.ca.location=/stackable/tls-ca-cert-mount/ca.crt\n\ + sasl.kerberos.keytab=/stackable/kerberos/keytab\n\ + sasl.kerberos.service.name=kafka\n\ + sasl.kerberos.principal=developer/access-kafka.$NAMESPACE.svc.cluster.local@CLUSTER.LOCAL\n\ + sasl.mechanism=GSSAPI\n\ + " > $KCAT_CONFIG + + cat $KCAT_CONFIG + + sent_message="Hello Stackable!" 
+ + echo $sent_message | kcat \ + -t $TOPIC \ + -P + + echo Sent message: \"$sent_message\" + + received_message=$(kcat \ + -G $CONSUMER_GROUP \ + -o stored \ + -e \ + $TOPIC) + + echo Received message: \"$received_message\" + + if [ "$received_message" = "$sent_message" ]; then + echo "Test passed" + exit 0 + else + echo "Test failed" + exit 1 + fi diff --git a/tests/templates/kuttl/opa/40-access-kafka.yaml b/tests/templates/kuttl/opa/40-access-kafka.yaml new file mode 100644 index 00000000..966e1ebc --- /dev/null +++ b/tests/templates/kuttl/opa/40-access-kafka.yaml @@ -0,0 +1,6 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + # We need to replace $NAMESPACE (by KUTTL) + - script: envsubst '$NAMESPACE' < 40-access-kafka.txt | kubectl apply -n $NAMESPACE -f - diff --git a/tests/templates/kuttl/opa/40-assert.yaml b/tests/templates/kuttl/opa/40-assert.yaml new file mode 100644 index 00000000..edc6c317 --- /dev/null +++ b/tests/templates/kuttl/opa/40-assert.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 600 +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: access-kafka +status: + succeeded: 1 diff --git a/tests/templates/kuttl/opa/vector-aggregator-values.yaml.j2 b/tests/templates/kuttl/opa/vector-aggregator-values.yaml.j2 new file mode 100644 index 00000000..4bb9fdc4 --- /dev/null +++ b/tests/templates/kuttl/opa/vector-aggregator-values.yaml.j2 @@ -0,0 +1,80 @@ +--- +role: Aggregator +service: + ports: + - name: api + port: 8686 + protocol: TCP + targetPort: 8686 + - name: vector + port: 6123 + protocol: TCP + targetPort: 6000 +customConfig: + api: + address: 0.0.0.0:8686 + enabled: true + sources: + vector: + address: 0.0.0.0:6000 + type: vector + version: "2" + transforms: + validEvents: + type: filter + inputs: [vector] + condition: is_null(.errors) + filteredAutomaticLogConfigServerOpaDecision: + type: filter + inputs: [validEvents] + condition: >- + starts_with(string!(.pod), 
"test-opa-server-automatic-log-config") && + .container == "opa" && + .logger == "decision" + filteredAutomaticLogConfigServerOpa: + type: filter + inputs: [validEvents] + condition: >- + starts_with(string!(.pod), "test-opa-server-automatic-log-config") && + .container == "opa" && + .logger != "decision" + filteredAutomaticLogConfigServerBundleBuilder: + type: filter + inputs: [validEvents] + condition: >- + starts_with(string!(.pod), "test-opa-server-automatic-log-config") && + .container == "bundle-builder" + filteredAutomaticLogConfigServerVector: + type: filter + inputs: [validEvents] + condition: >- + starts_with(string!(.pod), "test-opa-server-automatic-log-config") && + .container == "vector" + filteredAutomaticLogConfigServerPrepare: + type: filter + inputs: [validEvents] + condition: >- + starts_with(string!(.pod), "test-opa-server-automatic-log-config") && + .container == "prepare" + filteredInvalidEvents: + type: filter + inputs: [vector] + condition: |- + .timestamp == from_unix_timestamp!(0) || + is_null(.level) || + is_null(.logger) || + is_null(.message) + sinks: + test: + inputs: [filtered*] + type: blackhole +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + aggregator: + inputs: [vector] + type: vector + address: {{ lookup('env', 'VECTOR_AGGREGATOR') }} + buffer: + # Avoid back pressure from VECTOR_AGGREGATOR. The test should + # not fail if the aggregator is not available. 
+ when_full: drop_newest +{% endif %} diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index cfe2282d..37857ac9 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -26,6 +26,9 @@ dimensions: - name: zookeeper-latest values: - 3.9.4 + - name: opa-latest + values: + - 1.8.0 - name: upgrade_old values: - 3.9.1 @@ -40,6 +43,10 @@ dimensions: values: - "true" - "false" + - name: use-opa-tls + values: + - "true" + - "false" - name: openshift values: - "false" @@ -79,6 +86,14 @@ tests: - zookeeper - use-client-tls - openshift + - name: opa + dimensions: + - kafka-latest + - zookeeper-latest + - openshift + - use-opa-tls + - opa-latest + - krb5 - name: configuration dimensions: - kafka-latest