diff --git a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java index 35b5b7cb11c6..9bd5f0e00b12 100644 --- a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java +++ b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java @@ -155,64 +155,28 @@ public void shutdown() { @Override public void setState(final Map state, final String componentId) throws IOException { try { - final ConfigMap configMap = createConfigMapBuilder(state, componentId).build(); - Resource configMapResource = kubernetesClient.configMaps().resource(configMap); - final String configMapName = configMap.getMetadata().getName(); - - ConfigMap configMapCreated = null; + final ConfigMap configMap = createConfigMapBuilder(state, componentId, null).build(); + ConfigMap configMapResult = null; - // Attempt to create or update, up to 3 times. We expect that we will update more frequently than create - // so we first attempt to update. If we get back a 404, then we create it. - boolean create = false; + // Attempt to create or update, up to 3 times for (int attempt = 0; attempt < MAX_UPDATE_ATTEMPTS; attempt++) { try { - if (create) { - configMapCreated = configMapResource.create(); + final ConfigMap existingConfigMap = kubernetesClient.configMaps().resource(configMap).get(); + if (existingConfigMap == null) { + configMapResult = kubernetesClient.configMaps().resource(configMap).create(); } else { - configMapCreated = configMapResource.update(); + existingConfigMap.setData(configMap.getData()); + configMapResult = kubernetesClient.configMaps().resource(existingConfigMap).update(); } - break; } catch (final KubernetesClientException e) { final int returnCode = e.getCode(); - if (returnCode == HttpURLConnection.HTTP_NOT_FOUND) { - // A 404 return code indicates that we need to create the resource instead of update it. - // Now, we will attempt to create the resource instead of update it, so we'll reset the attempt counter. - attempt = 0; - create = true; - continue; - } - - if (returnCode == HttpURLConnection.HTTP_CONFLICT) { - logger.debug("Update conflict detected when setting state for Component ID [{}]. Attempt {} of {}.", componentId, attempt + 1, MAX_UPDATE_ATTEMPTS); - - if (attempt < MAX_UPDATE_ATTEMPTS - 1) { - final ConfigMap latestConfigMap = kubernetesClient.configMaps() - .inNamespace(namespace) - .withName(configMapName) - .get(); - - if (latestConfigMap != null) { - final ObjectMeta latestMetadata = latestConfigMap.getMetadata(); - final String latestResourceVersion = latestMetadata != null ? latestMetadata.getResourceVersion() : null; - - if (latestResourceVersion != null) { - configMap.getMetadata().setResourceVersion(latestResourceVersion); - configMapResource = kubernetesClient.configMaps().resource(configMap); - logger.debug("Retrying state update for Component ID [{}] with resource version [{}]", componentId, latestResourceVersion); - continue; - } - } - } - - throw e; - } - - if (returnCode >= 500) { - // Server-side error. We should retry, up to some number of attempts. + if (returnCode == HttpURLConnection.HTTP_CONFLICT || returnCode >= HttpURLConnection.HTTP_INTERNAL_ERROR) { + // Conflict or Server-side error. We should retry, up to some number of attempts. if (attempt == MAX_UPDATE_ATTEMPTS - 1) { throw e; } + logger.warn("Failed to update state for Component ID [{}] on attempt {} of {}", componentId, attempt + 1, MAX_UPDATE_ATTEMPTS, e); } else { // There's an issue with the request. Throw the Exception. throw e; @@ -227,11 +191,11 @@ public void setState(final Map state, final String componentId) } } - if (configMapCreated == null) { + if (configMapResult == null) { throw new IOException("Exhausted maximum number of attempts (%s) to update state for component with ID %s but could not update it".formatted(MAX_UPDATE_ATTEMPTS, componentId)); } - final Optional version = getVersion(configMapCreated); + final Optional version = getVersion(configMapResult); logger.debug("Set State Component ID [{}] Version [{}]", componentId, version); } catch (final KubernetesClientException e) { if (isNotFound(e.getCode())) { @@ -257,7 +221,8 @@ public StateMap getState(final String componentId) throws IOException { final ConfigMap configMap = configMapResource(componentId).get(); final Map data = configMap == null ? Collections.emptyMap() : getDecodedMap(configMap.getData()); final Optional version = configMap == null ? Optional.empty() : getVersion(configMap); - return new StandardStateMap(data, version); + final Optional configMapMetadata = configMap == null ? Optional.empty() : Optional.of(configMap.getMetadata()); + return new StandardStateMap(data, version, configMapMetadata); } catch (final RuntimeException e) { throw new IOException(String.format("Get failed for Component ID [%s]", componentId), e); } @@ -273,12 +238,19 @@ public StateMap getState(final String componentId) throws IOException { */ @Override public boolean replace(final StateMap currentState, final Map state, final String componentId) throws IOException { - final ConfigMapBuilder configMapBuilder = createConfigMapBuilder(state, componentId); - final Optional stateVersion = currentState.getStateVersion(); - if (stateVersion.isPresent()) { - final String resourceVersion = stateVersion.get(); - configMapBuilder.editOrNewMetadata().withResourceVersion(resourceVersion).endMetadata(); + if (currentState instanceof StandardStateMap standardStateMap) { + return replace(standardStateMap, state, componentId); + } else { + throw new IllegalStateException("Current state is not an instance of StandardStateMap"); } + } + + private boolean replace(final StandardStateMap currentState, final Map state, final String componentId) throws IOException { + final Optional existingMetadata = currentState.getConfigMapMetadata(); + final ConfigMapBuilder configMapBuilder = createConfigMapBuilder(state, componentId, existingMetadata.orElse(null)); + + final Optional stateVersion = currentState.getStateVersion(); + stateVersion.ifPresent(resourceVersion -> configMapBuilder.editOrNewMetadata().withResourceVersion(resourceVersion).endMetadata()); final ConfigMap configMap = configMapBuilder.build(); try { @@ -412,15 +384,25 @@ private Resource configMapResource(final String componentId) { return kubernetesClient.configMaps().inNamespace(namespace).withName(name); } - private ConfigMapBuilder createConfigMapBuilder(final Map state, final String componentId) { - final Map encodedData = getEncodedMap(state); + private ConfigMapBuilder createConfigMapBuilder(final Map state, final String componentId, final ObjectMeta existingMetadata) { final String name = getConfigMapName(componentId); - return new ConfigMapBuilder() + + final ConfigMapBuilder configMapBuilder; + if (existingMetadata == null) { + configMapBuilder = new ConfigMapBuilder() .withNewMetadata() .withNamespace(namespace) .withName(name) - .endMetadata() - .withData(encodedData); + .endMetadata(); + } else if (namespace.equals(existingMetadata.getNamespace()) && name.equals(existingMetadata.getName())) { + configMapBuilder = new ConfigMapBuilder().withMetadata(existingMetadata); + } else { + throw new IllegalArgumentException("ConfigMap metadata with namespace [%s] and name [%s], did not match expected namespace [%s] and name [%s]" + .formatted(existingMetadata.getNamespace(), existingMetadata.getName(), namespace, name)); + } + + final Map encodedData = getEncodedMap(state); + return configMapBuilder.withData(encodedData); } private String getConfigMapName(final String componentId) { diff --git a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java index e81fc81975bd..440edc1dabc1 100644 --- a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java +++ b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/StandardStateMap.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.kubernetes.state.provider; +import io.fabric8.kubernetes.api.model.ObjectMeta; import org.apache.nifi.components.state.StateMap; import java.util.Collections; @@ -30,9 +31,12 @@ class StandardStateMap implements StateMap { private final Optional version; - StandardStateMap(final Map data, final Optional version) { + private final Optional configMapMetadata; + + StandardStateMap(final Map data, final Optional version, final Optional configMapMetadata) { this.data = Collections.unmodifiableMap(data == null ? Collections.emptyMap() : data); this.version = version; + this.configMapMetadata = configMapMetadata; } @@ -66,4 +70,8 @@ public String get(final String key) { public Map toMap() { return data; } + + public Optional getConfigMapMetadata() { + return configMapMetadata; + } } diff --git a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java index 82e9a78b6391..7ef21176820f 100644 --- a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java +++ b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java @@ -23,7 +23,6 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.MixedOperation; -import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; import io.fabric8.kubernetes.client.dsl.Resource; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; @@ -61,7 +60,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -182,13 +180,10 @@ void testSetStateGetState() throws IOException { void testSetStateConflict() { final KubernetesClient mockClient = mock(KubernetesClient.class); final MixedOperation> mockConfigMaps = mock(MixedOperation.class); - final NonNamespaceOperation> mockNamespacedConfigMaps = mock(NonNamespaceOperation.class); final Resource mockResource = mock(Resource.class); when(mockClient.configMaps()).thenReturn(mockConfigMaps); when(mockConfigMaps.resource(any(ConfigMap.class))).thenReturn(mockResource); - when(mockConfigMaps.inNamespace(DEFAULT_NAMESPACE)).thenReturn(mockNamespacedConfigMaps); - when(mockNamespacedConfigMaps.withName(anyString())).thenReturn(mockResource); final String conflictMessageTemplate = "Operation cannot be fulfilled on configmaps \"nifi-component-%s\": " + "the object has been modified; please apply your changes to the latest version and try again"; @@ -274,7 +269,7 @@ void testReplaceNotFound() throws IOException { setContext(); provider.initialize(context); - final StateMap stateMap = new StandardStateMap(Collections.emptyMap(), Optional.empty()); + final StateMap stateMap = new StandardStateMap(Collections.emptyMap(), Optional.empty(), Optional.empty()); final boolean replaced = provider.replace(stateMap, Collections.emptyMap(), COMPONENT_ID); assertTrue(replaced);