Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,64 +155,28 @@ public void shutdown() {
@Override
public void setState(final Map<String, String> state, final String componentId) throws IOException {
try {
final ConfigMap configMap = createConfigMapBuilder(state, componentId).build();
Resource<ConfigMap> configMapResource = kubernetesClient.configMaps().resource(configMap);
final String configMapName = configMap.getMetadata().getName();

ConfigMap configMapCreated = null;
final ConfigMap configMap = createConfigMapBuilder(state, componentId, null).build();
ConfigMap configMapResult = null;

// Attempt to create or update, up to 3 times. We expect that we will update more frequently than create
// so we first attempt to update. If we get back a 404, then we create it.
boolean create = false;
// Attempt to create or update, up to 3 times
for (int attempt = 0; attempt < MAX_UPDATE_ATTEMPTS; attempt++) {
try {
if (create) {
configMapCreated = configMapResource.create();
final ConfigMap existingConfigMap = kubernetesClient.configMaps().resource(configMap).get();
if (existingConfigMap == null) {
configMapResult = kubernetesClient.configMaps().resource(configMap).create();
} else {
configMapCreated = configMapResource.update();
existingConfigMap.setData(configMap.getData());
configMapResult = kubernetesClient.configMaps().resource(existingConfigMap).update();
}

break;
} catch (final KubernetesClientException e) {
final int returnCode = e.getCode();
if (returnCode == HttpURLConnection.HTTP_NOT_FOUND) {
// A 404 return code indicates that we need to create the resource instead of update it.
// Now, we will attempt to create the resource instead of update it, so we'll reset the attempt counter.
attempt = 0;
create = true;
continue;
}

if (returnCode == HttpURLConnection.HTTP_CONFLICT) {
logger.debug("Update conflict detected when setting state for Component ID [{}]. Attempt {} of {}.", componentId, attempt + 1, MAX_UPDATE_ATTEMPTS);

if (attempt < MAX_UPDATE_ATTEMPTS - 1) {
final ConfigMap latestConfigMap = kubernetesClient.configMaps()
.inNamespace(namespace)
.withName(configMapName)
.get();

if (latestConfigMap != null) {
final ObjectMeta latestMetadata = latestConfigMap.getMetadata();
final String latestResourceVersion = latestMetadata != null ? latestMetadata.getResourceVersion() : null;

if (latestResourceVersion != null) {
configMap.getMetadata().setResourceVersion(latestResourceVersion);
configMapResource = kubernetesClient.configMaps().resource(configMap);
logger.debug("Retrying state update for Component ID [{}] with resource version [{}]", componentId, latestResourceVersion);
continue;
}
}
}

throw e;
}

if (returnCode >= 500) {
// Server-side error. We should retry, up to some number of attempts.
if (returnCode == HttpURLConnection.HTTP_CONFLICT || returnCode >= HttpURLConnection.HTTP_INTERNAL_ERROR) {
// Conflict or Server-side error. We should retry, up to some number of attempts.
if (attempt == MAX_UPDATE_ATTEMPTS - 1) {
throw e;
}
logger.warn("Failed to update state for Component ID [{}] on attempt {} of {}", componentId, attempt + 1, MAX_UPDATE_ATTEMPTS, e);
} else {
// There's an issue with the request. Throw the Exception.
throw e;
Expand All @@ -227,11 +191,11 @@ public void setState(final Map<String, String> state, final String componentId)
}
}

if (configMapCreated == null) {
if (configMapResult == null) {
throw new IOException("Exhausted maximum number of attempts (%s) to update state for component with ID %s but could not update it".formatted(MAX_UPDATE_ATTEMPTS, componentId));
}

final Optional<String> version = getVersion(configMapCreated);
final Optional<String> version = getVersion(configMapResult);
logger.debug("Set State Component ID [{}] Version [{}]", componentId, version);
} catch (final KubernetesClientException e) {
if (isNotFound(e.getCode())) {
Expand All @@ -257,7 +221,8 @@ public StateMap getState(final String componentId) throws IOException {
final ConfigMap configMap = configMapResource(componentId).get();
final Map<String, String> data = configMap == null ? Collections.emptyMap() : getDecodedMap(configMap.getData());
final Optional<String> version = configMap == null ? Optional.empty() : getVersion(configMap);
return new StandardStateMap(data, version);
final Optional<ObjectMeta> configMapMetadata = configMap == null ? Optional.empty() : Optional.of(configMap.getMetadata());
return new StandardStateMap(data, version, configMapMetadata);
} catch (final RuntimeException e) {
throw new IOException(String.format("Get failed for Component ID [%s]", componentId), e);
}
Expand All @@ -273,12 +238,19 @@ public StateMap getState(final String componentId) throws IOException {
*/
@Override
public boolean replace(final StateMap currentState, final Map<String, String> state, final String componentId) throws IOException {
final ConfigMapBuilder configMapBuilder = createConfigMapBuilder(state, componentId);
final Optional<String> stateVersion = currentState.getStateVersion();
if (stateVersion.isPresent()) {
final String resourceVersion = stateVersion.get();
configMapBuilder.editOrNewMetadata().withResourceVersion(resourceVersion).endMetadata();
if (currentState instanceof StandardStateMap standardStateMap) {
return replace(standardStateMap, state, componentId);
} else {
throw new IllegalStateException("Current state is not an instance of StandardStateMap");
}
}

private boolean replace(final StandardStateMap currentState, final Map<String, String> state, final String componentId) throws IOException {
final Optional<ObjectMeta> existingMetadata = currentState.getConfigMapMetadata();
final ConfigMapBuilder configMapBuilder = createConfigMapBuilder(state, componentId, existingMetadata.orElse(null));

final Optional<String> stateVersion = currentState.getStateVersion();
stateVersion.ifPresent(resourceVersion -> configMapBuilder.editOrNewMetadata().withResourceVersion(resourceVersion).endMetadata());
final ConfigMap configMap = configMapBuilder.build();

try {
Expand Down Expand Up @@ -412,15 +384,25 @@ private Resource<ConfigMap> configMapResource(final String componentId) {
return kubernetesClient.configMaps().inNamespace(namespace).withName(name);
}

private ConfigMapBuilder createConfigMapBuilder(final Map<String, String> state, final String componentId) {
final Map<String, String> encodedData = getEncodedMap(state);
private ConfigMapBuilder createConfigMapBuilder(final Map<String, String> state, final String componentId, final ObjectMeta existingMetadata) {
final String name = getConfigMapName(componentId);
return new ConfigMapBuilder()

final ConfigMapBuilder configMapBuilder;
if (existingMetadata == null) {
configMapBuilder = new ConfigMapBuilder()
.withNewMetadata()
.withNamespace(namespace)
.withName(name)
.endMetadata()
.withData(encodedData);
.endMetadata();
} else if (namespace.equals(existingMetadata.getNamespace()) && name.equals(existingMetadata.getName())) {
configMapBuilder = new ConfigMapBuilder().withMetadata(existingMetadata);
} else {
throw new IllegalArgumentException("ConfigMap metadata with namespace [%s] and name [%s], did not match expected namespace [%s] and name [%s]"
.formatted(existingMetadata.getNamespace(), existingMetadata.getName(), namespace, name));
}

final Map<String, String> encodedData = getEncodedMap(state);
return configMapBuilder.withData(encodedData);
}

private String getConfigMapName(final String componentId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.nifi.kubernetes.state.provider;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import org.apache.nifi.components.state.StateMap;

import java.util.Collections;
Expand All @@ -30,9 +31,12 @@ class StandardStateMap implements StateMap {

private final Optional<String> version;

StandardStateMap(final Map<String, String> data, final Optional<String> version) {
private final Optional<ObjectMeta> configMapMetadata;

StandardStateMap(final Map<String, String> data, final Optional<String> version, final Optional<ObjectMeta> configMapMetadata) {
this.data = Collections.unmodifiableMap(data == null ? Collections.emptyMap() : data);
this.version = version;
this.configMapMetadata = configMapMetadata;
}


Expand Down Expand Up @@ -66,4 +70,8 @@ public String get(final String key) {
public Map<String, String> toMap() {
return data;
}

public Optional<ObjectMeta> getConfigMapMetadata() {
return configMapMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
Expand Down Expand Up @@ -61,7 +60,6 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -182,13 +180,10 @@ void testSetStateGetState() throws IOException {
void testSetStateConflict() {
final KubernetesClient mockClient = mock(KubernetesClient.class);
final MixedOperation<ConfigMap, ConfigMapList, Resource<ConfigMap>> mockConfigMaps = mock(MixedOperation.class);
final NonNamespaceOperation<ConfigMap, ConfigMapList, Resource<ConfigMap>> mockNamespacedConfigMaps = mock(NonNamespaceOperation.class);
final Resource<ConfigMap> mockResource = mock(Resource.class);

when(mockClient.configMaps()).thenReturn(mockConfigMaps);
when(mockConfigMaps.resource(any(ConfigMap.class))).thenReturn(mockResource);
when(mockConfigMaps.inNamespace(DEFAULT_NAMESPACE)).thenReturn(mockNamespacedConfigMaps);
when(mockNamespacedConfigMaps.withName(anyString())).thenReturn(mockResource);

final String conflictMessageTemplate = "Operation cannot be fulfilled on configmaps \"nifi-component-%s\": "
+ "the object has been modified; please apply your changes to the latest version and try again";
Expand Down Expand Up @@ -274,7 +269,7 @@ void testReplaceNotFound() throws IOException {
setContext();
provider.initialize(context);

final StateMap stateMap = new StandardStateMap(Collections.emptyMap(), Optional.empty());
final StateMap stateMap = new StandardStateMap(Collections.emptyMap(), Optional.empty(), Optional.empty());
final boolean replaced = provider.replace(stateMap, Collections.emptyMap(), COMPONENT_ID);

assertTrue(replaced);
Expand Down
Loading