From a0e97077c9cc2ff18d417fb39d1ca285dc837419 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Wed, 26 Nov 2025 12:49:58 -0500 Subject: [PATCH 1/2] NIFI-15243 Improve KubernetesConfigMapStateProvider # Conflicts: # nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java --- .../KubernetesConfigMapStateProvider.java | 95 ++++++++----------- .../state/provider/StandardStateMap.java | 11 ++- .../KubernetesConfigMapStateProviderTest.java | 7 +- 3 files changed, 50 insertions(+), 63 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java index 35b5b7cb11c6..603ff96e7e2a 100644 --- a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java +++ b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java @@ -155,64 +155,28 @@ public void shutdown() { @Override public void setState(final Map state, final String componentId) throws IOException { try { - final ConfigMap configMap = createConfigMapBuilder(state, componentId).build(); - Resource configMapResource = kubernetesClient.configMaps().resource(configMap); - final String configMapName = configMap.getMetadata().getName(); - - ConfigMap configMapCreated = null; + final ConfigMap configMap = createConfigMapBuilder(state, componentId, Optional.empty()).build(); + ConfigMap configMapResult = null; - // Attempt to create or update, up to 3 times. We expect that we will update more frequently than create - // so we first attempt to update. If we get back a 404, then we create it. - boolean create = false; + // Attempt to create or update, up to 3 times for (int attempt = 0; attempt < MAX_UPDATE_ATTEMPTS; attempt++) { try { - if (create) { - configMapCreated = configMapResource.create(); + final ConfigMap existingConfigMap = kubernetesClient.configMaps().resource(configMap).get(); + if (existingConfigMap == null) { + configMapResult = kubernetesClient.configMaps().resource(configMap).create(); } else { - configMapCreated = configMapResource.update(); + existingConfigMap.setData(configMap.getData()); + configMapResult = kubernetesClient.configMaps().resource(existingConfigMap).update(); } - break; } catch (final KubernetesClientException e) { final int returnCode = e.getCode(); - if (returnCode == HttpURLConnection.HTTP_NOT_FOUND) { - // A 404 return code indicates that we need to create the resource instead of update it. - // Now, we will attempt to create the resource instead of update it, so we'll reset the attempt counter. - attempt = 0; - create = true; - continue; - } - - if (returnCode == HttpURLConnection.HTTP_CONFLICT) { - logger.debug("Update conflict detected when setting state for Component ID [{}]. Attempt {} of {}.", componentId, attempt + 1, MAX_UPDATE_ATTEMPTS); - - if (attempt < MAX_UPDATE_ATTEMPTS - 1) { - final ConfigMap latestConfigMap = kubernetesClient.configMaps() - .inNamespace(namespace) - .withName(configMapName) - .get(); - - if (latestConfigMap != null) { - final ObjectMeta latestMetadata = latestConfigMap.getMetadata(); - final String latestResourceVersion = latestMetadata != null ? latestMetadata.getResourceVersion() : null; - - if (latestResourceVersion != null) { - configMap.getMetadata().setResourceVersion(latestResourceVersion); - configMapResource = kubernetesClient.configMaps().resource(configMap); - logger.debug("Retrying state update for Component ID [{}] with resource version [{}]", componentId, latestResourceVersion); - continue; - } - } - } - - throw e; - } - - if (returnCode >= 500) { - // Server-side error. We should retry, up to some number of attempts. + if (returnCode == HttpURLConnection.HTTP_CONFLICT || returnCode >= 500) { + // Conflict or Server-side error. We should retry, up to some number of attempts. if (attempt == MAX_UPDATE_ATTEMPTS - 1) { throw e; } + logger.warn("Failed to update state for Component ID [{}]. Attempt {} of {}.", componentId, attempt + 1, MAX_UPDATE_ATTEMPTS, e); } else { // There's an issue with the request. Throw the Exception. throw e; @@ -227,11 +191,11 @@ public void setState(final Map state, final String componentId) } } - if (configMapCreated == null) { + if (configMapResult == null) { throw new IOException("Exhausted maximum number of attempts (%s) to update state for component with ID %s but could not update it".formatted(MAX_UPDATE_ATTEMPTS, componentId)); } - final Optional version = getVersion(configMapCreated); + final Optional version = getVersion(configMapResult); logger.debug("Set State Component ID [{}] Version [{}]", componentId, version); } catch (final KubernetesClientException e) { if (isNotFound(e.getCode())) { @@ -257,7 +221,8 @@ public StateMap getState(final String componentId) throws IOException { final ConfigMap configMap = configMapResource(componentId).get(); final Map data = configMap == null ? Collections.emptyMap() : getDecodedMap(configMap.getData()); final Optional version = configMap == null ? Optional.empty() : getVersion(configMap); - return new StandardStateMap(data, version); + final Optional configMapMetadata = configMap == null ? Optional.empty() : Optional.of(configMap.getMetadata()); + return new StandardStateMap(data, version, configMapMetadata); } catch (final RuntimeException e) { throw new IOException(String.format("Get failed for Component ID [%s]", componentId), e); } @@ -273,7 +238,12 @@ public StateMap getState(final String componentId) throws IOException { */ @Override public boolean replace(final StateMap currentState, final Map state, final String componentId) throws IOException { - final ConfigMapBuilder configMapBuilder = createConfigMapBuilder(state, componentId); + if (!(currentState instanceof StandardStateMap)) { + throw new IllegalStateException("Current state is not an instance of StandardStateMap"); + } + + final Optional existingMetadata = ((StandardStateMap) currentState).getConfigMapMetadata(); + final ConfigMapBuilder configMapBuilder = createConfigMapBuilder(state, componentId, existingMetadata); final Optional stateVersion = currentState.getStateVersion(); if (stateVersion.isPresent()) { final String resourceVersion = stateVersion.get(); @@ -412,15 +382,28 @@ private Resource configMapResource(final String componentId) { return kubernetesClient.configMaps().inNamespace(namespace).withName(name); } - private ConfigMapBuilder createConfigMapBuilder(final Map state, final String componentId) { - final Map encodedData = getEncodedMap(state); + private ConfigMapBuilder createConfigMapBuilder(final Map state, final String componentId, final Optional existingMetadata) { final String name = getConfigMapName(componentId); - return new ConfigMapBuilder() + + final ConfigMapBuilder configMapBuilder; + if (existingMetadata.isPresent()) { + if (!namespace.equals(existingMetadata.get().getNamespace())) { + throw new IllegalArgumentException("Expected existing ConfigMap namespace [%s], but was [%s]".formatted(namespace, existingMetadata.get().getNamespace())); + } + if (!name.equals(existingMetadata.get().getName())) { + throw new IllegalArgumentException("Expected existing ConfigMap name [%s], but was [%s]".formatted(name, existingMetadata.get().getName())); + } + configMapBuilder = new ConfigMapBuilder().withMetadata(existingMetadata.get()); + } else { + configMapBuilder = new ConfigMapBuilder() .withNewMetadata() .withNamespace(namespace) .withName(name) - .endMetadata() - .withData(encodedData); + .endMetadata(); + } + + final Map encodedData = getEncodedMap(state); + return configMapBuilder.withData(encodedData); } private String getConfigMapName(final String componentId) { diff --git a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java index e81fc81975bd..490e493c0f67 100644 --- a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java +++ b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java @@ -22,6 +22,8 @@ import java.util.Map; import java.util.Optional; +import io.fabric8.kubernetes.api.model.ObjectMeta; + /** * Standard implementation of StateMap */ @@ -30,9 +32,12 @@ class StandardStateMap implements StateMap { private final Optional version; - StandardStateMap(final Map data, final Optional version) { + private final Optional configMapMetadata; + + StandardStateMap(final Map data, final Optional version, final Optional configMapMetadata) { this.data = Collections.unmodifiableMap(data == null ? Collections.emptyMap() : data); this.version = version; + this.configMapMetadata = configMapMetadata; } @@ -66,4 +71,8 @@ public String get(final String key) { public Map toMap() { return data; } + + public Optional getConfigMapMetadata() { + return configMapMetadata; + } } diff --git a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java index 82e9a78b6391..7ef21176820f 100644 --- a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java +++ b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java @@ -23,7 +23,6 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.MixedOperation; -import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; import io.fabric8.kubernetes.client.dsl.Resource; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; @@ -61,7 +60,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -182,13 +180,10 @@ void testSetStateGetState() throws IOException { void testSetStateConflict() { final KubernetesClient mockClient = mock(KubernetesClient.class); final MixedOperation> mockConfigMaps = mock(MixedOperation.class); - final NonNamespaceOperation> mockNamespacedConfigMaps = mock(NonNamespaceOperation.class); final Resource mockResource = mock(Resource.class); when(mockClient.configMaps()).thenReturn(mockConfigMaps); when(mockConfigMaps.resource(any(ConfigMap.class))).thenReturn(mockResource); - when(mockConfigMaps.inNamespace(DEFAULT_NAMESPACE)).thenReturn(mockNamespacedConfigMaps); - when(mockNamespacedConfigMaps.withName(anyString())).thenReturn(mockResource); final String conflictMessageTemplate = "Operation cannot be fulfilled on configmaps \"nifi-component-%s\": " + "the object has been modified; please apply your changes to the latest version and try again"; @@ -274,7 +269,7 @@ void testReplaceNotFound() throws IOException { setContext(); provider.initialize(context); - final StateMap stateMap = new StandardStateMap(Collections.emptyMap(), Optional.empty()); + final StateMap stateMap = new StandardStateMap(Collections.emptyMap(), Optional.empty(), Optional.empty()); final boolean replaced = provider.replace(stateMap, Collections.emptyMap(), COMPONENT_ID); assertTrue(replaced); From d1f7c4c1ec8e146510923a37c82ddd88f3d13529 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Mon, 22 Dec 2025 15:31:44 -0500 Subject: [PATCH 2/2] Address review feedback --- .../KubernetesConfigMapStateProvider.java | 39 +++++++++---------- .../state/provider/StandardStateMap.java | 3 +- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java index 603ff96e7e2a..9bd5f0e00b12 100644 --- a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java +++ b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java @@ -155,7 +155,7 @@ public void shutdown() { @Override public void setState(final Map state, final String componentId) throws IOException { try { - final ConfigMap configMap = createConfigMapBuilder(state, componentId, Optional.empty()).build(); + final ConfigMap configMap = createConfigMapBuilder(state, componentId, null).build(); ConfigMap configMapResult = null; // Attempt to create or update, up to 3 times @@ -171,12 +171,12 @@ public void setState(final Map state, final String componentId) break; } catch (final KubernetesClientException e) { final int returnCode = e.getCode(); - if (returnCode == HttpURLConnection.HTTP_CONFLICT || returnCode >= 500) { + if (returnCode == HttpURLConnection.HTTP_CONFLICT || returnCode >= HttpURLConnection.HTTP_INTERNAL_ERROR) { // Conflict or Server-side error. We should retry, up to some number of attempts. if (attempt == MAX_UPDATE_ATTEMPTS - 1) { throw e; } - logger.warn("Failed to update state for Component ID [{}]. Attempt {} of {}.", componentId, attempt + 1, MAX_UPDATE_ATTEMPTS, e); + logger.warn("Failed to update state for Component ID [{}] on attempt {} of {}", componentId, attempt + 1, MAX_UPDATE_ATTEMPTS, e); } else { // There's an issue with the request. Throw the Exception. throw e; @@ -238,17 +238,19 @@ public StateMap getState(final String componentId) throws IOException { */ @Override public boolean replace(final StateMap currentState, final Map state, final String componentId) throws IOException { - if (!(currentState instanceof StandardStateMap)) { + if (currentState instanceof StandardStateMap standardStateMap) { + return replace(standardStateMap, state, componentId); + } else { throw new IllegalStateException("Current state is not an instance of StandardStateMap"); } + } + + private boolean replace(final StandardStateMap currentState, final Map state, final String componentId) throws IOException { + final Optional existingMetadata = currentState.getConfigMapMetadata(); + final ConfigMapBuilder configMapBuilder = createConfigMapBuilder(state, componentId, existingMetadata.orElse(null)); - final Optional existingMetadata = ((StandardStateMap) currentState).getConfigMapMetadata(); - final ConfigMapBuilder configMapBuilder = createConfigMapBuilder(state, componentId, existingMetadata); final Optional stateVersion = currentState.getStateVersion(); - if (stateVersion.isPresent()) { - final String resourceVersion = stateVersion.get(); - configMapBuilder.editOrNewMetadata().withResourceVersion(resourceVersion).endMetadata(); - } + stateVersion.ifPresent(resourceVersion -> configMapBuilder.editOrNewMetadata().withResourceVersion(resourceVersion).endMetadata()); final ConfigMap configMap = configMapBuilder.build(); try { @@ -382,24 +384,21 @@ private Resource configMapResource(final String componentId) { return kubernetesClient.configMaps().inNamespace(namespace).withName(name); } - private ConfigMapBuilder createConfigMapBuilder(final Map state, final String componentId, final Optional existingMetadata) { + private ConfigMapBuilder createConfigMapBuilder(final Map state, final String componentId, final ObjectMeta existingMetadata) { final String name = getConfigMapName(componentId); final ConfigMapBuilder configMapBuilder; - if (existingMetadata.isPresent()) { - if (!namespace.equals(existingMetadata.get().getNamespace())) { - throw new IllegalArgumentException("Expected existing ConfigMap namespace [%s], but was [%s]".formatted(namespace, existingMetadata.get().getNamespace())); - } - if (!name.equals(existingMetadata.get().getName())) { - throw new IllegalArgumentException("Expected existing ConfigMap name [%s], but was [%s]".formatted(name, existingMetadata.get().getName())); - } - configMapBuilder = new ConfigMapBuilder().withMetadata(existingMetadata.get()); - } else { + if (existingMetadata == null) { configMapBuilder = new ConfigMapBuilder() .withNewMetadata() .withNamespace(namespace) .withName(name) .endMetadata(); + } else if (namespace.equals(existingMetadata.getNamespace()) && name.equals(existingMetadata.getName())) { + configMapBuilder = new ConfigMapBuilder().withMetadata(existingMetadata); + } else { + throw new IllegalArgumentException("ConfigMap metadata with namespace [%s] and name [%s], did not match expected namespace [%s] and name [%s]" + .formatted(existingMetadata.getNamespace(), existingMetadata.getName(), namespace, name)); } final Map encodedData = getEncodedMap(state); diff --git a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java index 490e493c0f67..440edc1dabc1 100644 --- a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java +++ b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java @@ -16,14 +16,13 @@ */ package org.apache.nifi.kubernetes.state.provider; +import io.fabric8.kubernetes.api.model.ObjectMeta; import org.apache.nifi.components.state.StateMap; import java.util.Collections; import java.util.Map; import java.util.Optional; -import io.fabric8.kubernetes.api.model.ObjectMeta; - /** * Standard implementation of StateMap */