diff --git a/deploy/helm/kafka-operator/crds/crds.yaml b/deploy/helm/kafka-operator/crds/crds.yaml index 323b1c1d..d17f8cfc 100644 --- a/deploy/helm/kafka-operator/crds/crds.yaml +++ b/deploy/helm/kafka-operator/crds/crds.yaml @@ -920,6 +920,10 @@ spec: type: object x-kubernetes-preserve-unknown-fields: true type: object + bootstrapListenerClass: + description: The ListenerClass used for bootstrapping new clients. + nullable: true + type: string gracefulShutdownTimeout: description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details. nullable: true @@ -1271,6 +1275,10 @@ spec: type: object x-kubernetes-preserve-unknown-fields: true type: object + bootstrapListenerClass: + description: The ListenerClass used for bootstrapping new clients. + nullable: true + type: string gracefulShutdownTimeout: description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details. nullable: true diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index 97b15b85..8e4a1f59 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -9,7 +9,10 @@ use stackable_operator::{ }; use strum::{EnumDiscriminants, EnumString}; -use crate::crd::{STACKABLE_LISTENER_BROKER_DIR, security::KafkaTlsSecurity, v1alpha1}; +use crate::crd::{ + STACKABLE_LISTENER_BOOTSTRAP_DIR, STACKABLE_LISTENER_BROKER_DIR, security::KafkaTlsSecurity, + v1alpha1, +}; const LISTENER_LOCAL_ADDRESS: &str = "0.0.0.0"; @@ -186,13 +189,22 @@ impl Display for KafkaListener { } } +// Builds a list of listeners for the given Kafka cluster and rolegroup. +// +// TODO: Not every listener is necessarily used by every role, while some listeners are used by both roles. +// This is confusing and needs refactoring. +// +// For example, the INTERNAL and CLIENT listeners are only configured on brokers. +// On the other hand, the BOOTSTRAP listener is configured on both roles. +// Note that the actual bootstrap services are different between brokers and controllers, but from the +// Kafka perspective they are both called BOOTSTRAP.
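+// +// As a rough, hedged sketch (listener names as used in this module; the `<...>` ports are placeholders that depend on the configured security settings), the per-role layout then looks something like: +// broker: listeners = CLIENT://0.0.0.0:<client_port>,INTERNAL://0.0.0.0:<internal_port>,BOOTSTRAP://0.0.0.0:<bootstrap_port> +// controller: listeners = CONTROLLER://0.0.0.0:<controller_port>,BOOTSTRAP://0.0.0.0:<bootstrap_port>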
pub fn get_kafka_listener_config( kafka: &v1alpha1::KafkaCluster, kafka_security: &KafkaTlsSecurity, rolegroup_ref: &RoleGroupRef<v1alpha1::KafkaCluster>, cluster_info: &KubernetesClusterInfo, ) -> Result<KafkaListenerConfig, Error> { - let pod_fqdn = pod_fqdn( + let headless_pod_fqdn = pod_fqdn( kafka, &rolegroup_ref.rolegroup_headless_service_name(), cluster_info, @@ -269,7 +281,7 @@ pub fn get_kafka_listener_config( }); advertised_listeners.push(KafkaListener { name: KafkaListenerName::Internal, - host: pod_fqdn.to_string(), + host: headless_pod_fqdn.to_string(), port: KafkaTlsSecurity::SECURE_INTERNAL_PORT.to_string(), }); listener_security_protocol_map @@ -285,7 +297,7 @@ pub fn get_kafka_listener_config( }); advertised_listeners.push(KafkaListener { name: KafkaListenerName::Internal, - host: pod_fqdn.to_string(), + host: headless_pod_fqdn.to_string(), port: kafka_security.internal_port().to_string(), }); listener_security_protocol_map.insert( @@ -298,23 +310,39 @@ pub fn get_kafka_listener_config( ); } + let bootstrap_service_fqdn = service_fqdn( + kafka, + &rolegroup_service_name(rolegroup_ref, KafkaListenerName::Bootstrap), + cluster_info, + )?; + // BOOTSTRAP + listeners.push(KafkaListener { + name: KafkaListenerName::Bootstrap, + host: LISTENER_LOCAL_ADDRESS.to_string(), + port: kafka_security.bootstrap_port().to_string(), + }); + advertised_listeners.push(KafkaListener { + name: KafkaListenerName::Bootstrap, + host: bootstrap_service_fqdn.to_string(), + port: node_port_cmd( + STACKABLE_LISTENER_BOOTSTRAP_DIR, + kafka_security.bootstrap_port_name(), + ), + }); if kafka_security.has_kerberos_enabled() { - listeners.push(KafkaListener { - name: KafkaListenerName::Bootstrap, - host: LISTENER_LOCAL_ADDRESS.to_string(), - port: kafka_security.bootstrap_port().to_string(), - }); - advertised_listeners.push(KafkaListener { - name: KafkaListenerName::Bootstrap, - host: node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), - port: node_port_cmd( - STACKABLE_LISTENER_BROKER_DIR, - kafka_security.client_port_name(), - ), - }); listener_security_protocol_map .insert(KafkaListenerName::Bootstrap, KafkaListenerProtocol::SaslSsl); + } else if kafka_security.tls_client_authentication_class().is_some() + || kafka_security.tls_server_secret_class().is_some() + { + listener_security_protocol_map + .insert(KafkaListenerName::Bootstrap, KafkaListenerProtocol::Ssl); + } else { + listener_security_protocol_map.insert( + KafkaListenerName::Bootstrap, + KafkaListenerProtocol::Plaintext, + ); } Ok(KafkaListenerConfig { @@ -352,6 +380,31 @@ pub fn pod_fqdn( )) } +// TODO: This is the more general version of `RoleGroupRef::rolegroup_headless_service_name()` +// because we need it for the bootstrap service as well, which doesn't exist in op-rs.
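+// +// As a hedged illustration (the concrete names here are hypothetical): assuming `rolegroup_ref.object_name()` yields `kafka-broker-default`, the function below returns `kafka-broker-default-bootstrap` for the BOOTSTRAP listener.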
+fn rolegroup_service_name( + rolegroup_ref: &RoleGroupRef<v1alpha1::KafkaCluster>, + listener: KafkaListenerName, +) -> String { + format!( + "{name}-{service}", + name = rolegroup_ref.object_name(), + service = listener.to_string().to_lowercase() + ) +} + +pub fn service_fqdn( + kafka: &v1alpha1::KafkaCluster, + sts_service_name: &str, + cluster_info: &KubernetesClusterInfo, +) -> Result<String, Error> { + Ok(format!( + "{sts_service_name}.{namespace}.svc.{cluster_domain}", + namespace = kafka.namespace().context(ObjectHasNoNamespaceSnafu)?, + cluster_domain = cluster_info.cluster_domain + )) +} + #[cfg(test)] mod tests { use stackable_operator::{ @@ -414,20 +467,23 @@ mod tests { assert_eq!( config.listeners(), format!( - "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", + "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}", name = KafkaListenerName::Client, host = LISTENER_LOCAL_ADDRESS, port = kafka_security.client_port(), internal_name = KafkaListenerName::Internal, internal_host = LISTENER_LOCAL_ADDRESS, internal_port = kafka_security.internal_port(), + bootstrap_name = KafkaListenerName::Bootstrap, + bootstrap_host = LISTENER_LOCAL_ADDRESS, + bootstrap_port = kafka_security.bootstrap_port(), ) ); assert_eq!( config.advertised_listeners(), format!( - "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", + "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}", name = KafkaListenerName::Client, host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), port = node_port_cmd( @@ -442,17 +498,30 @@ mod tests { ) .unwrap(), internal_port = kafka_security.internal_port(), + bootstrap_name = KafkaListenerName::Bootstrap, + bootstrap_host = service_fqdn( + &kafka, + &rolegroup_service_name(&rolegroup_ref, KafkaListenerName::Bootstrap), + &cluster_info + ) + .unwrap(), + bootstrap_port = node_port_cmd( + STACKABLE_LISTENER_BOOTSTRAP_DIR, + kafka_security.bootstrap_port_name() + ), ) ); assert_eq!( config.listener_security_protocol_map(), format!( - "{name}:{protocol},{internal_name}:{internal_protocol},{controller_name}:{controller_protocol}", + "{name}:{protocol},{internal_name}:{internal_protocol},{bootstrap_name}:{bootstrap_protocol},{controller_name}:{controller_protocol}", name = KafkaListenerName::Client, protocol = KafkaListenerProtocol::Ssl, internal_name = KafkaListenerName::Internal, internal_protocol = KafkaListenerProtocol::Ssl, + bootstrap_name = KafkaListenerName::Bootstrap, + bootstrap_protocol = KafkaListenerProtocol::Ssl, controller_name = KafkaListenerName::Controller, controller_protocol = KafkaListenerProtocol::Ssl, ) @@ -470,20 +539,23 @@ mod tests { assert_eq!( config.listeners(), format!( - "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", + "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}", name = KafkaListenerName::Client, host = LISTENER_LOCAL_ADDRESS, port = kafka_security.client_port(), internal_name = KafkaListenerName::Internal, internal_host = LISTENER_LOCAL_ADDRESS, internal_port = kafka_security.internal_port(), + bootstrap_name = KafkaListenerName::Bootstrap, + bootstrap_host = LISTENER_LOCAL_ADDRESS, + bootstrap_port = kafka_security.bootstrap_port(), ) ); assert_eq!( config.advertised_listeners(), format!( - "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", +
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}", name = KafkaListenerName::Client, host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), port = node_port_cmd( @@ -498,19 +570,32 @@ mod tests { ) .unwrap(), internal_port = kafka_security.internal_port(), + bootstrap_name = KafkaListenerName::Bootstrap, + bootstrap_host = service_fqdn( + &kafka, + &rolegroup_service_name(&rolegroup_ref, KafkaListenerName::Bootstrap), + &cluster_info + ) + .unwrap(), + bootstrap_port = node_port_cmd( + STACKABLE_LISTENER_BOOTSTRAP_DIR, + kafka_security.bootstrap_port_name() + ), ) ); assert_eq!( config.listener_security_protocol_map(), format!( - "{name}:{protocol},{internal_name}:{internal_protocol},{controller_name}:{controller_protocol}", + "{name}:{protocol},{internal_name}:{internal_protocol},{bootstrap_name}:{bootstrap_protocol},{controller_name}:{controller_protocol}", name = KafkaListenerName::Client, protocol = KafkaListenerProtocol::Ssl, internal_name = KafkaListenerName::Internal, internal_protocol = KafkaListenerProtocol::Ssl, controller_name = KafkaListenerName::Controller, controller_protocol = KafkaListenerProtocol::Ssl, + bootstrap_name = KafkaListenerName::Bootstrap, + bootstrap_protocol = KafkaListenerProtocol::Ssl, ) ); @@ -527,20 +612,23 @@ mod tests { assert_eq!( config.listeners(), format!( - "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", + "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}", name = KafkaListenerName::Client, host = LISTENER_LOCAL_ADDRESS, port = kafka_security.client_port(), internal_name = KafkaListenerName::Internal, internal_host = LISTENER_LOCAL_ADDRESS, internal_port = kafka_security.internal_port(), + bootstrap_name = KafkaListenerName::Bootstrap, + bootstrap_host = LISTENER_LOCAL_ADDRESS, + bootstrap_port = kafka_security.bootstrap_port(), ) ); assert_eq!( config.advertised_listeners(), format!( - "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", + "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}", name = KafkaListenerName::Client, host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), port = node_port_cmd( @@ -555,19 +643,32 @@ mod tests { ) .unwrap(), internal_port = kafka_security.internal_port(), + bootstrap_name = KafkaListenerName::Bootstrap, + bootstrap_host = service_fqdn( + &kafka, + &rolegroup_service_name(&rolegroup_ref, KafkaListenerName::Bootstrap), + &cluster_info + ) + .unwrap(), + bootstrap_port = node_port_cmd( + STACKABLE_LISTENER_BOOTSTRAP_DIR, + kafka_security.bootstrap_port_name() + ), ) ); assert_eq!( config.listener_security_protocol_map(), format!( - "{name}:{protocol},{internal_name}:{internal_protocol},{controller_name}:{controller_protocol}", + "{name}:{protocol},{internal_name}:{internal_protocol},{bootstrap_name}:{bootstrap_protocol},{controller_name}:{controller_protocol}", name = KafkaListenerName::Client, protocol = KafkaListenerProtocol::Plaintext, internal_name = KafkaListenerName::Internal, internal_protocol = KafkaListenerProtocol::Plaintext, controller_name = KafkaListenerName::Controller, controller_protocol = KafkaListenerProtocol::Plaintext, + bootstrap_name = KafkaListenerName::Bootstrap, + bootstrap_protocol = KafkaListenerProtocol::Plaintext, ) ); } @@ -648,10 +749,15 @@ mod tests { .unwrap(), internal_port = 
kafka_security.internal_port(), bootstrap_name = KafkaListenerName::Bootstrap, - bootstrap_host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR), + bootstrap_host = service_fqdn( + &kafka, + &rolegroup_service_name(&rolegroup_ref, KafkaListenerName::Bootstrap), + &cluster_info + ) + .unwrap(), bootstrap_port = node_port_cmd( - STACKABLE_LISTENER_BROKER_DIR, - kafka_security.client_port_name() + STACKABLE_LISTENER_BOOTSTRAP_DIR, + kafka_security.bootstrap_port_name() ), ) ); diff --git a/rust/operator-binary/src/crd/role/controller.rs b/rust/operator-binary/src/crd/role/controller.rs index 5b9513a5..4b4664be 100644 --- a/rust/operator-binary/src/crd/role/controller.rs +++ b/rust/operator-binary/src/crd/role/controller.rs @@ -64,6 +64,9 @@ pub struct ControllerConfig { #[fragment_attrs(serde(default))] pub resources: Resources, + + /// The ListenerClass used for bootstrapping new clients. + pub bootstrap_listener_class: String, } impl ControllerConfig { @@ -88,6 +91,7 @@ impl ControllerConfig { }, }, }, + bootstrap_listener_class: Some("cluster-internal".to_string()), } } } diff --git a/rust/operator-binary/src/crd/role/mod.rs b/rust/operator-binary/src/crd/role/mod.rs index 47210ea4..5099da6c 100644 --- a/rust/operator-binary/src/crd/role/mod.rs +++ b/rust/operator-binary/src/crd/role/mod.rs @@ -439,6 +439,13 @@ impl AnyConfig { } } + pub fn bootstrap_listener_class(&self) -> &String { + match self { + AnyConfig::Broker(broker_config) => &broker_config.bootstrap_listener_class, + AnyConfig::Controller(controller_config) => &controller_config.bootstrap_listener_class, + } + } + pub fn config_file_name(&self) -> &str { match self { AnyConfig::Broker(_) => BROKER_PROPERTIES_FILE, diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index b729386a..1cff64aa 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -431,6 +431,7 @@ impl KafkaTlsSecurity { /// Adds required volumes and volume mounts to the broker pod and container builders /// depending on the tls and authentication settings. + /// TODO: there is a lot of code duplication with the controller version of this function. pub fn add_broker_volume_and_volume_mounts( &self, pod_builder: &mut PodBuilder, @@ -491,6 +492,7 @@ impl KafkaTlsSecurity { /// Adds required volumes and volume mounts to the controller pod and container builders /// depending on the tls and authentication settings. + /// TODO: there is a lot of code duplication with the broker version of this function. 
pub fn add_controller_volume_and_volume_mounts( &self, pod_builder: &mut PodBuilder, @@ -520,6 +522,29 @@ impl KafkaTlsSecurity { .context(AddVolumeMountSnafu)?; } + if let Some(tls_server_secret_class) = self.tls_server_secret_class() { + pod_builder + .add_volume( + VolumeBuilder::new(Self::STACKABLE_TLS_KAFKA_SERVER_VOLUME_NAME) + .ephemeral( + SecretOperatorVolumeSourceBuilder::new(tls_server_secret_class) + .with_pod_scope() + .with_format(SecretFormat::TlsPkcs12) + .with_auto_tls_cert_lifetime(*requested_secret_lifetime) + .build() + .context(SecretVolumeBuildSnafu)?, + ) + .build(), + ) + .context(AddVolumeSnafu)?; + cb_kafka + .add_volume_mount( + Self::STACKABLE_TLS_KAFKA_SERVER_VOLUME_NAME, + Self::STACKABLE_TLS_KAFKA_SERVER_DIR, + ) + .context(AddVolumeMountSnafu)?; + } + Ok(()) } @@ -567,32 +592,33 @@ impl KafkaTlsSecurity { } } + // Bootstrap + config.insert( + KafkaListenerName::Bootstrap.listener_ssl_keystore_location(), + format!("{}/keystore.p12", Self::STACKABLE_TLS_KAFKA_SERVER_DIR), + ); + config.insert( + KafkaListenerName::Bootstrap.listener_ssl_keystore_password(), + Self::SSL_STORE_PASSWORD.to_string(), + ); + config.insert( + KafkaListenerName::Bootstrap.listener_ssl_keystore_type(), + "PKCS12".to_string(), + ); + config.insert( + KafkaListenerName::Bootstrap.listener_ssl_truststore_location(), + format!("{}/truststore.p12", Self::STACKABLE_TLS_KAFKA_SERVER_DIR), + ); + config.insert( + KafkaListenerName::Bootstrap.listener_ssl_truststore_password(), + Self::SSL_STORE_PASSWORD.to_string(), + ); + config.insert( + KafkaListenerName::Bootstrap.listener_ssl_truststore_type(), + "PKCS12".to_string(), + ); + if self.has_kerberos_enabled() { - // Bootstrap - config.insert( - KafkaListenerName::Bootstrap.listener_ssl_keystore_location(), - format!("{}/keystore.p12", Self::STACKABLE_TLS_KAFKA_SERVER_DIR), - ); - config.insert( - KafkaListenerName::Bootstrap.listener_ssl_keystore_password(), - Self::SSL_STORE_PASSWORD.to_string(), - ); - config.insert( - KafkaListenerName::Bootstrap.listener_ssl_keystore_type(), - "PKCS12".to_string(), - ); - config.insert( - KafkaListenerName::Bootstrap.listener_ssl_truststore_location(), - format!("{}/truststore.p12", Self::STACKABLE_TLS_KAFKA_SERVER_DIR), - ); - config.insert( - KafkaListenerName::Bootstrap.listener_ssl_truststore_password(), - Self::SSL_STORE_PASSWORD.to_string(), - ); - config.insert( - KafkaListenerName::Bootstrap.listener_ssl_truststore_type(), - "PKCS12".to_string(), - ); config.insert("sasl.enabled.mechanisms".to_string(), "GSSAPI".to_string()); config.insert( "sasl.kerberos.service.name".to_string(), @@ -602,7 +628,6 @@ impl KafkaTlsSecurity { "sasl.mechanism.inter.broker.protocol".to_string(), "GSSAPI".to_string(), ); - tracing::debug!("Kerberos configs added: [{:#?}]", config); } // Internal TLS @@ -717,6 +742,32 @@ impl KafkaTlsSecurity { } } + // Bootstrap + config.insert( + KafkaListenerName::Bootstrap.listener_ssl_keystore_location(), + format!("{}/keystore.p12", Self::STACKABLE_TLS_KAFKA_SERVER_DIR), + ); + config.insert( + KafkaListenerName::Bootstrap.listener_ssl_keystore_password(), + Self::SSL_STORE_PASSWORD.to_string(), + ); + config.insert( + KafkaListenerName::Bootstrap.listener_ssl_keystore_type(), + "PKCS12".to_string(), + ); + config.insert( + KafkaListenerName::Bootstrap.listener_ssl_truststore_location(), + format!("{}/truststore.p12", Self::STACKABLE_TLS_KAFKA_SERVER_DIR), + ); + config.insert( + KafkaListenerName::Bootstrap.listener_ssl_truststore_password(), +
Self::SSL_STORE_PASSWORD.to_string(), + ); + config.insert( + KafkaListenerName::Bootstrap.listener_ssl_truststore_type(), + "PKCS12".to_string(), + ); + // Kerberos if self.has_kerberos_enabled() { config.insert("sasl.enabled.mechanisms".to_string(), "GSSAPI".to_string()); config.insert( "sasl.kerberos.service.name".to_string(), @@ -728,7 +779,6 @@ impl KafkaTlsSecurity { "sasl.mechanism.inter.broker.protocol".to_string(), "GSSAPI".to_string(), ); - tracing::debug!("Kerberos configs added: [{:#?}]", config); } config diff --git a/rust/operator-binary/src/discovery.rs b/rust/operator-binary/src/discovery.rs index ca76eafd..e2f50969 100644 --- a/rust/operator-binary/src/discovery.rs +++ b/rust/operator-binary/src/discovery.rs @@ -52,19 +52,15 @@ pub fn build_discovery_configmap( kafka_security: &KafkaTlsSecurity, listeners: &[listener::v1alpha1::Listener], ) -> Result<ConfigMap, Error> { - let port_name = if kafka_security.has_kerberos_enabled() { - kafka_security.bootstrap_port_name() - } else { - kafka_security.client_port_name() - }; - - // Write a list of bootstrap servers in the format that Kafka clients: - // "{host1}:{port1},{host2:port2},..." - let bootstrap_servers = listener_hosts(listeners, port_name)? - .into_iter() - .map(|(host, port)| format!("{}:{}", host, port)) - .collect::<Vec<_>>() - .join(","); + // Write a comma-separated list of *broker* bootstrap hosts for all rolegroups: + // "{host1}:{port},{host2}:{port},..." + // The port is the same bootstrap port for all services. + let bootstrap_servers = + listener_hosts_filtered_by_port_name(listeners, kafka_security.bootstrap_port_name())? + .into_iter() + .map(|(host, port)| format!("{}:{}", host, port)) + .collect::<Vec<_>>() + .join(","); ConfigMapBuilder::new() .metadata( ObjectMetaBuilder::new() @@ -89,7 +85,7 @@ .context(BuildConfigMapSnafu) } -fn listener_hosts( +fn listener_hosts_filtered_by_port_name( listeners: &[listener::v1alpha1::Listener], port_name: &str, ) -> Result<Vec<(String, u16)>, Error> { diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index aebe52c3..8bebb179 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -49,7 +49,7 @@ use crate::{ operations::pdb::add_pdbs, resource::{ configmap::build_rolegroup_config_map, - listener::build_broker_rolegroup_bootstrap_listener, + listener::build_rolegroup_bootstrap_listener, service::{build_rolegroup_headless_service, build_rolegroup_metrics_service}, statefulset::{build_broker_rolegroup_statefulset, build_controller_rolegroup_statefulset}, }, @@ -426,21 +426,24 @@ pub async fn reconcile_kafka( .context(BuildStatefulsetSnafu)?, }; - if let AnyConfig::Broker(broker_config) = merged_config { - let rg_bootstrap_listener = build_broker_rolegroup_bootstrap_listener( - kafka, - &resolved_product_image, - &kafka_security, - &rolegroup_ref, - &broker_config, - ) - .context(BuildListenerSnafu)?; - bootstrap_listeners.push( - cluster_resources - .add(client, rg_bootstrap_listener) - .await - .context(ApplyRoleServiceSnafu)?, - ); + let rg_bootstrap_listener = build_rolegroup_bootstrap_listener( + kafka, + &resolved_product_image, + &kafka_security, + &rolegroup_ref, + &merged_config, + ) + .context(BuildListenerSnafu)?; + + let rg_bootstrap_listener = cluster_resources + .add(client, rg_bootstrap_listener) + .await + .context(ApplyRoleServiceSnafu)?; + + // TODO: For backwards compatibility, we only add the broker bootstrap listener + // to this list, so as not to break the discovery configmap even more than it already is.
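+ // As a hedged example (exact host and port depend on the ListenerClass and security settings): + // with a single broker rolegroup "default", the discovery ConfigMap would then contain something like + // `KAFKA: kafka-broker-default-bootstrap.<namespace>.svc.cluster.local:<bootstrap-port>`.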
+ if let AnyConfig::Broker(_) = &merged_config { + bootstrap_listeners.push(rg_bootstrap_listener); } cluster_resources diff --git a/rust/operator-binary/src/resource/listener.rs b/rust/operator-binary/src/resource/listener.rs index 3a9b19fa..340f0b23 100644 --- a/rust/operator-binary/src/resource/listener.rs +++ b/rust/operator-binary/src/resource/listener.rs @@ -5,7 +5,7 @@ use stackable_operator::{ }; use crate::{ - crd::{role::broker::BrokerConfig, security::KafkaTlsSecurity, v1alpha1}, + crd::{role::AnyConfig, security::KafkaTlsSecurity, v1alpha1}, kafka_controller::KAFKA_CONTROLLER_NAME, utils::build_recommended_labels, }; @@ -25,13 +25,12 @@ pub enum Error { /// Kafka clients will use the load-balanced bootstrap listener to get a list of broker addresses and will use those to /// transmit data to the correct broker. -// TODO (@NickLarsenNZ): Move shared functionality to stackable-operator -pub fn build_broker_rolegroup_bootstrap_listener( +pub fn build_rolegroup_bootstrap_listener( kafka: &v1alpha1::KafkaCluster, resolved_product_image: &ResolvedProductImage, kafka_security: &KafkaTlsSecurity, rolegroup: &RoleGroupRef<v1alpha1::KafkaCluster>, - merged_config: &BrokerConfig, + merged_config: &AnyConfig, ) -> Result<listener::v1alpha1::Listener, Error> { Ok(listener::v1alpha1::Listener { metadata: ObjectMetaBuilder::new() @@ -49,7 +48,7 @@ pub fn build_broker_rolegroup_bootstrap_listener( .context(MetadataBuildSnafu)? .build(), spec: listener::v1alpha1::ListenerSpec { - class_name: Some(merged_config.bootstrap_listener_class.clone()), + class_name: Some(merged_config.bootstrap_listener_class().clone()), ports: Some(bootstrap_listener_ports(kafka_security)), ..listener::v1alpha1::ListenerSpec::default() }, @@ -60,17 +59,9 @@ fn bootstrap_listener_ports( kafka_security: &KafkaTlsSecurity, ) -> Vec<listener::v1alpha1::ListenerPort> { - vec![if kafka_security.has_kerberos_enabled() { - listener::v1alpha1::ListenerPort { - name: kafka_security.bootstrap_port_name().to_string(), - port: kafka_security.bootstrap_port().into(), - protocol: Some("TCP".to_string()), - } - } else { - listener::v1alpha1::ListenerPort { - name: kafka_security.client_port_name().to_string(), - port: kafka_security.client_port().into(), - protocol: Some("TCP".to_string()), - } + vec![listener::v1alpha1::ListenerPort { + name: kafka_security.bootstrap_port_name().to_string(), + port: kafka_security.bootstrap_port().into(), + protocol: Some("TCP".to_string()), }] } diff --git a/rust/operator-binary/src/resource/statefulset.rs b/rust/operator-binary/src/resource/statefulset.rs index 89154dad..6feabca2 100644 --- a/rust/operator-binary/src/resource/statefulset.rs +++ b/rust/operator-binary/src/resource/statefulset.rs @@ -572,6 +572,15 @@ pub fn build_controller_rolegroup_statefulset( &rolegroup_ref.role, &rolegroup_ref.role_group, ); + let unversioned_recommended_labels = Labels::recommended(build_recommended_labels( + kafka, + KAFKA_CONTROLLER_NAME, + // A version value is required, and we do want to use the "recommended" format for the other desired labels + "none", + &rolegroup_ref.role, + &rolegroup_ref.role_group, + )) + .context(LabelBuildSnafu)?; let kafka_container_name = ControllerContainer::Kafka.to_string(); let mut cb_kafka = @@ -807,6 +816,19 @@ pub fn build_controller_rolegroup_statefulset( add_graceful_shutdown_config(merged_config, &mut pod_builder).context(GracefulShutdownSnafu)?; + let mut pvcs = merged_config.resources().storage.build_pvcs(); + + // The bootstrap listener volume should be persistent, + // while the main broker listener uses an ephemeral PVC instead. +
pvcs.push( + ListenerOperatorVolumeSourceBuilder::new( + &ListenerReference::ListenerName(kafka.bootstrap_service_name(rolegroup_ref)), + &unversioned_recommended_labels, + ) + .build_pvc(LISTENER_BOOTSTRAP_VOLUME_NAME) + .context(BuildBootstrapListenerPvcSnafu)?, + ); + let mut pod_template = pod_builder.build_template(); pod_template.merge_from( @@ -860,7 +882,7 @@ pub fn build_controller_rolegroup_statefulset( }, service_name: Some(rolegroup_ref.rolegroup_headless_service_name()), template: pod_template, - volume_claim_templates: Some(merged_config.resources().storage.build_pvcs()), + volume_claim_templates: Some(pvcs), ..StatefulSetSpec::default() }), status: None, diff --git a/tests/templates/kuttl/kerberos-kraft/00-assert.yaml.j2 b/tests/templates/kuttl/kerberos-kraft/00-assert.yaml.j2 new file mode 100644 index 00000000..50b1d4c3 --- /dev/null +++ b/tests/templates/kuttl/kerberos-kraft/00-assert.yaml.j2 @@ -0,0 +1,10 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +{% endif %} diff --git a/tests/templates/kuttl/kerberos-kraft/00-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/kerberos-kraft/00-install-vector-aggregator-discovery-configmap.yaml.j2 new file mode 100644 index 00000000..2d6a0df5 --- /dev/null +++ b/tests/templates/kuttl/kerberos-kraft/00-install-vector-aggregator-discovery-configmap.yaml.j2 @@ -0,0 +1,9 @@ +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +data: + ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }} +{% endif %} diff --git a/tests/templates/kuttl/kerberos-kraft/00-patch-ns.yaml.j2 b/tests/templates/kuttl/kerberos-kraft/00-patch-ns.yaml.j2 new file mode 100644 index 00000000..67185acf --- /dev/null +++ b/tests/templates/kuttl/kerberos-kraft/00-patch-ns.yaml.j2 @@ -0,0 +1,9 @@ +{% if test_scenario['values']['openshift'] == 'true' %} +# see https://github.com/stackabletech/issues/issues/566 +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl patch namespace $NAMESPACE -p '{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}' + timeout: 120 +{% endif %} diff --git a/tests/templates/kuttl/kerberos-kraft/00-rbac.yaml.j2 b/tests/templates/kuttl/kerberos-kraft/00-rbac.yaml.j2 new file mode 100644 index 00000000..7ee61d23 --- /dev/null +++ b/tests/templates/kuttl/kerberos-kraft/00-rbac.yaml.j2 @@ -0,0 +1,29 @@ +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: test-role +rules: +{% if test_scenario['values']['openshift'] == "true" %} + - apiGroups: ["security.openshift.io"] + resources: ["securitycontextconstraints"] + resourceNames: ["privileged"] + verbs: ["use"] +{% endif %} +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: test-sa +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: test-rb +subjects: + - kind: ServiceAccount + name: test-sa +roleRef: + kind: Role + name: test-role + apiGroup: rbac.authorization.k8s.io diff --git a/tests/templates/kuttl/kerberos-kraft/01-assert.yaml.j2 b/tests/templates/kuttl/kerberos-kraft/01-assert.yaml.j2 new file mode 100644 index 00000000..d34c1c63 --- /dev/null +++ b/tests/templates/kuttl/kerberos-kraft/01-assert.yaml.j2 @@ -0,0 +1,14 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 300 +{% if test_scenario['values']['kerberos-backend'] 
== 'mit' %} +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: krb5-kdc +status: + readyReplicas: 1 + replicas: 1 +{% endif %} diff --git a/tests/templates/kuttl/kerberos-kraft/01-install-krb5-kdc.yaml.j2 b/tests/templates/kuttl/kerberos-kraft/01-install-krb5-kdc.yaml.j2 new file mode 100644 index 00000000..69ceec81 --- /dev/null +++ b/tests/templates/kuttl/kerberos-kraft/01-install-krb5-kdc.yaml.j2 @@ -0,0 +1,146 @@ +{% if test_scenario['values']['kerberos-backend'] == 'mit' %} +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: krb5-kdc +spec: + selector: + matchLabels: + app: krb5-kdc + template: + metadata: + labels: + app: krb5-kdc + spec: + serviceAccountName: test-sa + initContainers: + - name: init + image: oci.stackable.tech/sdp/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev + args: + - sh + - -euo + - pipefail + - -c + - | + test -e /var/kerberos/krb5kdc/principal || kdb5_util create -s -P asdf + kadmin.local get_principal -terse root/admin || kadmin.local add_principal -pw asdf root/admin + # stackable-secret-operator principal must match the keytab specified in the SecretClass + kadmin.local get_principal -terse stackable-secret-operator || kadmin.local add_principal -e aes256-cts-hmac-sha384-192:normal -pw asdf stackable-secret-operator + env: + - name: KRB5_CONFIG + value: /stackable/config/krb5.conf + volumeMounts: + - mountPath: /stackable/config + name: config + - mountPath: /var/kerberos/krb5kdc + name: data + containers: + - name: kdc + image: oci.stackable.tech/sdp/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev + args: + - krb5kdc + - -n + env: + - name: KRB5_CONFIG + value: /stackable/config/krb5.conf + volumeMounts: + - mountPath: /stackable/config + name: config + - mountPath: /var/kerberos/krb5kdc + name: data +# Root permissions required on Openshift to bind to privileged port numbers +{% if test_scenario['values']['openshift'] == "true" %} + securityContext: + runAsUser: 0 +{% endif %} + - name: kadmind + image: oci.stackable.tech/sdp/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev + args: + - kadmind + - -nofork + env: + - name: KRB5_CONFIG + value: /stackable/config/krb5.conf + volumeMounts: + - mountPath: /stackable/config + name: config + - mountPath: /var/kerberos/krb5kdc + name: data +# Root permissions required on Openshift to bind to privileged port numbers +{% if test_scenario['values']['openshift'] == "true" %} + securityContext: + runAsUser: 0 +{% endif %} + - name: client + image: oci.stackable.tech/sdp/krb5:{{ test_scenario['values']['krb5'] }}-stackable0.0.0-dev + tty: true + stdin: true + env: + - name: KRB5_CONFIG + value: /stackable/config/krb5.conf + volumeMounts: + - mountPath: /stackable/config + name: config + volumes: + - name: config + configMap: + name: krb5-kdc + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi +--- +apiVersion: v1 +kind: Service +metadata: + name: krb5-kdc +spec: + selector: + app: krb5-kdc + ports: + - name: kadmin + port: 749 + - name: kdc + port: 88 + - name: kdc-udp + port: 88 + protocol: UDP +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: krb5-kdc +data: + krb5.conf: | + [logging] + default = STDERR + kdc = STDERR + admin_server = STDERR + # default = FILE:/var/log/krb5libs.log + # kdc = FILE:/var/log/krb5kdc.log + # admin_server = FILE:/vaggr/log/kadmind.log + [libdefaults] + dns_lookup_realm = false + ticket_lifetime = 24h + renew_lifetime = 7d + forwardable = 
true + rdns = false + default_realm = {{ test_scenario['values']['kerberos-realm'] }} + spake_preauth_groups = edwards25519 + [realms] + {{ test_scenario['values']['kerberos-realm'] }} = { + acl_file = /stackable/config/kadm5.acl + disable_encrypted_timestamp = false + } + [domain_realm] + .cluster.local = {{ test_scenario['values']['kerberos-realm'] }} + cluster.local = {{ test_scenario['values']['kerberos-realm'] }} + kadm5.acl: | + root/admin *e + stackable-secret-operator *e +{% endif %} diff --git a/tests/templates/kuttl/kerberos-kraft/02-create-kerberos-secretclass.yaml.j2 b/tests/templates/kuttl/kerberos-kraft/02-create-kerberos-secretclass.yaml.j2 new file mode 100644 index 00000000..04ae9a63 --- /dev/null +++ b/tests/templates/kuttl/kerberos-kraft/02-create-kerberos-secretclass.yaml.j2 @@ -0,0 +1,72 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: | + kubectl apply -n $NAMESPACE -f - < 0 %} + custom: "{{ test_scenario['values']['kafka-kraft'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['kafka-kraft'].split(',')[0] }}" +{% else %} + productVersion: "{{ test_scenario['values']['kafka-kraft'] }}" +{% endif %} + pullPolicy: IfNotPresent + clusterConfig: + authentication: + - authenticationClass: kerberos-auth-$NAMESPACE + tls: + # Kerberos requires the use of server and internal TLS! + serverSecretClass: tls +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} + brokers: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + brokerListenerClass: {{ test_scenario['values']['broker-listener-class'] }} + bootstrapListenerClass: {{ test_scenario['values']['bootstrap-listener-class'] }} + gracefulShutdownTimeout: 30s # speed up tests + roleGroups: + default: + replicas: 3 + controllers: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + gracefulShutdownTimeout: 30s # speed up tests + roleGroups: + default: + replicas: 3 + + EOF diff --git a/tests/templates/kuttl/kerberos-kraft/30-access-kafka.txt.j2 b/tests/templates/kuttl/kerberos-kraft/30-access-kafka.txt.j2 new file mode 100644 index 00000000..0b8145bf --- /dev/null +++ b/tests/templates/kuttl/kerberos-kraft/30-access-kafka.txt.j2 @@ -0,0 +1,105 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: access-kafka +spec: + template: + spec: + serviceAccountName: test-sa + containers: + - name: access-kafka + image: oci.stackable.tech/sdp/kafka:{{ test_scenario['values']['kafka-kraft'] }}-stackable0.0.0-dev + command: + - /bin/bash + - /tmp/script/script.sh + env: + - name: KAFKA_OPTS + value: -Djava.security.krb5.conf=/stackable/kerberos/krb5.conf + - name: KAFKA_HEAP_OPTS + value: -Xmx1638m -Xms1638m + resources: + limits: + cpu: "1" + memory: 2Gi + requests: + cpu: 250m + memory: 2Gi + volumeMounts: + - name: script + mountPath: /tmp/script + - mountPath: /stackable/tls-ca-cert-mount + name: tls-ca-cert-mount + - name: kerberos + mountPath: /stackable/kerberos + volumes: + - name: script + configMap: + name: access-kafka-script + - name: kerberos + ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: kerberos-$NAMESPACE + secrets.stackable.tech/scope: service=access-kafka + secrets.stackable.tech/kerberos.service.names: developer + spec: + storageClassName: secrets.stackable.tech + accessModes: + - ReadWriteOnce + resources: + requests: + storage: "1" + - name: tls-ca-cert-mount + ephemeral: 
+ volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: tls + secrets.stackable.tech/scope: pod + secrets.stackable.tech/format: tls-pkcs12 + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: "1" + storageClassName: secrets.stackable.tech + volumeMode: Filesystem + securityContext: + fsGroup: 1000 + restartPolicy: OnFailure +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: access-kafka-script +data: + script.sh: | + set -euxo pipefail + + CLIENT_PROPERTIES=/stackable/client.properties + + echo -e -n "\ + security.protocol=SASL_SSL + ssl.keystore.location=/stackable/tls-ca-cert-mount/keystore.p12 + ssl.keystore.password= + ssl.keystore.type=PKCS12 + ssl.truststore.location=/stackable/tls-ca-cert-mount/truststore.p12 + ssl.truststore.password= + ssl.truststore.type=PKCS12 + sasl.enabled.mechanisms=GSSAPI + sasl.kerberos.service.name=kafka + sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule\ required\ useKeyTab\=true\ storeKey\=true\ keyTab\=\"/stackable/kerberos/keytab\"\ principal\=\"developer/access-kafka.$NAMESPACE.svc.cluster.local@{{ test_scenario['values']['kerberos-realm'] }}\"; + " > "$CLIENT_PROPERTIES" + + # Listing topics alone is a test that we can connect with kerberos. + # ISSUES: + # - cannot use the KAFKA address from the discovery config map for bootstrapping since it can be just an IP address + # which the certificate does not cover. + # - cannot use the bootstrap service either (`kafka--broker-default-bootstrap`) even after adding it to the + # `tls-ca-cert-mount` scope because it's not supported with `cluster-external` classes. + /stackable/kafka/bin/kafka-topics.sh --list \ + --bootstrap-server kafka-broker-default-0-listener-broker.$NAMESPACE.svc.cluster.local:9093 \ + --command-config "$CLIENT_PROPERTIES" diff --git a/tests/templates/kuttl/kerberos-kraft/30-access-kafka.yaml b/tests/templates/kuttl/kerberos-kraft/30-access-kafka.yaml new file mode 100644 index 00000000..eecc0f08 --- /dev/null +++ b/tests/templates/kuttl/kerberos-kraft/30-access-kafka.yaml @@ -0,0 +1,6 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + # We need to replace $NAMESPACE (by KUTTL) + - script: envsubst '$NAMESPACE' < 30-access-kafka.txt | kubectl apply -n $NAMESPACE -f - diff --git a/tests/templates/kuttl/kerberos-kraft/30-assert.yaml b/tests/templates/kuttl/kerberos-kraft/30-assert.yaml new file mode 100644 index 00000000..edc6c317 --- /dev/null +++ b/tests/templates/kuttl/kerberos-kraft/30-assert.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 600 +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: access-kafka +status: + succeeded: 1 diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index f31f68a2..1977b47b 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -6,12 +6,12 @@ dimensions: - name: kafka-kraft values: - - 3.7.2 - - 3.9.1 + # - 3.7.2 + # - 3.9.1 - 4.1.0 - name: kafka values: - - 3.7.2 + # - 3.7.2 - 3.9.1 # Alternatively, if you want to use a custom image, append a comma and the full image name to the product version # as in the example below. 
@@ -24,7 +24,7 @@ dimensions: # - 3.9.1,oci.stackable.tech/sdp/kafka:3.9.1-stackable0.0.0-dev - name: zookeeper values: - - 3.9.3 + # - 3.9.3 - 3.9.4 - name: zookeeper-latest values: @@ -51,7 +51,7 @@ dimensions: - 1.21.1 - name: kerberos-realm values: - - "CLUSTER.LOCAL" + # - "CLUSTER.LOCAL" - "PROD.MYCORP" - name: kerberos-backend values: @@ -64,14 +64,14 @@ dimensions: - "cluster-internal" - name: bootstrap-listener-class values: - - "cluster-internal" - - "external-stable" + # - "cluster-internal" + # - "external-stable" - "external-unstable" tests: - - name: operations-kraft - dimensions: - - kafka-kraft - - openshift + # - name: operations-kraft + # dimensions: + # - kafka-kraft + # - openshift - name: smoke-kraft dimensions: - kafka-kraft @@ -82,18 +82,18 @@ tests: - zookeeper - use-client-tls - openshift - - name: configuration - dimensions: - - kafka-latest - - openshift - - name: upgrade - dimensions: - - zookeeper - - upgrade_new - - upgrade_old - - use-client-tls - - use-client-auth-tls - - openshift + # - name: configuration + # dimensions: + # - kafka-latest + # - openshift + # - name: upgrade + # dimensions: + # - zookeeper + # - upgrade_new + # - upgrade_old + # - use-client-tls + # - use-client-auth-tls + # - openshift - name: tls dimensions: - kafka @@ -101,32 +101,40 @@ tests: - use-client-tls - use-client-auth-tls - openshift - - name: delete-rolegroup - dimensions: - - kafka - - zookeeper-latest - - openshift - - name: logging + # - name: delete-rolegroup + # dimensions: + # - kafka + # - zookeeper-latest + # - openshift + # - name: logging + # dimensions: + # - kafka + # - zookeeper-latest + # - openshift + # - name: cluster-operation + # dimensions: + # - zookeeper-latest + # - kafka-latest + # - openshift + - name: kerberos dimensions: - kafka - zookeeper-latest + - krb5 + - kerberos-realm + - kerberos-backend - openshift - - name: cluster-operation - dimensions: - - zookeeper-latest - - kafka-latest - - openshift - - name: kerberos + - broker-listener-class + - bootstrap-listener-class + - name: kerberos-kraft dimensions: - - kafka - - zookeeper-latest + - kafka-kraft - krb5 - kerberos-realm - kerberos-backend - openshift - broker-listener-class - bootstrap-listener-class - suites: - name: nightly patch: