From a0892a4c3ad229a03076ef41fbdcf435165c6b59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 4 Feb 2026 21:18:03 +0100 Subject: [PATCH 1/6] improve: adding test for repeated caching and filtering update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/InformerEventSourceTest.java | 57 ++++++++++++++++++- 1 file changed, 54 insertions(+), 3 deletions(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index e2c3de8975..d22200e4e3 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -267,14 +267,65 @@ void filterAddEventBeforeUpdate() { assertNoEventProduced(); } + @Test + void multipleCachingFilteringUpdates() { + withRealTemporaryResourceCache(); + CountDownLatch latch = sendForEventFilteringUpdate(2); + CountDownLatch latch2 = + sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 2), 3); + + informerEventSource.onUpdate( + deploymentWithResourceVersion(1), deploymentWithResourceVersion(2)); + latch.countDown(); + latch2.countDown(); + informerEventSource.onUpdate( + deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); + + assertNoEventProduced(); + } + + @Test + void multipleCachingFilteringUpdates_variation2() { + withRealTemporaryResourceCache(); + + CountDownLatch latch = sendForEventFilteringUpdate(2); + CountDownLatch latch2 = + sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 2), 3); + + informerEventSource.onUpdate( + deploymentWithResourceVersion(1), deploymentWithResourceVersion(2)); + latch.countDown(); + informerEventSource.onUpdate( + deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); + latch2.countDown(); + + assertNoEventProduced(); + } + + @Test + void multipleCachingFilteringUpdates_variation3() { + withRealTemporaryResourceCache(); + + CountDownLatch latch = sendForEventFilteringUpdate(2); + CountDownLatch latch2 = + sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 2), 3); + + latch.countDown(); + informerEventSource.onUpdate( + deploymentWithResourceVersion(1), deploymentWithResourceVersion(2)); + informerEventSource.onUpdate( + deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); + latch2.countDown(); + + assertNoEventProduced(); + } + private void assertNoEventProduced() { await() .pollDelay(Duration.ofMillis(50)) .timeout(Duration.ofMillis(51)) .untilAsserted( - () -> { - verify(informerEventSource, never()).handleEvent(any(), any(), any(), any()); - }); + () -> verify(informerEventSource, never()).handleEvent(any(), any(), any(), any())); } private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) { From 80bdb4a29278b7432d36156e75cc26bd3e40fea4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 4 Feb 2026 21:45:52 +0100 Subject: [PATCH 2/6] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../event/source/informer/EventFilterDetails.java | 4 ++++ .../source/informer/ManagedInformerEventSource.java | 4 +--- .../event/source/informer/TemporaryResourceCache.java | 10 +++++++++- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java index 8b573a986c..dc65370ab0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java @@ -49,4 +49,8 @@ public Optional getLatestEventAfterLastUpdateEvent(String updated } return Optional.empty(); } + + public int getActiveUpdates() { + return activeUpdates; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index dcfe687a2f..301ece4424 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -93,9 +93,7 @@ public void changeNamespaces(Set namespaces) { @SuppressWarnings("unchecked") public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator updateMethod) { ResourceID id = ResourceID.fromResource(resourceToUpdate); - if (log.isDebugEnabled()) { - log.debug("Update and cache: {}", id); - } + log.debug("Update and cache: {}", id); R updatedResource = null; try { temporaryResourceCache.startEventFilteringModify(id); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 6e1d30c323..8c05f3d2f9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -87,8 +87,16 @@ public synchronized Optional doneEventFilterModify( var ed = activeUpdates.get(resourceID); if (ed.decreaseActiveUpdates()) { activeUpdates.remove(resourceID); - return ed.getLatestEventAfterLastUpdateEvent(updatedResourceVersion); + var res = ed.getLatestEventAfterLastUpdateEvent(updatedResourceVersion); + log.debug( + "Zero active updates for resource id: {}; event after update event: {}; updated resource" + + " version: {}", + resourceID, + res.isPresent(), + updatedResourceVersion); + return res; } else { + log.debug("Active updates {} for resource id: {}", ed.getActiveUpdates(), resourceID); return Optional.empty(); } } From bded229cf6d1621a5cdefc50eafdc012e7e09158 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 4 Feb 2026 22:01:47 +0100 Subject: [PATCH 3/6] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../source/informer/EventFilterDetails.java | 26 ++++++++++++++++--- .../informer/TemporaryResourceCache.java | 3 ++- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java index dc65370ab0..28aa3f0dc5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java @@ -16,7 +16,9 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.util.Optional; +import java.util.function.UnaryOperator; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.ReconcilerUtilsInternal; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; @@ -24,6 +26,7 @@ class EventFilterDetails { private int activeUpdates = 0; private ResourceEvent lastEvent; + private String lastOwnUpdatedResourceVersion; public void increaseActiveUpdates() { activeUpdates = activeUpdates + 1; @@ -38,12 +41,29 @@ public void setLastEvent(ResourceEvent event) { lastEvent = event; } - public Optional getLatestEventAfterLastUpdateEvent(String updatedResourceVersion) { + /** + * This is needed for case when multiple parallel updates happening inside the controller to + * prevent race condition and send event from {@link + * ManagedInformerEventSource#eventFilteringUpdateAndCacheResource(HasMetadata, UnaryOperator)} + */ + public void handleLastOwnUpdatedResourceVersion(String resourceVersion) { + if (resourceVersion == null) { + return; + } + if (lastOwnUpdatedResourceVersion == null + || ReconcilerUtilsInternal.compareResourceVersions( + resourceVersion, lastOwnUpdatedResourceVersion) + > 0) { + lastOwnUpdatedResourceVersion = resourceVersion; + } + } + + public Optional getLatestEventAfterLastUpdateEvent() { if (lastEvent != null - && (updatedResourceVersion == null + && (lastOwnUpdatedResourceVersion == null || ReconcilerUtilsInternal.compareResourceVersions( lastEvent.getResource().orElseThrow().getMetadata().getResourceVersion(), - updatedResourceVersion) + lastOwnUpdatedResourceVersion) > 0)) { return Optional.of(lastEvent); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 8c05f3d2f9..99ed064667 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -85,9 +85,10 @@ public synchronized Optional doneEventFilterModify( return Optional.empty(); } var ed = activeUpdates.get(resourceID); + ed.handleLastOwnUpdatedResourceVersion(updatedResourceVersion); if (ed.decreaseActiveUpdates()) { activeUpdates.remove(resourceID); - var res = ed.getLatestEventAfterLastUpdateEvent(updatedResourceVersion); + var res = ed.getLatestEventAfterLastUpdateEvent(); log.debug( "Zero active updates for resource id: {}; event after update event: {}; updated resource" + " version: {}", From 935a8b6a2440e44f68c9bd0f1fa5a9e3f4a6f882 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 4 Feb 2026 22:11:19 +0100 Subject: [PATCH 4/6] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/InformerEventSourceTest.java | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index d22200e4e3..c3a6f8e91e 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -285,7 +285,7 @@ void multipleCachingFilteringUpdates() { } @Test - void multipleCachingFilteringUpdates_variation2() { + void multipleCachingFilteringUpdates_variant2() { withRealTemporaryResourceCache(); CountDownLatch latch = sendForEventFilteringUpdate(2); @@ -303,7 +303,7 @@ void multipleCachingFilteringUpdates_variation2() { } @Test - void multipleCachingFilteringUpdates_variation3() { + void multipleCachingFilteringUpdates_variant3() { withRealTemporaryResourceCache(); CountDownLatch latch = sendForEventFilteringUpdate(2); @@ -320,6 +320,24 @@ void multipleCachingFilteringUpdates_variation3() { assertNoEventProduced(); } + @Test + void multipleCachingFilteringUpdates_variant4() { + withRealTemporaryResourceCache(); + + CountDownLatch latch = sendForEventFilteringUpdate(2); + CountDownLatch latch2 = + sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 2), 3); + + informerEventSource.onUpdate( + deploymentWithResourceVersion(1), deploymentWithResourceVersion(2)); + informerEventSource.onUpdate( + deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); + latch.countDown(); + latch2.countDown(); + + assertNoEventProduced(); + } + private void assertNoEventProduced() { await() .pollDelay(Duration.ofMillis(50)) From f21faadfb15e7201d72ae0a2248ed6bda68a21f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 5 Feb 2026 09:09:48 +0100 Subject: [PATCH 5/6] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../source/informer/EventFilterDetails.java | 32 ++++++++----------- .../informer/TemporaryResourceCache.java | 3 +- 2 files changed, 15 insertions(+), 20 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java index 28aa3f0dc5..b747c69dff 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java @@ -32,7 +32,20 @@ public void increaseActiveUpdates() { activeUpdates = activeUpdates + 1; } - public boolean decreaseActiveUpdates() { + /** + * resourceVersion is needed for case when multiple parallel updates happening inside the + * controller to prevent race condition and send event from {@link + * ManagedInformerEventSource#eventFilteringUpdateAndCacheResource(HasMetadata, UnaryOperator)} + */ + public boolean decreaseActiveUpdates(String updatedResourceVersion) { + if (updatedResourceVersion != null + && (lastOwnUpdatedResourceVersion == null + || ReconcilerUtilsInternal.compareResourceVersions( + updatedResourceVersion, lastOwnUpdatedResourceVersion) + > 0)) { + lastOwnUpdatedResourceVersion = updatedResourceVersion; + } + activeUpdates = activeUpdates - 1; return activeUpdates == 0; } @@ -41,23 +54,6 @@ public void setLastEvent(ResourceEvent event) { lastEvent = event; } - /** - * This is needed for case when multiple parallel updates happening inside the controller to - * prevent race condition and send event from {@link - * ManagedInformerEventSource#eventFilteringUpdateAndCacheResource(HasMetadata, UnaryOperator)} - */ - public void handleLastOwnUpdatedResourceVersion(String resourceVersion) { - if (resourceVersion == null) { - return; - } - if (lastOwnUpdatedResourceVersion == null - || ReconcilerUtilsInternal.compareResourceVersions( - resourceVersion, lastOwnUpdatedResourceVersion) - > 0) { - lastOwnUpdatedResourceVersion = resourceVersion; - } - } - public Optional getLatestEventAfterLastUpdateEvent() { if (lastEvent != null && (lastOwnUpdatedResourceVersion == null diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 99ed064667..1b70d4a0db 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -85,8 +85,7 @@ public synchronized Optional doneEventFilterModify( return Optional.empty(); } var ed = activeUpdates.get(resourceID); - ed.handleLastOwnUpdatedResourceVersion(updatedResourceVersion); - if (ed.decreaseActiveUpdates()) { + if (ed.decreaseActiveUpdates(updatedResourceVersion)) { activeUpdates.remove(resourceID); var res = ed.getLatestEventAfterLastUpdateEvent(); log.debug( From 818bc0dd8f9893472cb9e594ba58233379e19d4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 5 Feb 2026 09:20:10 +0100 Subject: [PATCH 6/6] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/TemporaryResourceCache.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 1b70d4a0db..1dbbf36043 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -85,20 +85,22 @@ public synchronized Optional doneEventFilterModify( return Optional.empty(); } var ed = activeUpdates.get(resourceID); - if (ed.decreaseActiveUpdates(updatedResourceVersion)) { - activeUpdates.remove(resourceID); - var res = ed.getLatestEventAfterLastUpdateEvent(); + if (ed == null || !ed.decreaseActiveUpdates(updatedResourceVersion)) { log.debug( - "Zero active updates for resource id: {}; event after update event: {}; updated resource" - + " version: {}", - resourceID, - res.isPresent(), - updatedResourceVersion); - return res; - } else { - log.debug("Active updates {} for resource id: {}", ed.getActiveUpdates(), resourceID); + "Active updates {} for resource id: {}", + ed != null ? ed.getActiveUpdates() : 0, + resourceID); return Optional.empty(); } + activeUpdates.remove(resourceID); + var res = ed.getLatestEventAfterLastUpdateEvent(); + log.debug( + "Zero active updates for resource id: {}; event after update event: {}; updated resource" + + " version: {}", + resourceID, + res.isPresent(), + updatedResourceVersion); + return res; } public void onDeleteEvent(T resource, boolean unknownState) {