-
Notifications
You must be signed in to change notification settings - Fork 235
feat: provide de-duplicated secondary resources stream on Context #3141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: next
Are you sure you want to change the base?
Changes from all commits
fec20a2
f26ad07
97df301
a67b673
d145562
5a5027a
fd78e57
ade5d7f
fdfaa63
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |
| */ | ||
| package io.javaoperatorsdk.operator.api.reconciler; | ||
|
|
||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
|
|
@@ -24,8 +25,12 @@ | |
| import java.util.stream.Collectors; | ||
| import java.util.stream.Stream; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import io.fabric8.kubernetes.api.model.HasMetadata; | ||
| import io.fabric8.kubernetes.client.KubernetesClient; | ||
| import io.javaoperatorsdk.operator.ReconcilerUtilsInternal; | ||
| import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; | ||
| import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; | ||
| import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedWorkflowAndDependentResourceContext; | ||
|
|
@@ -36,6 +41,7 @@ | |
| import io.javaoperatorsdk.operator.processing.event.ResourceID; | ||
|
|
||
| public class DefaultContext<P extends HasMetadata> implements Context<P> { | ||
| private static final Logger log = LoggerFactory.getLogger(DefaultContext.class); | ||
|
|
||
| private RetryInfo retryInfo; | ||
| private final Controller<P> controller; | ||
|
|
@@ -73,11 +79,56 @@ public <T> Set<T> getSecondaryResources(Class<T> expectedType) { | |
| return getSecondaryResourcesAsStream(expectedType).collect(Collectors.toSet()); | ||
| } | ||
|
|
||
| @Override | ||
| public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) { | ||
| return controller.getEventSourceManager().getEventSourcesFor(expectedType).stream() | ||
| .map(es -> es.getSecondaryResources(primaryResource)) | ||
| .flatMap(Set::stream); | ||
| public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType, boolean deduplicate) { | ||
metacosm marked this conversation as resolved.
Show resolved
Hide resolved
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add comprehensive javadoc |
||
| if (deduplicate && !HasMetadata.class.isAssignableFrom(expectedType)) { | ||
| throw new IllegalArgumentException("Can only de-duplicate HasMetadata descendants"); | ||
| } | ||
|
|
||
| final var idToLatest = deduplicate ? new HashMap<ResourceID, String>() : null; | ||
| final var stream = | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As mentioned above for sake of simplicity/readbaility can if deduplicate is
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could but this implementation avoids creating intermediary Streams, though I agree that it's slightly less readable. Let me see what I can do about that. |
||
| controller.getEventSourceManager().getEventSourcesFor(expectedType).stream() | ||
| .<R>mapMulti( | ||
| (es, consumer) -> | ||
| es.getSecondaryResources(primaryResource) | ||
| .forEach( | ||
| r -> { | ||
| var reject = false; | ||
| if (deduplicate) { | ||
| final boolean[] rejectAr = new boolean[1]; | ||
metacosm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| final var hm = (HasMetadata) r; | ||
| final var resourceVersion = hm.getMetadata().getResourceVersion(); | ||
| idToLatest.merge( | ||
| ResourceID.fromResource(hm), | ||
| resourceVersion, | ||
| (existing, replacement) -> { | ||
| final var comparison = | ||
| ReconcilerUtilsInternal.compareResourceVersions( | ||
| existing, replacement); | ||
| rejectAr[0] = | ||
| comparison == 0; // rejecting resource if version is equal | ||
| return comparison >= 0 ? existing : replacement; | ||
| }); | ||
| reject = rejectAr[0]; | ||
| } | ||
| // only keep resources that don't have the same id and resource | ||
| // version | ||
| if (!reject) { | ||
| consumer.accept(r); | ||
| } | ||
| })); | ||
| if (deduplicate) { | ||
| //noinspection unchecked | ||
| return stream | ||
| .map(HasMetadata.class::cast) | ||
| .filter( | ||
| hm -> { | ||
| final var resourceVersion = hm.getMetadata().getResourceVersion(); | ||
| return resourceVersion.equals(idToLatest.get(ResourceID.fromResource(hm))); | ||
| }) | ||
| .map(hasMetadata -> (R) hasMetadata); | ||
| } else { | ||
| return stream; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,8 +16,17 @@ | |
| package io.javaoperatorsdk.operator.api.reconciler; | ||
|
|
||
| import java.lang.reflect.InvocationTargetException; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.function.Function; | ||
| import java.util.function.Predicate; | ||
| import java.util.function.UnaryOperator; | ||
| import java.util.stream.Collector; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
@@ -32,6 +41,7 @@ | |
| import io.javaoperatorsdk.operator.processing.event.ResourceID; | ||
| import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource; | ||
|
|
||
| import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.compareResourceVersions; | ||
| import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; | ||
| import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; | ||
|
|
||
|
|
@@ -364,13 +374,13 @@ public static <R extends HasMetadata> R resourcePatch( | |
| if (esList.isEmpty()) { | ||
| throw new IllegalStateException("No event source found for type: " + resource.getClass()); | ||
| } | ||
| var es = esList.get(0); | ||
| if (esList.size() > 1) { | ||
| throw new IllegalStateException( | ||
| "Multiple event sources found for: " | ||
| + resource.getClass() | ||
| + " please provide the target event source"); | ||
| log.warn( | ||
| "Multiple event sources found for type: {}, selecting first with name {}", | ||
| resource.getClass(), | ||
| es.name()); | ||
| } | ||
| var es = esList.get(0); | ||
| if (es instanceof ManagedInformerEventSource mes) { | ||
| return resourcePatch(resource, updateOperation, mes); | ||
| } else { | ||
|
|
@@ -595,4 +605,55 @@ public static <P extends HasMetadata> P addFinalizerWithSSA( | |
| e); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Returns a collector that deduplicates Kubernetes objects by keeping only the one with the | ||
| * latest metadata.resourceVersion for each unique name and namespace combination. The intended | ||
| * use case is for the rather rare setup when there are overlapping {@link | ||
| * io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a | ||
| * resource type. | ||
| * | ||
| * @param <T> the type of HasMetadata objects | ||
| * @return a collector that produces a collection of deduplicated Kubernetes objects | ||
| */ | ||
| public static <T extends HasMetadata> Collector<T, ?, Collection<T>> latestDistinct() { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wasn't this moved to ReconcileUtilsInternal
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we were to keep this implementation, this actually would be removed |
||
| return Collectors.collectingAndThen(latestDistinctToMap(), Map::values); | ||
| } | ||
|
|
||
| /** | ||
| * Returns a collector that deduplicates Kubernetes objects by keeping only the one with the | ||
| * latest metadata.resourceVersion for each unique name and namespace combination. The intended | ||
| * use case is for the rather rare setup when there are overlapping {@link | ||
| * io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a | ||
| * resource type. | ||
| * | ||
| * @param <T> the type of HasMetadata objects | ||
| * @return a collector that produces a List of deduplicated Kubernetes objects | ||
| */ | ||
| public static <T extends HasMetadata> Collector<T, ?, List<T>> latestDistinctList() { | ||
| return Collectors.collectingAndThen( | ||
| latestDistinctToMap(), map -> new ArrayList<>(map.values())); | ||
| } | ||
|
|
||
| /** | ||
| * Returns a collector that deduplicates Kubernetes objects by keeping only the one with the | ||
| * latest metadata.resourceVersion for each unique name and namespace combination. The intended | ||
| * use case is for the rather rare setup when there are overlapping {@link | ||
| * io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}s for a | ||
| * resource type. | ||
| * | ||
| * @param <T> the type of HasMetadata objects | ||
| * @return a collector that produces a Set of deduplicated Kubernetes objects | ||
| */ | ||
| public static <T extends HasMetadata> Collector<T, ?, Set<T>> latestDistinctSet() { | ||
| return Collectors.collectingAndThen(latestDistinctToMap(), map -> new HashSet<>(map.values())); | ||
| } | ||
|
|
||
| private static <T extends HasMetadata> Collector<T, ?, Map<ResourceID, T>> latestDistinctToMap() { | ||
| return Collectors.toMap( | ||
| ResourceID::fromResource, | ||
| Function.identity(), | ||
| (existing, replacement) -> | ||
| compareResourceVersions(existing, replacement) >= 0 ? existing : replacement); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we could use the former implementation and call this in case in the new method the deduplication is false.