From ba75652a420b5d4d61ff93d6030c50eff318c91b Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Mon, 8 Dec 2025 19:05:11 +0500 Subject: [PATCH 1/4] IGNITE-27275 Use MessageSerializer for ContinuousRoutineStartResultMessage --- .../communication/GridIoMessageFactory.java | 3 +- .../ContinuousRoutineStartResultMessage.java | 150 +++++------------- .../continuous/GridContinuousProcessor.java | 85 +++------- 3 files changed, 65 insertions(+), 173 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 83d8712dea7a4..919f816f62cc9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer; import org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer; import org.apache.ignite.internal.codegen.ClusterMetricsUpdateMessageSerializer; +import org.apache.ignite.internal.codegen.ContinuousRoutineStartResultMessageSerializer; import org.apache.ignite.internal.codegen.ErrorMessageSerializer; import org.apache.ignite.internal.codegen.ExchangeInfoSerializer; import org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer; @@ -460,7 +461,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider { factory.register((short)132, UserAuthenticateResponseMessage::new, new UserAuthenticateResponseMessageSerializer()); factory.register(ClusterMetricsUpdateMessage.TYPE_CODE, ClusterMetricsUpdateMessage::new, new ClusterMetricsUpdateMessageSerializer()); - factory.register((short)134, ContinuousRoutineStartResultMessage::new); + factory.register((short)134, ContinuousRoutineStartResultMessage::new, new ContinuousRoutineStartResultMessageSerializer()); factory.register((short)135, LatchAckMessage::new, new LatchAckMessageSerializer()); factory.register(CacheMetricsMessage.TYPE_CODE, CacheMetricsMessage::new, new CacheMetricsMessageSerializer()); factory.register(NodeMetricsMessage.TYPE_CODE, NodeMetricsMessage::new, new NodeMetricsMessageSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java index ad0a47c770b8e..aeebdae87349e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java @@ -17,13 +17,12 @@ package org.apache.ignite.internal.processors.continuous; -import java.nio.ByteBuffer; import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.communication.ErrorMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; /** @@ -31,19 +30,16 @@ */ public class ContinuousRoutineStartResultMessage implements Message { /** */ - private static final int ERROR_FLAG = 0x01; - - /** */ + @Order(0) private UUID routineId; /** */ - private byte[] errBytes; + @Order(value = 1, method = "errorMessage") + private @Nullable ErrorMessage errMsg; /** */ - private byte[] cntrsMapBytes; - - /** */ - private int flags; + @Order(value = 2, method = "countersMap") + private CachePartitionPartialCountersMap cntrsMap; /** * @@ -54,128 +50,68 @@ public ContinuousRoutineStartResultMessage() { /** * @param routineId Routine ID. - * @param cntrsMapBytes Marshalled {@link CachePartitionPartialCountersMap}. - * @param errBytes Error bytes. - * @param err {@code True} if failed to start routine. + * @param cntrsMap Counters map. + * @param err Error. */ - ContinuousRoutineStartResultMessage(UUID routineId, byte[] cntrsMapBytes, byte[] errBytes, boolean err) { + ContinuousRoutineStartResultMessage( + UUID routineId, + @Nullable CachePartitionPartialCountersMap cntrsMap, + @Nullable Throwable err + ) { this.routineId = routineId; - this.cntrsMapBytes = cntrsMapBytes; - this.errBytes = errBytes; + this.cntrsMap = cntrsMap; - if (err) - flags |= ERROR_FLAG; + if (err != null) + errMsg = new ErrorMessage(err); } /** - * @return Marshalled {@link CachePartitionPartialCountersMap}. + * @return Counters map. */ - @Nullable byte[] countersMapBytes() { - return cntrsMapBytes; + public @Nullable CachePartitionPartialCountersMap countersMap() { + return cntrsMap; } /** - * @return {@code True} if failed to start routine. + * @param cntrsMap Counters map. */ - boolean error() { - return (flags & ERROR_FLAG) != 0; + public void countersMap(@Nullable CachePartitionPartialCountersMap cntrsMap) { + this.cntrsMap = cntrsMap; } /** * @return Routine ID. */ - UUID routineId() { + public UUID routineId() { return routineId; } /** - * @return Error bytes. + * @param routineId Routine ID. */ - @Nullable byte[] errorBytes() { - return errBytes; + public void routineId(UUID routineId) { + this.routineId = routineId; } - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeByteArray(cntrsMapBytes)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeByteArray(errBytes)) - return false; - - writer.incrementState(); - - case 2: - if (!writer.writeInt(flags)) - return false; - - writer.incrementState(); - - case 3: - if (!writer.writeUuid(routineId)) - return false; - - writer.incrementState(); - - } - - return true; + /** + * @return Error message. + */ + public @Nullable ErrorMessage errorMessage() { + return errMsg; } - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - switch (reader.state()) { - case 0: - cntrsMapBytes = reader.readByteArray(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - errBytes = reader.readByteArray(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 2: - flags = reader.readInt(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 3: - routineId = reader.readUuid(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } + /** + * @param errMsg Error message. + */ + public void errorMessage(@Nullable ErrorMessage errMsg) { + this.errMsg = errMsg; + } - return true; + /** + * @return Error. + */ + public @Nullable Throwable error() { + return ErrorMessage.error(errMsg); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index df98fc299d4c8..2ae5fa57f5c07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -1545,7 +1545,7 @@ private void processStartRequestV2(final AffinityTopologyVersion topVer, IgnitePredicate nodeFilter = null; - byte[] cntrs = null; + CachePartitionPartialCountersMap cntrsMap = null; if (reqData.nodeFilterBytes() != null) { try { @@ -1621,12 +1621,8 @@ private void processStartRequestV2(final AffinityTopologyVersion topVer, if (proc != null) { GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName()); - if (cache != null && cache.context().userCache()) { - CachePartitionPartialCountersMap cntrsMap = - cache.context().topology().localUpdateCounters(false); - - cntrs = U.marshal(marsh, cntrsMap); - } + if (cache != null && cache.context().userCache()) + cntrsMap = cache.context().topology().localUpdateCounters(false); } } } @@ -1639,7 +1635,7 @@ private void processStartRequestV2(final AffinityTopologyVersion topVer, } } - sendMessageStartResult(snd, msg.routineId(), cntrs, err); + sendMessageStartResult(snd, msg.routineId(), cntrsMap, err); } }); } @@ -1647,32 +1643,17 @@ private void processStartRequestV2(final AffinityTopologyVersion topVer, /** * @param node Target node. * @param routineId Routine ID. - * @param cntrsMapBytes Marshalled {@link CachePartitionPartialCountersMap}. + * @param cntrsMap Counters map. * @param err Start error if any. */ private void sendMessageStartResult(final ClusterNode node, final UUID routineId, - byte[] cntrsMapBytes, + CachePartitionPartialCountersMap cntrsMap, @Nullable final Exception err ) { - byte[] errBytes = null; - - if (err != null) { - try { - errBytes = U.marshal(marsh, err); - } - catch (Exception e) { - U.error(log, "Failed to marshal routine start error: " + e, e); - } - } - - ContinuousRoutineStartResultMessage msg = new ContinuousRoutineStartResultMessage(routineId, - cntrsMapBytes, - errBytes, - err != null); - try { - ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL); + ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, new ContinuousRoutineStartResultMessage(routineId, cntrsMap, err), + SYSTEM_POOL); } catch (ClusterTopologyCheckedException e) { if (log.isDebugEnabled()) @@ -2561,7 +2542,7 @@ private class StartFuture extends GridFutureAdapter { resCollect = new DiscoveryMessageResultsCollector(ctx) { @Override protected RoutineRegisterResults createResult(Map> rcvd) { - Map errs = null; + Map errs = null; Map>> cntrsPerNode = null; for (Map.Entry> entry : rcvd.entrySet()) { @@ -2570,48 +2551,22 @@ private class StartFuture extends GridFutureAdapter { if (msg == null) continue; - if (msg.error()) { - byte[] errBytes = msg.errorBytes(); - - Exception err = null; - - if (errBytes != null) { - try { - err = U.unmarshal(marsh, errBytes, U.resolveClassLoader(ctx.config())); - } - catch (Exception e) { - U.warn(log, "Failed to unmarhal continuous routine start error: " + e); - } - } - - if (err == null) { - err = new IgniteCheckedException("Failed to start continuous " + - "routine on node: " + entry.getKey()); - } + Throwable err = msg.error(); + if (err != null) { if (errs == null) errs = new HashMap<>(); errs.put(entry.getKey(), err); } else { - byte[] cntrsMapBytes = msg.countersMapBytes(); - - if (cntrsMapBytes != null) { - try { - CachePartitionPartialCountersMap cntrsMap = U.unmarshal( - marsh, - cntrsMapBytes, - U.resolveClassLoader(ctx.config())); + CachePartitionPartialCountersMap cntrsMap = msg.countersMap(); - if (cntrsPerNode == null) - cntrsPerNode = new HashMap<>(); + if (cntrsMap != null) { + if (cntrsPerNode == null) + cntrsPerNode = new HashMap<>(); - cntrsPerNode.put(entry.getKey(), CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); - } - catch (Exception e) { - U.warn(log, "Failed to unmarhal continuous query update counters: " + e); - } + cntrsPerNode.put(entry.getKey(), CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); } } } @@ -2637,7 +2592,7 @@ private class StartFuture extends GridFutureAdapter { */ private void onAllRemoteRegistered( AffinityTopologyVersion topVer, - @Nullable Map errs, + @Nullable Map errs, Map>> cntrsPerNode, Map> cntrs) { try { @@ -2661,7 +2616,7 @@ private void onAllRemoteRegistered( onRemoteRegistered(); } else { - Exception firstEx = F.first(errs.values()); + Throwable firstEx = F.first(errs.values()); onDone(firstEx); @@ -2729,7 +2684,7 @@ private static class RoutineRegisterResults { private final AffinityTopologyVersion topVer; /** */ - private final Map errs; + private final Map errs; /** */ private final Map>> cntrsPerNode; @@ -2740,7 +2695,7 @@ private static class RoutineRegisterResults { * @param cntrsPerNode Update counters. */ RoutineRegisterResults(AffinityTopologyVersion topVer, - Map errs, + Map errs, Map>> cntrsPerNode) { this.topVer = topVer; this.errs = errs; From 22ed7fd848c56d3a2cd42ee7155ce26aaa3adb0b Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Mon, 2 Feb 2026 14:19:18 +0500 Subject: [PATCH 2/4] send string --- .../ContinuousRoutineStartResultMessage.java | 44 ++++++++++++------- .../continuous/GridContinuousProcessor.java | 25 ++++++++--- 2 files changed, 46 insertions(+), 23 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java index aeebdae87349e..5226eaca739b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java @@ -19,7 +19,6 @@ import java.util.UUID; import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.managers.communication.ErrorMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; @@ -34,11 +33,15 @@ public class ContinuousRoutineStartResultMessage implements Message { private UUID routineId; /** */ - @Order(value = 1, method = "errorMessage") - private @Nullable ErrorMessage errMsg; + @Order(value = 1, method = "errorClass") + private @Nullable String errCls; /** */ - @Order(value = 2, method = "countersMap") + @Order(value = 2, method = "errorMessage") + private @Nullable String errMsg; + + /** */ + @Order(value = 3, method = "countersMap") private CachePartitionPartialCountersMap cntrsMap; /** @@ -56,13 +59,15 @@ public ContinuousRoutineStartResultMessage() { ContinuousRoutineStartResultMessage( UUID routineId, @Nullable CachePartitionPartialCountersMap cntrsMap, - @Nullable Throwable err + @Nullable Exception err ) { this.routineId = routineId; this.cntrsMap = cntrsMap; - if (err != null) - errMsg = new ErrorMessage(err); + if (err != null) { + errCls = err.getClass().getName(); + errMsg = err.getMessage(); + } } /** @@ -94,24 +99,31 @@ public void routineId(UUID routineId) { } /** - * @return Error message. + * @return Error class. */ - public @Nullable ErrorMessage errorMessage() { - return errMsg; + public @Nullable String errorClass() { + return errCls; } /** - * @param errMsg Error message. + * @param errCls Error class. */ - public void errorMessage(@Nullable ErrorMessage errMsg) { - this.errMsg = errMsg; + public void errorClass(@Nullable String errCls) { + this.errCls = errCls; + } + + /** + * @return Error message. + */ + public @Nullable String errorMessage() { + return errMsg; } /** - * @return Error. + * @param errMsg Error message. */ - public @Nullable Throwable error() { - return ErrorMessage.error(errMsg); + public void errorMessage(@Nullable String errMsg) { + this.errMsg = errMsg; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 2ae5fa57f5c07..38ba247c18e33 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -2542,7 +2542,7 @@ private class StartFuture extends GridFutureAdapter { resCollect = new DiscoveryMessageResultsCollector(ctx) { @Override protected RoutineRegisterResults createResult(Map> rcvd) { - Map errs = null; + Map errs = null; Map>> cntrsPerNode = null; for (Map.Entry> entry : rcvd.entrySet()) { @@ -2551,12 +2551,23 @@ private class StartFuture extends GridFutureAdapter { if (msg == null) continue; - Throwable err = msg.error(); + String errCls = msg.errorClass(); - if (err != null) { + if (errCls != null) { if (errs == null) errs = new HashMap<>(); + Exception err; + + try { + err = (Exception)Class.forName(errCls).getConstructor(String.class).newInstance(msg.errorMessage()); + } + catch (Exception e) { + U.error(log, "Failed to instantiate exception class: " + errCls, e); + + err = new IgniteCheckedException(msg.errorMessage()); + } + errs.put(entry.getKey(), err); } else { @@ -2592,7 +2603,7 @@ private class StartFuture extends GridFutureAdapter { */ private void onAllRemoteRegistered( AffinityTopologyVersion topVer, - @Nullable Map errs, + @Nullable Map errs, Map>> cntrsPerNode, Map> cntrs) { try { @@ -2616,7 +2627,7 @@ private void onAllRemoteRegistered( onRemoteRegistered(); } else { - Throwable firstEx = F.first(errs.values()); + Exception firstEx = F.first(errs.values()); onDone(firstEx); @@ -2684,7 +2695,7 @@ private static class RoutineRegisterResults { private final AffinityTopologyVersion topVer; /** */ - private final Map errs; + private final Map errs; /** */ private final Map>> cntrsPerNode; @@ -2695,7 +2706,7 @@ private static class RoutineRegisterResults { * @param cntrsPerNode Update counters. */ RoutineRegisterResults(AffinityTopologyVersion topVer, - Map errs, + Map errs, Map>> cntrsPerNode) { this.topVer = topVer; this.errs = errs; From fea6362c4fda091236df721449d11672957637dd Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Wed, 4 Feb 2026 09:56:34 +0500 Subject: [PATCH 3/4] Revert "send string" This reverts commit 22ed7fd848c56d3a2cd42ee7155ce26aaa3adb0b. --- .../ContinuousRoutineStartResultMessage.java | 44 +++++++------------ .../continuous/GridContinuousProcessor.java | 25 +++-------- 2 files changed, 23 insertions(+), 46 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java index 5226eaca739b1..aeebdae87349e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java @@ -19,6 +19,7 @@ import java.util.UUID; import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.communication.ErrorMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; @@ -33,15 +34,11 @@ public class ContinuousRoutineStartResultMessage implements Message { private UUID routineId; /** */ - @Order(value = 1, method = "errorClass") - private @Nullable String errCls; + @Order(value = 1, method = "errorMessage") + private @Nullable ErrorMessage errMsg; /** */ - @Order(value = 2, method = "errorMessage") - private @Nullable String errMsg; - - /** */ - @Order(value = 3, method = "countersMap") + @Order(value = 2, method = "countersMap") private CachePartitionPartialCountersMap cntrsMap; /** @@ -59,15 +56,13 @@ public ContinuousRoutineStartResultMessage() { ContinuousRoutineStartResultMessage( UUID routineId, @Nullable CachePartitionPartialCountersMap cntrsMap, - @Nullable Exception err + @Nullable Throwable err ) { this.routineId = routineId; this.cntrsMap = cntrsMap; - if (err != null) { - errCls = err.getClass().getName(); - errMsg = err.getMessage(); - } + if (err != null) + errMsg = new ErrorMessage(err); } /** @@ -98,34 +93,27 @@ public void routineId(UUID routineId) { this.routineId = routineId; } - /** - * @return Error class. - */ - public @Nullable String errorClass() { - return errCls; - } - - /** - * @param errCls Error class. - */ - public void errorClass(@Nullable String errCls) { - this.errCls = errCls; - } - /** * @return Error message. */ - public @Nullable String errorMessage() { + public @Nullable ErrorMessage errorMessage() { return errMsg; } /** * @param errMsg Error message. */ - public void errorMessage(@Nullable String errMsg) { + public void errorMessage(@Nullable ErrorMessage errMsg) { this.errMsg = errMsg; } + /** + * @return Error. + */ + public @Nullable Throwable error() { + return ErrorMessage.error(errMsg); + } + /** {@inheritDoc} */ @Override public short directType() { return 134; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 38ba247c18e33..2ae5fa57f5c07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -2542,7 +2542,7 @@ private class StartFuture extends GridFutureAdapter { resCollect = new DiscoveryMessageResultsCollector(ctx) { @Override protected RoutineRegisterResults createResult(Map> rcvd) { - Map errs = null; + Map errs = null; Map>> cntrsPerNode = null; for (Map.Entry> entry : rcvd.entrySet()) { @@ -2551,23 +2551,12 @@ private class StartFuture extends GridFutureAdapter { if (msg == null) continue; - String errCls = msg.errorClass(); + Throwable err = msg.error(); - if (errCls != null) { + if (err != null) { if (errs == null) errs = new HashMap<>(); - Exception err; - - try { - err = (Exception)Class.forName(errCls).getConstructor(String.class).newInstance(msg.errorMessage()); - } - catch (Exception e) { - U.error(log, "Failed to instantiate exception class: " + errCls, e); - - err = new IgniteCheckedException(msg.errorMessage()); - } - errs.put(entry.getKey(), err); } else { @@ -2603,7 +2592,7 @@ private class StartFuture extends GridFutureAdapter { */ private void onAllRemoteRegistered( AffinityTopologyVersion topVer, - @Nullable Map errs, + @Nullable Map errs, Map>> cntrsPerNode, Map> cntrs) { try { @@ -2627,7 +2616,7 @@ private void onAllRemoteRegistered( onRemoteRegistered(); } else { - Exception firstEx = F.first(errs.values()); + Throwable firstEx = F.first(errs.values()); onDone(firstEx); @@ -2695,7 +2684,7 @@ private static class RoutineRegisterResults { private final AffinityTopologyVersion topVer; /** */ - private final Map errs; + private final Map errs; /** */ private final Map>> cntrsPerNode; @@ -2706,7 +2695,7 @@ private static class RoutineRegisterResults { * @param cntrsPerNode Update counters. */ RoutineRegisterResults(AffinityTopologyVersion topVer, - Map errs, + Map errs, Map>> cntrsPerNode) { this.topVer = topVer; this.errs = errs; From 2831959c6f31e3d19cbfa59d8051575bf3dc6672 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Wed, 4 Feb 2026 10:07:34 +0500 Subject: [PATCH 4/4] minor refactoring --- .../continuous/ContinuousRoutineStartResultMessage.java | 7 ------- .../processors/continuous/GridContinuousProcessor.java | 9 +++++++-- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java index aeebdae87349e..bf12b906a3032 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java @@ -107,13 +107,6 @@ public void errorMessage(@Nullable ErrorMessage errMsg) { this.errMsg = errMsg; } - /** - * @return Error. - */ - public @Nullable Throwable error() { - return ErrorMessage.error(errMsg); - } - /** {@inheritDoc} */ @Override public short directType() { return 134; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 2ae5fa57f5c07..874f85d20ae62 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -56,6 +56,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.ErrorMessage; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; @@ -2551,9 +2552,13 @@ private class StartFuture extends GridFutureAdapter { if (msg == null) continue; - Throwable err = msg.error(); + ErrorMessage errMsg = msg.errorMessage(); + + if (errMsg != null) { + Throwable err = errMsg.error() == null + ? new IgniteCheckedException("Failed to start continuous routine on node: " + entry.getKey()) + : errMsg.error(); - if (err != null) { if (errs == null) errs = new HashMap<>();