From 829fbe17b8309863f9aff22e4bd85512e423e1d3 Mon Sep 17 00:00:00 2001 From: devxb Date: Sun, 6 Apr 2025 16:41:45 +0900 Subject: [PATCH 1/3] feat: Rollback Consumer Dead Letter --- .../org/rooftop/netx/api/DeadLetterRelay.kt | 37 +++++ .../kotlin/org/rooftop/netx/api/Exceptions.kt | 4 + .../kotlin/org/rooftop/netx/api/SagaEvent.kt | 23 +++ .../netx/engine/AbstractSagaDispatcher.kt | 67 ++------ .../netx/engine/MonoDispatchFunction.kt | 14 +- .../netx/engine/NotPublishDispatchFunction.kt | 15 +- .../engine/OrchestrateDispatchFunction.kt | 13 +- .../org/rooftop/netx/engine/core/Saga.kt | 68 +++++++- .../deadletter/AbstractDeadLetterManager.kt | 35 ++++ .../listen/AbstractOrchestrateListener.kt | 2 +- .../netx/engine/logging/LoggingSupports.kt | 6 +- .../netx/redis/RedisDeadLetterManager.kt | 99 ++++++++++++ .../rooftop/netx/redis/RedisSagaConfigurer.kt | 11 ++ .../netx/redis/RedisStreamSagaDispatcher.kt | 4 +- .../rooftop/netx/spi/DeadLetterListener.kt | 19 +++ .../rooftop/netx/spi/DeadLetterRegistry.kt | 16 ++ .../netx/engine/DeadLetterAnnotationClass.kt | 33 ++++ .../netx/engine/DeadLetterConfigurer.kt | 109 +++++++++++++ .../netx/engine/OrchestratorConfigurer.kt | 59 +++++++ .../rooftop/netx/engine/OrchestratorTest.kt | 62 ++++++- .../netx/redis/RedisDeadLetterManagerTest.kt | 153 ++++++++++++++++++ 21 files changed, 785 insertions(+), 64 deletions(-) create mode 100644 src/main/kotlin/org/rooftop/netx/api/DeadLetterRelay.kt create mode 100644 src/main/kotlin/org/rooftop/netx/engine/deadletter/AbstractDeadLetterManager.kt create mode 100644 src/main/kotlin/org/rooftop/netx/redis/RedisDeadLetterManager.kt create mode 100644 src/main/kotlin/org/rooftop/netx/spi/DeadLetterListener.kt create mode 100644 src/main/kotlin/org/rooftop/netx/spi/DeadLetterRegistry.kt create mode 100644 src/test/kotlin/org/rooftop/netx/engine/DeadLetterAnnotationClass.kt create mode 100644 src/test/kotlin/org/rooftop/netx/engine/DeadLetterConfigurer.kt create mode 100644 src/test/kotlin/org/rooftop/netx/redis/RedisDeadLetterManagerTest.kt diff --git a/src/main/kotlin/org/rooftop/netx/api/DeadLetterRelay.kt b/src/main/kotlin/org/rooftop/netx/api/DeadLetterRelay.kt new file mode 100644 index 0000000..3486699 --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/api/DeadLetterRelay.kt @@ -0,0 +1,37 @@ +package org.rooftop.netx.api + +import reactor.core.publisher.Mono + +/** + * An interface for relay dead letters, with support for recovering those that failed during rollback. + */ +interface DeadLetterRelay { + + /** + * relay dead letter and return SagaEvent + * + * @return SagaEvent | A dead letter that was successfully processed + */ + fun relay(): Mono + + /** + * @see relay + * @return SagaEvent | A dead letter that was successfully processed + */ + fun relaySync(): SagaEvent + + + /** + * relay dead letter and return SagaEvent by specific deadLetterId (not a same SagaEvent.id) + * + * @param deadLetterId + * @return SagaEvent | A dead letter that was successfully processed + */ + fun relay(deadLetterId: String): Mono + + /** + * @see relay + * @return SagaEvent | A dead letter that was successfully processed (not a same SagaEvent.id) + */ + fun relaySync(deadLetterId: String): SagaEvent +} diff --git a/src/main/kotlin/org/rooftop/netx/api/Exceptions.kt b/src/main/kotlin/org/rooftop/netx/api/Exceptions.kt index c29cf7a..2eb9b7b 100644 --- a/src/main/kotlin/org/rooftop/netx/api/Exceptions.kt +++ b/src/main/kotlin/org/rooftop/netx/api/Exceptions.kt @@ -18,5 +18,9 @@ class FailedAckSagaException(message: String) : RuntimeException(message) class ResultTimeoutException(message: String, throwable: Throwable) : RuntimeException(message, throwable) +class DeadLetterTimeoutException(message: String): RuntimeException(message) + +class DeadLetterException(message: String): RuntimeException(message) + @JsonIgnoreProperties(ignoreUnknown = true) class ResultException(message: String) : RuntimeException(message) diff --git a/src/main/kotlin/org/rooftop/netx/api/SagaEvent.kt b/src/main/kotlin/org/rooftop/netx/api/SagaEvent.kt index e07a2d7..0399b87 100644 --- a/src/main/kotlin/org/rooftop/netx/api/SagaEvent.kt +++ b/src/main/kotlin/org/rooftop/netx/api/SagaEvent.kt @@ -84,5 +84,28 @@ sealed class SagaEvent( type ) + /** + * If you use Orchestrator and get SagaEvent that hold orchestrateEvent. + * you can get type class by using this function. + * + * @param type + * @param T + */ + fun decodeOrchestrateEvent(type: Class): T = decodeOrchestrateEvent(type.kotlin) + + /** + * @see decodeOrchestrateEvent + */ + fun decodeOrchestrateEvent(type: KClass): T { + val orchestrateEvent = codec.decode( + event ?: throw NullPointerException("Cannot decode event cause event is null"), + object : TypeReference>() {} + ) + val clientEvent = orchestrateEvent["clientEvent"] + ?: throw NullPointerException("Cannot decode event cause orchestrateEvent.clientEvent is null") + + return codec.decode(clientEvent as String, type) + } + internal abstract fun copy(): SagaEvent } diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaDispatcher.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaDispatcher.kt index b383ccb..1245f9f 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaDispatcher.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaDispatcher.kt @@ -5,6 +5,7 @@ import org.rooftop.netx.api.* import org.rooftop.netx.core.Codec import org.rooftop.netx.engine.core.Saga import org.rooftop.netx.engine.core.SagaState +import org.rooftop.netx.engine.deadletter.AbstractDeadLetterManager import org.rooftop.netx.engine.logging.info import org.rooftop.netx.engine.logging.warningOnError import reactor.core.publisher.Flux @@ -17,8 +18,13 @@ import kotlin.reflect.full.declaredMemberFunctions internal abstract class AbstractSagaDispatcher( private val codec: Codec, private val sagaManager: SagaManager, + private val abstractDeadLetterManager: AbstractDeadLetterManager, ) { + init { + this.also { abstractDeadLetterManager.dispatcher = it } + } + private val functions = mutableMapOf>>() @@ -55,7 +61,8 @@ internal abstract class AbstractSagaDispatcher( messageId: String ): Flux<*> = this.doOnComplete { Mono.just(saga to messageId) - .info("Ack saga \"${saga.id}\"") + .filter { messageId != DEAD_LETTER } + .info("Ack saga \"${saga.id}\" messageId: $messageId") .flatMap { ack(saga, messageId) .warningOnError("Fail to ack saga \"${saga.id}\"") @@ -65,57 +72,7 @@ internal abstract class AbstractSagaDispatcher( } private fun mapSagaEvent(saga: Saga): Mono { - return when (saga.state) { - SagaState.START -> Mono.just( - SagaStartEvent( - id = saga.id, - nodeName = saga.serverId, - group = saga.group, - event = extractEvent(saga), - codec = codec, - ) - ) - - SagaState.COMMIT -> Mono.just( - SagaCommitEvent( - id = saga.id, - nodeName = saga.serverId, - group = saga.group, - event = extractEvent(saga), - codec = codec - ) - ) - - SagaState.JOIN -> Mono.just( - SagaJoinEvent( - id = saga.id, - nodeName = saga.serverId, - group = saga.group, - event = extractEvent(saga), - codec = codec, - ) - ) - - SagaState.ROLLBACK -> - Mono.just( - SagaRollbackEvent( - id = saga.id, - nodeName = saga.serverId, - group = saga.group, - event = extractEvent(saga), - cause = saga.cause - ?: throw NullPointerException("Null value on SagaRollbackEvent's cause field"), - codec = codec, - ) - ) - } - } - - private fun extractEvent(saga: Saga): String? { - return when (saga.event != null) { - true -> saga.event - false -> null - } + return Mono.just(saga.toEvent(codec)) } internal fun addOrchestrate(handler: Any) { @@ -145,6 +102,7 @@ internal abstract class AbstractSagaDispatcher( noRollbackFor, nextState, sagaManager, + abstractDeadLetterManager, ) ) }.onFailure { @@ -190,6 +148,7 @@ internal abstract class AbstractSagaDispatcher( noRollbackFor, nextState, sagaManager, + abstractDeadLetterManager, ) ) }.onFailure { @@ -225,6 +184,7 @@ internal abstract class AbstractSagaDispatcher( noRollbackFor, nextState, sagaManager, + abstractDeadLetterManager, ) ) }.onFailure { @@ -288,7 +248,8 @@ internal abstract class AbstractSagaDispatcher( messageId: String ): Mono> - private companion object { + internal companion object { + internal const val DEAD_LETTER = "dead letter" private const val DISPATCHED = "dispatched" private val notMatchedSagaHandlerException = diff --git a/src/main/kotlin/org/rooftop/netx/engine/MonoDispatchFunction.kt b/src/main/kotlin/org/rooftop/netx/engine/MonoDispatchFunction.kt index 6444c7b..8ad7e07 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/MonoDispatchFunction.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/MonoDispatchFunction.kt @@ -2,6 +2,8 @@ package org.rooftop.netx.engine import org.rooftop.netx.api.SagaEvent import org.rooftop.netx.api.SagaManager +import org.rooftop.netx.api.SagaRollbackEvent +import org.rooftop.netx.engine.deadletter.AbstractDeadLetterManager import org.rooftop.netx.engine.logging.info import reactor.core.publisher.Mono import kotlin.reflect.KClass @@ -20,6 +22,7 @@ internal class MonoDispatchFunction( noRetryFor: Array>, nextState: NextSagaState, sagaManager: SagaManager, + private val abstractDeadLetterManager: AbstractDeadLetterManager, ) : AbstractDispatchFunction>( eventType, function, @@ -33,7 +36,9 @@ internal class MonoDispatchFunction( return Mono.just(sagaEvent) .filter { isProcessable(sagaEvent) } .map { sagaEvent.copy() } - .flatMap { function.call(handler, sagaEvent) } + .flatMap { + function.call(handler, sagaEvent) + } .info("Call Mono SagaHandler \"${name()}\" with id \"${sagaEvent.id}\"") .switchIfEmpty(`continue`) .doOnNext { @@ -41,6 +46,13 @@ internal class MonoDispatchFunction( publishNextSaga(sagaEvent) } } + .onErrorResume { + if (sagaEvent is SagaRollbackEvent) { + abstractDeadLetterManager.addDeadLetter(sagaEvent) + } else { + Mono.error(it) + } + } .onErrorResume { if (isNoRollbackFor(it)) { return@onErrorResume noRollbackFor diff --git a/src/main/kotlin/org/rooftop/netx/engine/NotPublishDispatchFunction.kt b/src/main/kotlin/org/rooftop/netx/engine/NotPublishDispatchFunction.kt index 6a4163b..1bb19e1 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/NotPublishDispatchFunction.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/NotPublishDispatchFunction.kt @@ -2,8 +2,11 @@ package org.rooftop.netx.engine import org.rooftop.netx.api.SagaEvent import org.rooftop.netx.api.SagaManager +import org.rooftop.netx.api.SagaRollbackEvent +import org.rooftop.netx.engine.deadletter.AbstractDeadLetterManager import org.rooftop.netx.engine.logging.info import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers import kotlin.reflect.KClass import kotlin.reflect.KFunction @@ -18,6 +21,7 @@ internal class NotPublishDispatchFunction( noRollbackFor: Array>, nextState: NextSagaState, sagaManager: SagaManager, + private val abstractDeadLetterManager: AbstractDeadLetterManager, ) : AbstractDispatchFunction( eventType, function, @@ -30,11 +34,20 @@ internal class NotPublishDispatchFunction( override fun call(sagaEvent: SagaEvent): Any { if (isProcessable(sagaEvent)) { return runCatching { - function.call(handler, sagaEvent) info("Call NotPublisher SagaHandler \"${name()}\" with id \"${sagaEvent.id}\"") + val result = function.call(handler, sagaEvent) + info("Call NotPublisher SagaHandler success \\\"${name()}\\\" with id \\\"${sagaEvent.id}\\\"\"") + result }.fold( onSuccess = { publishNextSaga(sagaEvent) }, onFailure = { + if (sagaEvent is SagaRollbackEvent) { + abstractDeadLetterManager.addDeadLetter(sagaEvent) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe() + return@fold SKIP + } + if (isNoRollbackFor(it)) { return@fold NO_ROLLBACK_FOR } diff --git a/src/main/kotlin/org/rooftop/netx/engine/OrchestrateDispatchFunction.kt b/src/main/kotlin/org/rooftop/netx/engine/OrchestrateDispatchFunction.kt index 77ae940..3e607c1 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/OrchestrateDispatchFunction.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/OrchestrateDispatchFunction.kt @@ -2,6 +2,8 @@ package org.rooftop.netx.engine import org.rooftop.netx.api.SagaEvent import org.rooftop.netx.api.SagaManager +import org.rooftop.netx.api.SagaRollbackEvent +import org.rooftop.netx.engine.deadletter.AbstractDeadLetterManager import org.rooftop.netx.engine.logging.info import reactor.core.publisher.Mono import kotlin.reflect.KClass @@ -20,6 +22,7 @@ internal class OrchestrateDispatchFunction( noRetryFor: Array>, nextState: NextSagaState, sagaManager: SagaManager, + private val abstractDeadLetterManager: AbstractDeadLetterManager, ) : AbstractDispatchFunction>( eventType, function, @@ -33,12 +36,20 @@ internal class OrchestrateDispatchFunction( return Mono.just(sagaEvent) .filter { isProcessable(sagaEvent) } .map { sagaEvent.copy() } - .flatMap { function.call(handler, sagaEvent) } .info("Call OrchestrateHandler \"${name()}\" with id \"${sagaEvent.id}\"") + .flatMap { function.call(handler, sagaEvent) } + .info("Call OrchestratorHandler success \"${name()}\" with id \"${sagaEvent.id}\"") .map { publishNextSaga(sagaEvent) it } + .onErrorResume { + if (sagaEvent is SagaRollbackEvent) { + abstractDeadLetterManager.addDeadLetter(sagaEvent.copy()) + } else { + Mono.error(it) + } + } .switchIfEmpty(`continue`) .onErrorResume { if (isNoRollbackFor(it)) { diff --git a/src/main/kotlin/org/rooftop/netx/engine/core/Saga.kt b/src/main/kotlin/org/rooftop/netx/engine/core/Saga.kt index 84c4c18..9f509a1 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/core/Saga.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/core/Saga.kt @@ -1,5 +1,8 @@ package org.rooftop.netx.engine.core +import org.rooftop.netx.api.* +import org.rooftop.netx.core.Codec + internal data class Saga( val id: String, val serverId: String, @@ -7,4 +10,67 @@ internal data class Saga( val state: SagaState, val cause: String? = null, val event: String? = null, -) +) { + + fun toEvent(codec: Codec): SagaEvent { + return when (state) { + SagaState.START -> SagaStartEvent( + id = id, + nodeName = serverId, + group = group, + event = extractEvent(), + codec = codec, + ) + + SagaState.COMMIT -> SagaCommitEvent( + id = id, + nodeName = serverId, + group = group, + event = extractEvent(), + codec = codec + ) + + SagaState.JOIN -> SagaJoinEvent( + id = id, + nodeName = serverId, + group = group, + event = extractEvent(), + codec = codec, + ) + + SagaState.ROLLBACK -> SagaRollbackEvent( + id = id, + nodeName = serverId, + group = group, + event = extractEvent(), + cause = cause + ?: throw NullPointerException("Null value on SagaRollbackEvent's cause field"), + codec = codec, + ) + } + } + + private fun extractEvent(): String? { + return when (event != null) { + true -> event + false -> null + } + } + + companion object { + internal fun of(sagaState: SagaState, sagaEvent: SagaEvent): Saga { + return Saga( + id = sagaEvent.id, + serverId = sagaEvent.nodeName, + group = sagaEvent.group, + state = sagaState, + cause = if (sagaState == SagaState.ROLLBACK) { + (sagaEvent as SagaRollbackEvent).cause + } else { + null + }, + event = sagaEvent.event + ) + } + } +} diff --git a/src/main/kotlin/org/rooftop/netx/engine/deadletter/AbstractDeadLetterManager.kt b/src/main/kotlin/org/rooftop/netx/engine/deadletter/AbstractDeadLetterManager.kt new file mode 100644 index 0000000..4c427cc --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/engine/deadletter/AbstractDeadLetterManager.kt @@ -0,0 +1,35 @@ +package org.rooftop.netx.engine.deadletter + +import org.rooftop.netx.api.DeadLetterRelay +import org.rooftop.netx.api.SagaEvent +import org.rooftop.netx.engine.AbstractSagaDispatcher +import org.rooftop.netx.engine.logging.error +import org.rooftop.netx.spi.DeadLetterListener +import org.rooftop.netx.spi.DeadLetterRegistry +import reactor.core.publisher.Mono + +internal abstract class AbstractDeadLetterManager : DeadLetterRelay, DeadLetterRegistry { + + private val deadLetterListeners: MutableList = mutableListOf() + + internal lateinit var dispatcher: AbstractSagaDispatcher + + override fun addListener(deadLetterListener: DeadLetterListener) { + deadLetterListeners.add(deadLetterListener) + } + + fun addDeadLetter(sagaEvent: SagaEvent): Mono { + return add(sagaEvent) + .doOnNext { + deadLetterListeners.forEach { deadLetterListener -> + runCatching { + deadLetterListener.listen(it, sagaEvent) + }.onFailure { + error("Fail to call deadLetterListener.listen", it) + } + } + } + } + + abstract fun add(sagaEvent: SagaEvent): Mono +} diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt index 7bbe2e4..860dd37 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt @@ -153,7 +153,7 @@ internal abstract class AbstractOrchestrateListener internal c OrchestrateEvent( orchestrateEvent.orchestratorId, beforeRollbackOrchestrateSequence, - "", + orchestrateEvent.clientEvent, orchestrateEvent.context, ) holdFailResult(id, throwable) diff --git a/src/main/kotlin/org/rooftop/netx/engine/logging/LoggingSupports.kt b/src/main/kotlin/org/rooftop/netx/engine/logging/LoggingSupports.kt index 6ef9550..4c202af 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/logging/LoggingSupports.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/logging/LoggingSupports.kt @@ -14,12 +14,16 @@ internal fun infoOnError(message: String, throwable: Throwable) = logger.info(me internal fun warningOnError(message: String) = logger.warn(message) -internal fun error(message: String) = logger.error(message) +internal fun error(message: String, throwable: Throwable? = null) = logger.error(message, throwable) internal fun Mono.info(message: String): Mono = this.doOnNext { logger.info(message) } +internal fun Mono.info(message: (T) -> String): Mono = this.doOnNext { + logger.info(message.invoke(it)) +} + internal fun Mono.infoOnError(message: String): Mono = this.doOnError { logger.info(message, it) } diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisDeadLetterManager.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisDeadLetterManager.kt new file mode 100644 index 0000000..2804028 --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/redis/RedisDeadLetterManager.kt @@ -0,0 +1,99 @@ +package org.rooftop.netx.redis + +import org.rooftop.netx.api.DeadLetterException +import org.rooftop.netx.api.DeadLetterTimeoutException +import org.rooftop.netx.api.SagaEvent +import org.rooftop.netx.core.Codec +import org.rooftop.netx.engine.AbstractSagaDispatcher.Companion.DEAD_LETTER +import org.rooftop.netx.engine.core.Saga +import org.rooftop.netx.engine.core.SagaState +import org.rooftop.netx.engine.deadletter.AbstractDeadLetterManager +import org.rooftop.netx.engine.logging.error +import org.rooftop.netx.engine.logging.info +import org.springframework.data.domain.Range +import org.springframework.data.redis.connection.Limit +import org.springframework.data.redis.connection.stream.MapRecord +import org.springframework.data.redis.connection.stream.Record +import org.springframework.data.redis.core.ReactiveRedisTemplate +import reactor.core.publisher.Mono + +internal class RedisDeadLetterManager( + private val codec: Codec, + private val reactiveRedisTemplate: ReactiveRedisTemplate, +) : AbstractDeadLetterManager() { + + override fun relaySync(): SagaEvent { + return relay().block() ?: throw DeadLetterTimeoutException("Cannot get dead letter") + } + + override fun relay(): Mono { + return reactiveRedisTemplate.opsForStream() + .reverseRange(DEAD_LETTER_KEY, Range.unbounded(), Limit.limit().count(1)) + .single() + .doOnNext { + info("Success to read dead letter $it") + } + .dispatch() + } + + + override fun relay(deadLetterId: String): Mono { + return reactiveRedisTemplate.opsForStream() + .range(DEAD_LETTER_KEY, Range.just(deadLetterId)) + .single() + .doOnNext { + info("Success to read dead letter $it") + } + .dispatch() + } + + override fun relaySync(deadLetterId: String): SagaEvent { + return relay(deadLetterId).block() + ?: throw DeadLetterTimeoutException("Cannot get dead letter by deadLetterId: \"$deadLetterId\"") + } + + private fun Mono>.dispatch(): Mono { + return this.map { + it to codec.decode( + it.value[DATA] + ?: throw DeadLetterException("Cannot find any data from record $it"), + Saga::class, + ) + }.flatMap { + val record = it.first + val saga = it.second + dispatcher.dispatch(saga, DEAD_LETTER) + .map { record to saga.toEvent(codec) } + .info { "Success to dispatch dead letter. saga: \"$saga\"" } + }.flatMap { + val record = it.first + val saga = it.second + reactiveRedisTemplate.opsForStream() + .delete(DEAD_LETTER_KEY, record.id) + .map { saga } + .info { + "Success to delete dead letter. record: \"$record\"" + } + }.doOnError { + error("Fail to relay dead letter", it) + } + } + + override fun add(sagaEvent: SagaEvent): Mono { + return reactiveRedisTemplate.opsForStream() + .add( + Record.of( + mapOf(DATA to codec.encode(Saga.of(SagaState.ROLLBACK, sagaEvent))) + ).withStreamKey(DEAD_LETTER_KEY) + ) + .info("Success to add dead letter to \"$DEAD_LETTER_KEY\". event: \"$sagaEvent\"") + .doOnError { + error("Fail to add dead letter to \"$DEAD_LETTER_KEY\". event: \"$sagaEvent\"") + }.map { it.value } + } + + companion object { + private const val DATA = "data" + private const val DEAD_LETTER_KEY = "NETX_DEAD_LETTER" + } +} diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisSagaConfigurer.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisSagaConfigurer.kt index 02c4544..3072990 100644 --- a/src/main/kotlin/org/rooftop/netx/redis/RedisSagaConfigurer.kt +++ b/src/main/kotlin/org/rooftop/netx/redis/RedisSagaConfigurer.kt @@ -169,6 +169,7 @@ class RedisSagaConfigurer( nodeGroup = nodeGroup, codec = jsonCodec(), sagaManager = redisStreamSagaManager(), + abstractDeadLetterManager = redisDeadLetterManager(), ).also { info("RedisStreamSagaDispatcher connect to host : \"$host\" port : \"$port\" nodeName : \"$nodeName\" nodeGroup : \"$nodeGroup\"") } @@ -217,4 +218,14 @@ class RedisSagaConfigurer( return LettuceConnectionFactory(redisStandaloneConfiguration) } + + @Bean + @Primary + @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis") + internal fun redisDeadLetterManager(): RedisDeadLetterManager { + return RedisDeadLetterManager( + codec = jsonCodec(), + reactiveRedisTemplate = sagaReactiveRedisTemplate(), + ) + } } diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamSagaDispatcher.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamSagaDispatcher.kt index 3799008..3880e37 100644 --- a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamSagaDispatcher.kt +++ b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamSagaDispatcher.kt @@ -5,6 +5,7 @@ import org.rooftop.netx.api.FailedAckSagaException import org.rooftop.netx.api.SagaManager import org.rooftop.netx.engine.AbstractSagaDispatcher import org.rooftop.netx.engine.core.Saga +import org.rooftop.netx.engine.deadletter.AbstractDeadLetterManager import org.rooftop.netx.meta.SagaHandler import org.springframework.context.ApplicationContext import org.springframework.data.redis.core.ReactiveRedisTemplate @@ -16,7 +17,8 @@ internal class RedisStreamSagaDispatcher( private val applicationContext: ApplicationContext, private val reactiveRedisTemplate: ReactiveRedisTemplate, private val nodeGroup: String, -) : AbstractSagaDispatcher(codec, sagaManager) { + abstractDeadLetterManager: AbstractDeadLetterManager, +) : AbstractSagaDispatcher(codec, sagaManager, abstractDeadLetterManager) { override fun findHandlers(): List { return applicationContext.getBeansWithAnnotation(SagaHandler::class.java) diff --git a/src/main/kotlin/org/rooftop/netx/spi/DeadLetterListener.kt b/src/main/kotlin/org/rooftop/netx/spi/DeadLetterListener.kt new file mode 100644 index 0000000..43b6ce0 --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/spi/DeadLetterListener.kt @@ -0,0 +1,19 @@ +package org.rooftop.netx.spi + +import org.rooftop.netx.api.SagaEvent + +/** + * By implementing this class, you can listen for dead letter occurrence events. + * + * @see DeadLetterRegistry.addListener + */ +fun interface DeadLetterListener { + + /** + * handle dead letter add event + * + * @param deadLetterId | DeadLetter Id + * @param sagaEvent | DeadLetter + */ + fun listen(deadLetterId: String, sagaEvent: SagaEvent) +} diff --git a/src/main/kotlin/org/rooftop/netx/spi/DeadLetterRegistry.kt b/src/main/kotlin/org/rooftop/netx/spi/DeadLetterRegistry.kt new file mode 100644 index 0000000..8e2a256 --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/spi/DeadLetterRegistry.kt @@ -0,0 +1,16 @@ +package org.rooftop.netx.spi + +/** + * An interface for registry dead letter listener. + * + * @see DeadLetterListener + */ +fun interface DeadLetterRegistry { + + /** + * Adding a hook to be executed when a dead letter occurs. + * + * @see DeadLetterListener + */ + fun addListener(deadLetterListener: DeadLetterListener) +} diff --git a/src/test/kotlin/org/rooftop/netx/engine/DeadLetterAnnotationClass.kt b/src/test/kotlin/org/rooftop/netx/engine/DeadLetterAnnotationClass.kt new file mode 100644 index 0000000..73db2c5 --- /dev/null +++ b/src/test/kotlin/org/rooftop/netx/engine/DeadLetterAnnotationClass.kt @@ -0,0 +1,33 @@ +package org.rooftop.netx.engine + +import org.rooftop.netx.api.SagaRollbackEvent +import org.rooftop.netx.api.SagaRollbackListener +import org.rooftop.netx.api.SagaStartEvent +import org.rooftop.netx.api.SagaStartListener +import org.rooftop.netx.meta.SagaHandler + +@SagaHandler +class DeadLetterAnnotationClass { + + lateinit var errorInjector: ErrorInjector + lateinit var relayResultHolder: RelayResultHolder + + + @SagaStartListener(event = RelayEvent::class) + fun sagaStartListener(sagaStartEvent: SagaStartEvent) { + val relayEvent = sagaStartEvent.decodeEvent(RelayEvent::class) + sagaStartEvent.setNextEvent(relayEvent) + + throw IllegalArgumentException("Rollback on start") + } + + @SagaRollbackListener(event = RelayEvent::class) + fun sagaRollbackListener(sagaRollbackEvent: SagaRollbackEvent) { + val relayEvent = sagaRollbackEvent.decodeEvent(RelayEvent::class) + if (errorInjector.doError) { + throw IllegalStateException("Rollback on start") + } + + relayResultHolder.hold("DeadLetterAnnotationClass", relayEvent) + } +} diff --git a/src/test/kotlin/org/rooftop/netx/engine/DeadLetterConfigurer.kt b/src/test/kotlin/org/rooftop/netx/engine/DeadLetterConfigurer.kt new file mode 100644 index 0000000..ef9a4d6 --- /dev/null +++ b/src/test/kotlin/org/rooftop/netx/engine/DeadLetterConfigurer.kt @@ -0,0 +1,109 @@ +package org.rooftop.netx.engine + +import org.rooftop.netx.api.Orchestrator +import org.springframework.boot.test.context.TestConfiguration +import org.springframework.context.annotation.Bean + +class ErrorInjector { + var doError: Boolean = false +} + +class RelayResultHolder { + + private val holder: MutableMap> = mutableMapOf() + + operator fun get(key: String) = holder[key] + + fun hold(key: String, value: RelayEvent) { + holder.putIfAbsent(key, mutableListOf()) + holder[key]?.add(value) + } + + fun clear() { + holder.clear() + } +} + +data class RelayEvent( + val id: Long, + val name: String, + val list: List, + val depth: Depth, +) { + + data class Depth( + val success: Boolean, + ) +} + +@TestConfiguration +internal class DeadLetterConfigurer( + private val orchestratorFactory: OrchestratorFactory, + private val deadLetterAnnotationClass: DeadLetterAnnotationClass, +) { + + @Bean + fun errorInjector() = ErrorInjector().apply { + deadLetterAnnotationClass.errorInjector = this + } + + @Bean + fun relayResultHolder() = RelayResultHolder().apply { + deadLetterAnnotationClass.relayResultHolder = this + } + + @Bean(name = ["relay1Depths"]) + fun relay1Depths( + relayResultHolder: RelayResultHolder, + errorInjector: ErrorInjector, + ): Orchestrator { + return orchestratorFactory.create("relay1Depths") + .start( + orchestrate = { + it + }, + rollback = { + if (errorInjector.doError) { + throw IllegalStateException("Error on rollback") + } + relayResultHolder.hold("relay1Depths", it) + } + ) + .commit { + throw IllegalStateException("") + } + } + + @Bean(name = ["relay2Depths"]) + fun relay2Depths( + relayResultHolder: RelayResultHolder, + errorInjector: ErrorInjector, + ): Orchestrator { + return orchestratorFactory.create("relay2Depths") + .start( + orchestrate = { + it + }, + rollback = { + if (errorInjector.doError) { + throw IllegalStateException("Error on rollback") + } + relayResultHolder.hold("relay2Depths", it) + } + ) + .joinWithContext( + contextOrchestrate = { _, request -> + request + }, + contextRollback = { _, request -> + if (errorInjector.doError) { + throw IllegalStateException("Error on rollback") + } + relayResultHolder.hold("relay2Depths", request) + } + ) + .commit { + throw IllegalStateException("") + } + } +} diff --git a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt index 2d8ed38..f4d6ca2 100644 --- a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt +++ b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt @@ -329,6 +329,65 @@ internal class OrchestratorConfigurer( .commitWithContext({ _, _ -> "" }) } + @Bean(name = ["whenErrorOccurredRollbackThenAddDeadLetterOrchestrator"]) + fun whenErrorOccurredRollbackThenAddDeadLetterOrchestrator(): Orchestrator { + return OrchestratorFactory.instance() + .create("whenErrorOccurredRollbackThenAddDeadLetterOrchestrator") + .start( + orchestrate = { "" }, + rollback = { + throw IllegalStateException("Add dead letter") + } + ) + .commit( + orchestrate = { + throw IllegalStateException("Throw error") + } + ) + } + + @Bean(name = ["whenErrorOccurredRollbackThenAddDeadLetterContextOrchestrator"]) + fun whenErrorOccurredRollbackThenAddDeadLetterContextOrchestrator(): Orchestrator { + return OrchestratorFactory.instance() + .create("whenErrorOccurredRollbackThenAddDeadLetterContextOrchestrator") + .startWithContext( + contextOrchestrate = { _, _ -> "" }, + contextRollback = { _, _ -> + throw IllegalStateException("Add dead letter") + } + ) + .joinWithContext( + contextOrchestrate = { _, _ -> "" }, + contextRollback = { _, _ -> + throw IllegalStateException("Add dead letter") + } + ) + .commitWithContext { _, _ -> + throw IllegalStateException("Throw error") + } + } + + @Bean(name = ["whenErrorOccurredRollbackThenAddDeadLetterReactiveOrchestrator"]) + fun whenErrorOccurredRollbackThenAddDeadLetterReactiveOrchestrator(): Orchestrator { + return OrchestratorFactory.instance() + .create("whenErrorOccurredRollbackThenAddDeadLetterContextOrchestrator") + .startReactive( + orchestrate = { Mono.just("") }, + rollback = { + throw IllegalStateException("Add dead letter") + } + ) + .joinReactive( + orchestrate = { Mono.just("") }, + rollback = { + throw IllegalStateException("Add dead letter") + } + ) + .commitReactive { + throw IllegalStateException("Throw error") + } + } + fun interface ListOrchestrate : ContextOrchestrate, List> { diff --git a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt index a0faa90..b9ae596 100644 --- a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt +++ b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt @@ -8,15 +8,14 @@ import io.kotest.core.annotation.DisplayName import io.kotest.core.spec.style.DescribeSpec import io.kotest.matchers.equality.shouldBeEqualToComparingFields import io.kotest.matchers.equals.shouldBeEqual -import org.rooftop.netx.api.Orchestrator -import org.rooftop.netx.api.ResultException -import org.rooftop.netx.api.TypeReference +import io.kotest.matchers.shouldNotBe +import org.rooftop.netx.api.* import org.rooftop.netx.meta.EnableSaga import org.rooftop.netx.redis.RedisContainer +import org.rooftop.netx.spi.DeadLetterRegistry import org.springframework.beans.factory.annotation.Qualifier import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource -import org.springframework.web.client.HttpClientErrorException import java.time.Instant import kotlin.time.Duration.Companion.seconds @@ -34,6 +33,7 @@ internal class OrchestratorTest( private val homeOrchestrator: Orchestrator, private val instantOrchestrator: Orchestrator, private val manyTypeOrchestrator: Orchestrator, + private val deadLetterRegistry: DeadLetterRegistry, @Qualifier("rollbackOrchestrator") private val rollbackOrchestrator: Orchestrator, @Qualifier("upChainRollbackOrchestrator") private val upChainRollbackOrchestrator: Orchestrator, @Qualifier("monoRollbackOrchestrator") private val monoRollbackOrchestrator: Orchestrator, @@ -49,6 +49,9 @@ internal class OrchestratorTest( @Qualifier("throwOnCommitWithContextOrchestrator") private val throwOnCommitWithContextOrchestrator: Orchestrator, List>, @Qualifier("throwJwtExceptionOnStartOrchestrator") private val throwJwtExceptionOnStartOrchestrator: Orchestrator, @Qualifier("throwHttpClientErrorExceptionOnStartOrchestrator") private val throwHttpClientErrorExceptionOnStartOrchestrator: Orchestrator, + @Qualifier("whenErrorOccurredRollbackThenAddDeadLetterOrchestrator") private val whenErrorOccurredRollbackThenAddDeadLetterOrchestrator: Orchestrator, + @Qualifier("whenErrorOccurredRollbackThenAddDeadLetterContextOrchestrator") private val whenErrorOccurredRollbackThenAddDeadLetterContextOrchestrator: Orchestrator, + @Qualifier("whenErrorOccurredRollbackThenAddDeadLetterReactiveOrchestrator") private val whenErrorOccurredRollbackThenAddDeadLetterReactiveOrchestrator: Orchestrator, ) : DescribeSpec({ describe("numberOrchestrator 구현채는") { @@ -312,6 +315,57 @@ internal class OrchestratorTest( } } } + + describe("whenErrorOccurredRollbackThenAddDeadLetterOrchestrator 구현채는") { + context("롤백 파이프라인에서 에러가 던져지면,") { + var result: SagaEvent? = null + deadLetterRegistry.addListener { _, event -> + result = event + } + + it("DeadLetterQueue 에 메시지를 저장한다") { + whenErrorOccurredRollbackThenAddDeadLetterOrchestrator.sagaSync("") + + eventually(5.seconds) { + result shouldNotBe null + } + } + } + } + + describe("whenErrorOccurredRollbackThenAddDeadLetterContextOrchestrator 구현채는") { + context("롤백 파이프라인에서 에러가 던져지면,") { + var result: SagaEvent? = null + deadLetterRegistry.addListener { _, event -> + result = event + } + + it("DeadLetterQueue 에 메시지를 저장한다") { + whenErrorOccurredRollbackThenAddDeadLetterContextOrchestrator.sagaSync("") + + eventually(5.seconds) { + result shouldNotBe null + } + } + } + } + + describe("whenErrorOccurredRollbackThenAddDeadLetterReactiveOrchestrator 구현채는") { + context("롤백 파이프라인에서 에러가 던져지면,") { + var result: SagaEvent? = null + deadLetterRegistry.addListener { _, event -> + result = event + } + + it("DeadLetterQueue 에 메시지를 저장한다") { + whenErrorOccurredRollbackThenAddDeadLetterReactiveOrchestrator.sagaSync("") + + eventually(5.seconds) { + result shouldNotBe null + } + } + } + } }) { data class Home( val address: String, diff --git a/src/test/kotlin/org/rooftop/netx/redis/RedisDeadLetterManagerTest.kt b/src/test/kotlin/org/rooftop/netx/redis/RedisDeadLetterManagerTest.kt new file mode 100644 index 0000000..b56fc3e --- /dev/null +++ b/src/test/kotlin/org/rooftop/netx/redis/RedisDeadLetterManagerTest.kt @@ -0,0 +1,153 @@ +package org.rooftop.netx.redis + +import io.kotest.assertions.nondeterministic.eventually +import io.kotest.core.annotation.DisplayName +import io.kotest.core.spec.style.DescribeSpec +import io.kotest.matchers.booleans.shouldBeTrue +import io.kotest.matchers.shouldBe +import org.rooftop.netx.api.DeadLetterRelay +import org.rooftop.netx.api.Orchestrator +import org.rooftop.netx.api.SagaManager +import org.rooftop.netx.engine.* +import org.rooftop.netx.meta.EnableSaga +import org.rooftop.netx.spi.DeadLetterRegistry +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import kotlin.time.Duration.Companion.seconds + + +@EnableSaga +@ContextConfiguration( + classes = [ + RedisContainer::class, + DeadLetterConfigurer::class, + DeadLetterAnnotationClass::class, + ] +) +@DisplayName("DeadLetterRelay 클래스의") +@TestPropertySource("classpath:application.properties") +internal class RedisDeadLetterManagerTest( + private val sagaManager: SagaManager, + private val errorInjector: ErrorInjector, + private val relayResultHolder: RelayResultHolder, + private val deadLetterRelay: DeadLetterRelay, + private val deadLetterRegistry: DeadLetterRegistry, + @Qualifier("relay1Depths") private val relay1Depths: Orchestrator, + @Qualifier("relay2Depths") private val relay2Depths: Orchestrator, +) : DescribeSpec({ + + beforeEach { + errorInjector.doError = true + relayResultHolder.clear() + } + + describe("relay1Depths 구현채는") { + context("rollback중 예외가 발생시, relay를 호출하면") { + val relayEvent = RelayEvent(1, "1", listOf("1", "1"), RelayEvent.Depth(true)) + + var isSuccessToAddDeadLetter = false + deadLetterRegistry.addListener { _, _ -> + isSuccessToAddDeadLetter = true + } + + it("실패한 곳 부터 재시도 한다") { + relay1Depths.sagaSync(relayEvent) + + eventually(5.seconds) { + isSuccessToAddDeadLetter.shouldBeTrue() + } + + errorInjector.doError = false + + deadLetterRelay.relaySync() + + eventually(5.seconds) { + relayResultHolder["relay1Depths"] shouldBe listOf(relayEvent) + } + } + } + } + + describe("relay2Depths 구현채는") { + context("rollback중 예외가 발생시, relay를 호출하면") { + val relayEvent = RelayEvent(2, "2", listOf("2", "2", "2"), RelayEvent.Depth(true)) + + var isSuccessToAddDeadLetter = false + deadLetterRegistry.addListener { _, _ -> + isSuccessToAddDeadLetter = true + } + + it("실패한 곳 부터 재시도 한다") { + relay2Depths.sagaSync(relayEvent) + + eventually(5.seconds) { + isSuccessToAddDeadLetter.shouldBeTrue() + } + + errorInjector.doError = false + deadLetterRelay.relaySync() + + eventually(5.seconds) { + relayResultHolder["relay2Depths"] shouldBe listOf(relayEvent, relayEvent) + } + } + } + } + + describe("DeadLetterAnnotationClass 구현채는") { + context("SagaRollbackListener에서 에러가 발생시 relay를 호출하면") { + val relayEvent = RelayEvent(3, "3", listOf("3", "3"), RelayEvent.Depth(true)) + + var isSuccessToAddDeadLetter = false + deadLetterRegistry.addListener { _, _ -> + isSuccessToAddDeadLetter = true + } + + it("실패한 RollbackListener를 재시도한다") { + sagaManager.startSync(relayEvent) + + eventually(5.seconds) { + isSuccessToAddDeadLetter.shouldBeTrue() + } + + errorInjector.doError = false + deadLetterRelay.relaySync() + + eventually(5.seconds) { + relayResultHolder["DeadLetterAnnotationClass"] shouldBe listOf(relayEvent) + } + } + } + } + + describe("relay(deadLetterId: String) 메소드는") { + context("특정 deadLetterId를 입력받으면,") { + val relay1Event = RelayEvent(1, "1", listOf("1"), RelayEvent.Depth(false)) + val relay2Event = RelayEvent(2, "2", listOf("2"), RelayEvent.Depth(false)) + + val deadLetterIds: MutableMap = mutableMapOf() + deadLetterRegistry.addListener { deadLetterId, sagaEvent -> + val relayEvent = sagaEvent.decodeOrchestrateEvent(RelayEvent::class) + deadLetterIds[relayEvent.id] = deadLetterId + } + + it("해당하는 deadLetter 를 retry한다.") { + relay1Depths.sagaSync(relay1Event) + relay2Depths.sagaSync(relay2Event) + + eventually(5.seconds) { + deadLetterIds.size shouldBe 2 + } + + errorInjector.doError = false + deadLetterRelay.relaySync(deadLetterIds[1]!!) + + eventually(5.seconds) { + relayResultHolder["relay1Depths"] shouldBe listOf(relay1Event) + relayResultHolder["relay2Depths"] shouldBe null + } + } + } + } +}) From b06a697bf957ba8550afb0215e9020a45fa3a292 Mon Sep 17 00:00:00 2001 From: devxb Date: Sun, 6 Apr 2025 16:52:32 +0900 Subject: [PATCH 2/3] docs: add dead letter description --- README.md | 67 ++++++++++++++++++++++++++++++++++++++++++++--- gradle.properties | 2 +- 2 files changed, 64 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 6fd94cc..b15d791 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@
-![version 0.4.8](https://img.shields.io/badge/version-0.4.8-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square) +![version 0.4.9](https://img.shields.io/badge/version-0.4.9-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square) ![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white) **TPS(6,000)** on my Macbook air m2(default options). _[link](#Test1-TPS)_ @@ -16,12 +16,28 @@ Netx is a Saga framework, that provides following features. 3. Supports both Orchestration and Choreograph. 4. Automatically reruns loss events. 5. Automatically applies **`Transactional messaging pattern`**. -6. Supports backpressure to control the number of events that can be processed per node. -7. Prevents multiple nodes in the same group from receiving duplicate events. -8. Ensures message delivery using the `At Least Once` approach. +6. Supports **`Rollback Dead letter`** relay. If an exception occurs during the rollback process, saga is store to the Dead Letter Queue, and you can relay it. by Using DeadLetterRelay +7. Supports backpressure to control the number of events that can be processed per node. +8. Prevents multiple nodes in the same group from receiving duplicate events. +9. Ensures message delivery using the `At Least Once` approach. You can see the test results [here](#Test). +## Table of Contents +- [Download](#download) +- [How to use](#how-to-use) + - [Orchestrator-example.](#orchestrator-example) + - [Events-Example. Handle saga event](#events-example-handle-saga-event) + - [Events-Example. Start pay saga](#events-example-start-pay-saga) + - [Events-Example. Join order saga](#events-example-join-order-saga) + - [Events-Example. Check exists saga](#events-example-check-exists-saga) +- [Rollback DeadLetter](#rollback-deadletter) + - [Example. relay deadLetter](#example-relay-deadletter) + - [Example. handle deadLetter message](#example-handle-deadletter-message) +- [Test](#test) + - [Test1-TPS](#test1-tps) + - [Test2-Rollback](#test2-rollback) + ## Download ```groovy @@ -274,6 +290,49 @@ fun exists(param: Any): Mono { } ``` +### Rollback DeadLetter + +#### Example. relay deadLetter + +```kotlin + +@Component +class SomeClass( + private val deadLetterRelay: DeadLetterRelay, +) { + + fun example() { + // Relay latest dead letter + deadLetterRelay.relay() + .subscribe() + + // Alternatively, you can use the …Sync method in a synchronous environment. + deadLetterRelay.relaySync() + + // Relay specific dead letter by deadLetterId + deadLetterRelay.relaySync("12345-01") + } +} + +``` + +#### Example. handle deadLetter message + +```kotlin + +@Configuration +class SomeClass( + private val deadLetterRegistry: DeadLetterRegistry, +) { + + fun example() { + deadLetterRegistry.addListener { deadLetterId, sagaEvent -> + // do handle + } + } +} +``` + ## Test ### Test1-TPS diff --git a/gradle.properties b/gradle.properties index 7a57e5f..be4c05b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -2,7 +2,7 @@ kotlin.code.style=official ### Project ### group=org.rooftopmsa -version=0.4.8 +version=0.4.9 compatibility=17 ### Sonarcloud ### From 1dea5d2c035a8206a94314fb19c37a174b99171f Mon Sep 17 00:00:00 2001 From: xb205 <62425964+devxb@users.noreply.github.com> Date: Sun, 6 Apr 2025 16:54:18 +0900 Subject: [PATCH 3/3] Update README.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index b15d791..8809c11 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Netx is a Saga framework, that provides following features. 3. Supports both Orchestration and Choreograph. 4. Automatically reruns loss events. 5. Automatically applies **`Transactional messaging pattern`**. -6. Supports **`Rollback Dead letter`** relay. If an exception occurs during the rollback process, saga is store to the Dead Letter Queue, and you can relay it. by Using DeadLetterRelay +6. Supports **`Rollback Dead letter`** relay. If an exception occurs during the rollback process, saga is stored in the Dead Letter Queue, and you can relay it using DeadLetterRelay. 7. Supports backpressure to control the number of events that can be processed per node. 8. Prevents multiple nodes in the same group from receiving duplicate events. 9. Ensures message delivery using the `At Least Once` approach.