diff --git a/README.md b/README.md index 6fd94cc..8809c11 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@
-![version 0.4.8](https://img.shields.io/badge/version-0.4.8-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square) +![version 0.4.9](https://img.shields.io/badge/version-0.4.9-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square) ![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white) **TPS(6,000)** on my Macbook air m2(default options). _[link](#Test1-TPS)_ @@ -16,12 +16,28 @@ Netx is a Saga framework, that provides following features. 3. Supports both Orchestration and Choreograph. 4. Automatically reruns loss events. 5. Automatically applies **`Transactional messaging pattern`**. -6. Supports backpressure to control the number of events that can be processed per node. -7. Prevents multiple nodes in the same group from receiving duplicate events. -8. Ensures message delivery using the `At Least Once` approach. +6. Supports **`Rollback Dead letter`** relay. If an exception occurs during the rollback process, saga is stored in the Dead Letter Queue, and you can relay it using DeadLetterRelay. +7. Supports backpressure to control the number of events that can be processed per node. +8. Prevents multiple nodes in the same group from receiving duplicate events. +9. Ensures message delivery using the `At Least Once` approach. You can see the test results [here](#Test). +## Table of Contents +- [Download](#download) +- [How to use](#how-to-use) + - [Orchestrator-example.](#orchestrator-example) + - [Events-Example. Handle saga event](#events-example-handle-saga-event) + - [Events-Example. Start pay saga](#events-example-start-pay-saga) + - [Events-Example. Join order saga](#events-example-join-order-saga) + - [Events-Example. Check exists saga](#events-example-check-exists-saga) +- [Rollback DeadLetter](#rollback-deadletter) + - [Example. relay deadLetter](#example-relay-deadletter) + - [Example. handle deadLetter message](#example-handle-deadletter-message) +- [Test](#test) + - [Test1-TPS](#test1-tps) + - [Test2-Rollback](#test2-rollback) + ## Download ```groovy @@ -274,6 +290,49 @@ fun exists(param: Any): Mono { } ``` +### Rollback DeadLetter + +#### Example. relay deadLetter + +```kotlin + +@Component +class SomeClass( + private val deadLetterRelay: DeadLetterRelay, +) { + + fun example() { + // Relay latest dead letter + deadLetterRelay.relay() + .subscribe() + + // Alternatively, you can use the …Sync method in a synchronous environment. + deadLetterRelay.relaySync() + + // Relay specific dead letter by deadLetterId + deadLetterRelay.relaySync("12345-01") + } +} + +``` + +#### Example. handle deadLetter message + +```kotlin + +@Configuration +class SomeClass( + private val deadLetterRegistry: DeadLetterRegistry, +) { + + fun example() { + deadLetterRegistry.addListener { deadLetterId, sagaEvent -> + // do handle + } + } +} +``` + ## Test ### Test1-TPS diff --git a/gradle.properties b/gradle.properties index 7a57e5f..be4c05b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -2,7 +2,7 @@ kotlin.code.style=official ### Project ### group=org.rooftopmsa -version=0.4.8 +version=0.4.9 compatibility=17 ### Sonarcloud ### diff --git a/src/main/kotlin/org/rooftop/netx/api/DeadLetterRelay.kt b/src/main/kotlin/org/rooftop/netx/api/DeadLetterRelay.kt new file mode 100644 index 0000000..3486699 --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/api/DeadLetterRelay.kt @@ -0,0 +1,37 @@ +package org.rooftop.netx.api + +import reactor.core.publisher.Mono + +/** + * An interface for relay dead letters, with support for recovering those that failed during rollback. + */ +interface DeadLetterRelay { + + /** + * relay dead letter and return SagaEvent + * + * @return SagaEvent | A dead letter that was successfully processed + */ + fun relay(): Mono + + /** + * @see relay + * @return SagaEvent | A dead letter that was successfully processed + */ + fun relaySync(): SagaEvent + + + /** + * relay dead letter and return SagaEvent by specific deadLetterId (not a same SagaEvent.id) + * + * @param deadLetterId + * @return SagaEvent | A dead letter that was successfully processed + */ + fun relay(deadLetterId: String): Mono + + /** + * @see relay + * @return SagaEvent | A dead letter that was successfully processed (not a same SagaEvent.id) + */ + fun relaySync(deadLetterId: String): SagaEvent +} diff --git a/src/main/kotlin/org/rooftop/netx/api/Exceptions.kt b/src/main/kotlin/org/rooftop/netx/api/Exceptions.kt index c29cf7a..2eb9b7b 100644 --- a/src/main/kotlin/org/rooftop/netx/api/Exceptions.kt +++ b/src/main/kotlin/org/rooftop/netx/api/Exceptions.kt @@ -18,5 +18,9 @@ class FailedAckSagaException(message: String) : RuntimeException(message) class ResultTimeoutException(message: String, throwable: Throwable) : RuntimeException(message, throwable) +class DeadLetterTimeoutException(message: String): RuntimeException(message) + +class DeadLetterException(message: String): RuntimeException(message) + @JsonIgnoreProperties(ignoreUnknown = true) class ResultException(message: String) : RuntimeException(message) diff --git a/src/main/kotlin/org/rooftop/netx/api/SagaEvent.kt b/src/main/kotlin/org/rooftop/netx/api/SagaEvent.kt index e07a2d7..0399b87 100644 --- a/src/main/kotlin/org/rooftop/netx/api/SagaEvent.kt +++ b/src/main/kotlin/org/rooftop/netx/api/SagaEvent.kt @@ -84,5 +84,28 @@ sealed class SagaEvent( type ) + /** + * If you use Orchestrator and get SagaEvent that hold orchestrateEvent. + * you can get type class by using this function. + * + * @param type + * @param T + */ + fun decodeOrchestrateEvent(type: Class): T = decodeOrchestrateEvent(type.kotlin) + + /** + * @see decodeOrchestrateEvent + */ + fun decodeOrchestrateEvent(type: KClass): T { + val orchestrateEvent = codec.decode( + event ?: throw NullPointerException("Cannot decode event cause event is null"), + object : TypeReference>() {} + ) + val clientEvent = orchestrateEvent["clientEvent"] + ?: throw NullPointerException("Cannot decode event cause orchestrateEvent.clientEvent is null") + + return codec.decode(clientEvent as String, type) + } + internal abstract fun copy(): SagaEvent } diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaDispatcher.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaDispatcher.kt index b383ccb..1245f9f 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaDispatcher.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaDispatcher.kt @@ -5,6 +5,7 @@ import org.rooftop.netx.api.* import org.rooftop.netx.core.Codec import org.rooftop.netx.engine.core.Saga import org.rooftop.netx.engine.core.SagaState +import org.rooftop.netx.engine.deadletter.AbstractDeadLetterManager import org.rooftop.netx.engine.logging.info import org.rooftop.netx.engine.logging.warningOnError import reactor.core.publisher.Flux @@ -17,8 +18,13 @@ import kotlin.reflect.full.declaredMemberFunctions internal abstract class AbstractSagaDispatcher( private val codec: Codec, private val sagaManager: SagaManager, + private val abstractDeadLetterManager: AbstractDeadLetterManager, ) { + init { + this.also { abstractDeadLetterManager.dispatcher = it } + } + private val functions = mutableMapOf>>() @@ -55,7 +61,8 @@ internal abstract class AbstractSagaDispatcher( messageId: String ): Flux<*> = this.doOnComplete { Mono.just(saga to messageId) - .info("Ack saga \"${saga.id}\"") + .filter { messageId != DEAD_LETTER } + .info("Ack saga \"${saga.id}\" messageId: $messageId") .flatMap { ack(saga, messageId) .warningOnError("Fail to ack saga \"${saga.id}\"") @@ -65,57 +72,7 @@ internal abstract class AbstractSagaDispatcher( } private fun mapSagaEvent(saga: Saga): Mono { - return when (saga.state) { - SagaState.START -> Mono.just( - SagaStartEvent( - id = saga.id, - nodeName = saga.serverId, - group = saga.group, - event = extractEvent(saga), - codec = codec, - ) - ) - - SagaState.COMMIT -> Mono.just( - SagaCommitEvent( - id = saga.id, - nodeName = saga.serverId, - group = saga.group, - event = extractEvent(saga), - codec = codec - ) - ) - - SagaState.JOIN -> Mono.just( - SagaJoinEvent( - id = saga.id, - nodeName = saga.serverId, - group = saga.group, - event = extractEvent(saga), - codec = codec, - ) - ) - - SagaState.ROLLBACK -> - Mono.just( - SagaRollbackEvent( - id = saga.id, - nodeName = saga.serverId, - group = saga.group, - event = extractEvent(saga), - cause = saga.cause - ?: throw NullPointerException("Null value on SagaRollbackEvent's cause field"), - codec = codec, - ) - ) - } - } - - private fun extractEvent(saga: Saga): String? { - return when (saga.event != null) { - true -> saga.event - false -> null - } + return Mono.just(saga.toEvent(codec)) } internal fun addOrchestrate(handler: Any) { @@ -145,6 +102,7 @@ internal abstract class AbstractSagaDispatcher( noRollbackFor, nextState, sagaManager, + abstractDeadLetterManager, ) ) }.onFailure { @@ -190,6 +148,7 @@ internal abstract class AbstractSagaDispatcher( noRollbackFor, nextState, sagaManager, + abstractDeadLetterManager, ) ) }.onFailure { @@ -225,6 +184,7 @@ internal abstract class AbstractSagaDispatcher( noRollbackFor, nextState, sagaManager, + abstractDeadLetterManager, ) ) }.onFailure { @@ -288,7 +248,8 @@ internal abstract class AbstractSagaDispatcher( messageId: String ): Mono> - private companion object { + internal companion object { + internal const val DEAD_LETTER = "dead letter" private const val DISPATCHED = "dispatched" private val notMatchedSagaHandlerException = diff --git a/src/main/kotlin/org/rooftop/netx/engine/MonoDispatchFunction.kt b/src/main/kotlin/org/rooftop/netx/engine/MonoDispatchFunction.kt index 6444c7b..8ad7e07 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/MonoDispatchFunction.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/MonoDispatchFunction.kt @@ -2,6 +2,8 @@ package org.rooftop.netx.engine import org.rooftop.netx.api.SagaEvent import org.rooftop.netx.api.SagaManager +import org.rooftop.netx.api.SagaRollbackEvent +import org.rooftop.netx.engine.deadletter.AbstractDeadLetterManager import org.rooftop.netx.engine.logging.info import reactor.core.publisher.Mono import kotlin.reflect.KClass @@ -20,6 +22,7 @@ internal class MonoDispatchFunction( noRetryFor: Array>, nextState: NextSagaState, sagaManager: SagaManager, + private val abstractDeadLetterManager: AbstractDeadLetterManager, ) : AbstractDispatchFunction>( eventType, function, @@ -33,7 +36,9 @@ internal class MonoDispatchFunction( return Mono.just(sagaEvent) .filter { isProcessable(sagaEvent) } .map { sagaEvent.copy() } - .flatMap { function.call(handler, sagaEvent) } + .flatMap { + function.call(handler, sagaEvent) + } .info("Call Mono SagaHandler \"${name()}\" with id \"${sagaEvent.id}\"") .switchIfEmpty(`continue`) .doOnNext { @@ -41,6 +46,13 @@ internal class MonoDispatchFunction( publishNextSaga(sagaEvent) } } + .onErrorResume { + if (sagaEvent is SagaRollbackEvent) { + abstractDeadLetterManager.addDeadLetter(sagaEvent) + } else { + Mono.error(it) + } + } .onErrorResume { if (isNoRollbackFor(it)) { return@onErrorResume noRollbackFor diff --git a/src/main/kotlin/org/rooftop/netx/engine/NotPublishDispatchFunction.kt b/src/main/kotlin/org/rooftop/netx/engine/NotPublishDispatchFunction.kt index 6a4163b..1bb19e1 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/NotPublishDispatchFunction.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/NotPublishDispatchFunction.kt @@ -2,8 +2,11 @@ package org.rooftop.netx.engine import org.rooftop.netx.api.SagaEvent import org.rooftop.netx.api.SagaManager +import org.rooftop.netx.api.SagaRollbackEvent +import org.rooftop.netx.engine.deadletter.AbstractDeadLetterManager import org.rooftop.netx.engine.logging.info import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers import kotlin.reflect.KClass import kotlin.reflect.KFunction @@ -18,6 +21,7 @@ internal class NotPublishDispatchFunction( noRollbackFor: Array>, nextState: NextSagaState, sagaManager: SagaManager, + private val abstractDeadLetterManager: AbstractDeadLetterManager, ) : AbstractDispatchFunction( eventType, function, @@ -30,11 +34,20 @@ internal class NotPublishDispatchFunction( override fun call(sagaEvent: SagaEvent): Any { if (isProcessable(sagaEvent)) { return runCatching { - function.call(handler, sagaEvent) info("Call NotPublisher SagaHandler \"${name()}\" with id \"${sagaEvent.id}\"") + val result = function.call(handler, sagaEvent) + info("Call NotPublisher SagaHandler success \\\"${name()}\\\" with id \\\"${sagaEvent.id}\\\"\"") + result }.fold( onSuccess = { publishNextSaga(sagaEvent) }, onFailure = { + if (sagaEvent is SagaRollbackEvent) { + abstractDeadLetterManager.addDeadLetter(sagaEvent) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe() + return@fold SKIP + } + if (isNoRollbackFor(it)) { return@fold NO_ROLLBACK_FOR } diff --git a/src/main/kotlin/org/rooftop/netx/engine/OrchestrateDispatchFunction.kt b/src/main/kotlin/org/rooftop/netx/engine/OrchestrateDispatchFunction.kt index 77ae940..3e607c1 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/OrchestrateDispatchFunction.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/OrchestrateDispatchFunction.kt @@ -2,6 +2,8 @@ package org.rooftop.netx.engine import org.rooftop.netx.api.SagaEvent import org.rooftop.netx.api.SagaManager +import org.rooftop.netx.api.SagaRollbackEvent +import org.rooftop.netx.engine.deadletter.AbstractDeadLetterManager import org.rooftop.netx.engine.logging.info import reactor.core.publisher.Mono import kotlin.reflect.KClass @@ -20,6 +22,7 @@ internal class OrchestrateDispatchFunction( noRetryFor: Array>, nextState: NextSagaState, sagaManager: SagaManager, + private val abstractDeadLetterManager: AbstractDeadLetterManager, ) : AbstractDispatchFunction>( eventType, function, @@ -33,12 +36,20 @@ internal class OrchestrateDispatchFunction( return Mono.just(sagaEvent) .filter { isProcessable(sagaEvent) } .map { sagaEvent.copy() } - .flatMap { function.call(handler, sagaEvent) } .info("Call OrchestrateHandler \"${name()}\" with id \"${sagaEvent.id}\"") + .flatMap { function.call(handler, sagaEvent) } + .info("Call OrchestratorHandler success \"${name()}\" with id \"${sagaEvent.id}\"") .map { publishNextSaga(sagaEvent) it } + .onErrorResume { + if (sagaEvent is SagaRollbackEvent) { + abstractDeadLetterManager.addDeadLetter(sagaEvent.copy()) + } else { + Mono.error(it) + } + } .switchIfEmpty(`continue`) .onErrorResume { if (isNoRollbackFor(it)) { diff --git a/src/main/kotlin/org/rooftop/netx/engine/core/Saga.kt b/src/main/kotlin/org/rooftop/netx/engine/core/Saga.kt index 84c4c18..9f509a1 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/core/Saga.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/core/Saga.kt @@ -1,5 +1,8 @@ package org.rooftop.netx.engine.core +import org.rooftop.netx.api.* +import org.rooftop.netx.core.Codec + internal data class Saga( val id: String, val serverId: String, @@ -7,4 +10,67 @@ internal data class Saga( val state: SagaState, val cause: String? = null, val event: String? = null, -) +) { + + fun toEvent(codec: Codec): SagaEvent { + return when (state) { + SagaState.START -> SagaStartEvent( + id = id, + nodeName = serverId, + group = group, + event = extractEvent(), + codec = codec, + ) + + SagaState.COMMIT -> SagaCommitEvent( + id = id, + nodeName = serverId, + group = group, + event = extractEvent(), + codec = codec + ) + + SagaState.JOIN -> SagaJoinEvent( + id = id, + nodeName = serverId, + group = group, + event = extractEvent(), + codec = codec, + ) + + SagaState.ROLLBACK -> SagaRollbackEvent( + id = id, + nodeName = serverId, + group = group, + event = extractEvent(), + cause = cause + ?: throw NullPointerException("Null value on SagaRollbackEvent's cause field"), + codec = codec, + ) + } + } + + private fun extractEvent(): String? { + return when (event != null) { + true -> event + false -> null + } + } + + companion object { + internal fun of(sagaState: SagaState, sagaEvent: SagaEvent): Saga { + return Saga( + id = sagaEvent.id, + serverId = sagaEvent.nodeName, + group = sagaEvent.group, + state = sagaState, + cause = if (sagaState == SagaState.ROLLBACK) { + (sagaEvent as SagaRollbackEvent).cause + } else { + null + }, + event = sagaEvent.event + ) + } + } +} diff --git a/src/main/kotlin/org/rooftop/netx/engine/deadletter/AbstractDeadLetterManager.kt b/src/main/kotlin/org/rooftop/netx/engine/deadletter/AbstractDeadLetterManager.kt new file mode 100644 index 0000000..4c427cc --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/engine/deadletter/AbstractDeadLetterManager.kt @@ -0,0 +1,35 @@ +package org.rooftop.netx.engine.deadletter + +import org.rooftop.netx.api.DeadLetterRelay +import org.rooftop.netx.api.SagaEvent +import org.rooftop.netx.engine.AbstractSagaDispatcher +import org.rooftop.netx.engine.logging.error +import org.rooftop.netx.spi.DeadLetterListener +import org.rooftop.netx.spi.DeadLetterRegistry +import reactor.core.publisher.Mono + +internal abstract class AbstractDeadLetterManager : DeadLetterRelay, DeadLetterRegistry { + + private val deadLetterListeners: MutableList = mutableListOf() + + internal lateinit var dispatcher: AbstractSagaDispatcher + + override fun addListener(deadLetterListener: DeadLetterListener) { + deadLetterListeners.add(deadLetterListener) + } + + fun addDeadLetter(sagaEvent: SagaEvent): Mono { + return add(sagaEvent) + .doOnNext { + deadLetterListeners.forEach { deadLetterListener -> + runCatching { + deadLetterListener.listen(it, sagaEvent) + }.onFailure { + error("Fail to call deadLetterListener.listen", it) + } + } + } + } + + abstract fun add(sagaEvent: SagaEvent): Mono +} diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt index 7bbe2e4..860dd37 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt @@ -153,7 +153,7 @@ internal abstract class AbstractOrchestrateListener internal c OrchestrateEvent( orchestrateEvent.orchestratorId, beforeRollbackOrchestrateSequence, - "", + orchestrateEvent.clientEvent, orchestrateEvent.context, ) holdFailResult(id, throwable) diff --git a/src/main/kotlin/org/rooftop/netx/engine/logging/LoggingSupports.kt b/src/main/kotlin/org/rooftop/netx/engine/logging/LoggingSupports.kt index 6ef9550..4c202af 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/logging/LoggingSupports.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/logging/LoggingSupports.kt @@ -14,12 +14,16 @@ internal fun infoOnError(message: String, throwable: Throwable) = logger.info(me internal fun warningOnError(message: String) = logger.warn(message) -internal fun error(message: String) = logger.error(message) +internal fun error(message: String, throwable: Throwable? = null) = logger.error(message, throwable) internal fun Mono.info(message: String): Mono = this.doOnNext { logger.info(message) } +internal fun Mono.info(message: (T) -> String): Mono = this.doOnNext { + logger.info(message.invoke(it)) +} + internal fun Mono.infoOnError(message: String): Mono = this.doOnError { logger.info(message, it) } diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisDeadLetterManager.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisDeadLetterManager.kt new file mode 100644 index 0000000..2804028 --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/redis/RedisDeadLetterManager.kt @@ -0,0 +1,99 @@ +package org.rooftop.netx.redis + +import org.rooftop.netx.api.DeadLetterException +import org.rooftop.netx.api.DeadLetterTimeoutException +import org.rooftop.netx.api.SagaEvent +import org.rooftop.netx.core.Codec +import org.rooftop.netx.engine.AbstractSagaDispatcher.Companion.DEAD_LETTER +import org.rooftop.netx.engine.core.Saga +import org.rooftop.netx.engine.core.SagaState +import org.rooftop.netx.engine.deadletter.AbstractDeadLetterManager +import org.rooftop.netx.engine.logging.error +import org.rooftop.netx.engine.logging.info +import org.springframework.data.domain.Range +import org.springframework.data.redis.connection.Limit +import org.springframework.data.redis.connection.stream.MapRecord +import org.springframework.data.redis.connection.stream.Record +import org.springframework.data.redis.core.ReactiveRedisTemplate +import reactor.core.publisher.Mono + +internal class RedisDeadLetterManager( + private val codec: Codec, + private val reactiveRedisTemplate: ReactiveRedisTemplate, +) : AbstractDeadLetterManager() { + + override fun relaySync(): SagaEvent { + return relay().block() ?: throw DeadLetterTimeoutException("Cannot get dead letter") + } + + override fun relay(): Mono { + return reactiveRedisTemplate.opsForStream() + .reverseRange(DEAD_LETTER_KEY, Range.unbounded(), Limit.limit().count(1)) + .single() + .doOnNext { + info("Success to read dead letter $it") + } + .dispatch() + } + + + override fun relay(deadLetterId: String): Mono { + return reactiveRedisTemplate.opsForStream() + .range(DEAD_LETTER_KEY, Range.just(deadLetterId)) + .single() + .doOnNext { + info("Success to read dead letter $it") + } + .dispatch() + } + + override fun relaySync(deadLetterId: String): SagaEvent { + return relay(deadLetterId).block() + ?: throw DeadLetterTimeoutException("Cannot get dead letter by deadLetterId: \"$deadLetterId\"") + } + + private fun Mono>.dispatch(): Mono { + return this.map { + it to codec.decode( + it.value[DATA] + ?: throw DeadLetterException("Cannot find any data from record $it"), + Saga::class, + ) + }.flatMap { + val record = it.first + val saga = it.second + dispatcher.dispatch(saga, DEAD_LETTER) + .map { record to saga.toEvent(codec) } + .info { "Success to dispatch dead letter. saga: \"$saga\"" } + }.flatMap { + val record = it.first + val saga = it.second + reactiveRedisTemplate.opsForStream() + .delete(DEAD_LETTER_KEY, record.id) + .map { saga } + .info { + "Success to delete dead letter. record: \"$record\"" + } + }.doOnError { + error("Fail to relay dead letter", it) + } + } + + override fun add(sagaEvent: SagaEvent): Mono { + return reactiveRedisTemplate.opsForStream() + .add( + Record.of( + mapOf(DATA to codec.encode(Saga.of(SagaState.ROLLBACK, sagaEvent))) + ).withStreamKey(DEAD_LETTER_KEY) + ) + .info("Success to add dead letter to \"$DEAD_LETTER_KEY\". event: \"$sagaEvent\"") + .doOnError { + error("Fail to add dead letter to \"$DEAD_LETTER_KEY\". event: \"$sagaEvent\"") + }.map { it.value } + } + + companion object { + private const val DATA = "data" + private const val DEAD_LETTER_KEY = "NETX_DEAD_LETTER" + } +} diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisSagaConfigurer.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisSagaConfigurer.kt index 02c4544..3072990 100644 --- a/src/main/kotlin/org/rooftop/netx/redis/RedisSagaConfigurer.kt +++ b/src/main/kotlin/org/rooftop/netx/redis/RedisSagaConfigurer.kt @@ -169,6 +169,7 @@ class RedisSagaConfigurer( nodeGroup = nodeGroup, codec = jsonCodec(), sagaManager = redisStreamSagaManager(), + abstractDeadLetterManager = redisDeadLetterManager(), ).also { info("RedisStreamSagaDispatcher connect to host : \"$host\" port : \"$port\" nodeName : \"$nodeName\" nodeGroup : \"$nodeGroup\"") } @@ -217,4 +218,14 @@ class RedisSagaConfigurer( return LettuceConnectionFactory(redisStandaloneConfiguration) } + + @Bean + @Primary + @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis") + internal fun redisDeadLetterManager(): RedisDeadLetterManager { + return RedisDeadLetterManager( + codec = jsonCodec(), + reactiveRedisTemplate = sagaReactiveRedisTemplate(), + ) + } } diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamSagaDispatcher.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamSagaDispatcher.kt index 3799008..3880e37 100644 --- a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamSagaDispatcher.kt +++ b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamSagaDispatcher.kt @@ -5,6 +5,7 @@ import org.rooftop.netx.api.FailedAckSagaException import org.rooftop.netx.api.SagaManager import org.rooftop.netx.engine.AbstractSagaDispatcher import org.rooftop.netx.engine.core.Saga +import org.rooftop.netx.engine.deadletter.AbstractDeadLetterManager import org.rooftop.netx.meta.SagaHandler import org.springframework.context.ApplicationContext import org.springframework.data.redis.core.ReactiveRedisTemplate @@ -16,7 +17,8 @@ internal class RedisStreamSagaDispatcher( private val applicationContext: ApplicationContext, private val reactiveRedisTemplate: ReactiveRedisTemplate, private val nodeGroup: String, -) : AbstractSagaDispatcher(codec, sagaManager) { + abstractDeadLetterManager: AbstractDeadLetterManager, +) : AbstractSagaDispatcher(codec, sagaManager, abstractDeadLetterManager) { override fun findHandlers(): List { return applicationContext.getBeansWithAnnotation(SagaHandler::class.java) diff --git a/src/main/kotlin/org/rooftop/netx/spi/DeadLetterListener.kt b/src/main/kotlin/org/rooftop/netx/spi/DeadLetterListener.kt new file mode 100644 index 0000000..43b6ce0 --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/spi/DeadLetterListener.kt @@ -0,0 +1,19 @@ +package org.rooftop.netx.spi + +import org.rooftop.netx.api.SagaEvent + +/** + * By implementing this class, you can listen for dead letter occurrence events. + * + * @see DeadLetterRegistry.addListener + */ +fun interface DeadLetterListener { + + /** + * handle dead letter add event + * + * @param deadLetterId | DeadLetter Id + * @param sagaEvent | DeadLetter + */ + fun listen(deadLetterId: String, sagaEvent: SagaEvent) +} diff --git a/src/main/kotlin/org/rooftop/netx/spi/DeadLetterRegistry.kt b/src/main/kotlin/org/rooftop/netx/spi/DeadLetterRegistry.kt new file mode 100644 index 0000000..8e2a256 --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/spi/DeadLetterRegistry.kt @@ -0,0 +1,16 @@ +package org.rooftop.netx.spi + +/** + * An interface for registry dead letter listener. + * + * @see DeadLetterListener + */ +fun interface DeadLetterRegistry { + + /** + * Adding a hook to be executed when a dead letter occurs. + * + * @see DeadLetterListener + */ + fun addListener(deadLetterListener: DeadLetterListener) +} diff --git a/src/test/kotlin/org/rooftop/netx/engine/DeadLetterAnnotationClass.kt b/src/test/kotlin/org/rooftop/netx/engine/DeadLetterAnnotationClass.kt new file mode 100644 index 0000000..73db2c5 --- /dev/null +++ b/src/test/kotlin/org/rooftop/netx/engine/DeadLetterAnnotationClass.kt @@ -0,0 +1,33 @@ +package org.rooftop.netx.engine + +import org.rooftop.netx.api.SagaRollbackEvent +import org.rooftop.netx.api.SagaRollbackListener +import org.rooftop.netx.api.SagaStartEvent +import org.rooftop.netx.api.SagaStartListener +import org.rooftop.netx.meta.SagaHandler + +@SagaHandler +class DeadLetterAnnotationClass { + + lateinit var errorInjector: ErrorInjector + lateinit var relayResultHolder: RelayResultHolder + + + @SagaStartListener(event = RelayEvent::class) + fun sagaStartListener(sagaStartEvent: SagaStartEvent) { + val relayEvent = sagaStartEvent.decodeEvent(RelayEvent::class) + sagaStartEvent.setNextEvent(relayEvent) + + throw IllegalArgumentException("Rollback on start") + } + + @SagaRollbackListener(event = RelayEvent::class) + fun sagaRollbackListener(sagaRollbackEvent: SagaRollbackEvent) { + val relayEvent = sagaRollbackEvent.decodeEvent(RelayEvent::class) + if (errorInjector.doError) { + throw IllegalStateException("Rollback on start") + } + + relayResultHolder.hold("DeadLetterAnnotationClass", relayEvent) + } +} diff --git a/src/test/kotlin/org/rooftop/netx/engine/DeadLetterConfigurer.kt b/src/test/kotlin/org/rooftop/netx/engine/DeadLetterConfigurer.kt new file mode 100644 index 0000000..ef9a4d6 --- /dev/null +++ b/src/test/kotlin/org/rooftop/netx/engine/DeadLetterConfigurer.kt @@ -0,0 +1,109 @@ +package org.rooftop.netx.engine + +import org.rooftop.netx.api.Orchestrator +import org.springframework.boot.test.context.TestConfiguration +import org.springframework.context.annotation.Bean + +class ErrorInjector { + var doError: Boolean = false +} + +class RelayResultHolder { + + private val holder: MutableMap> = mutableMapOf() + + operator fun get(key: String) = holder[key] + + fun hold(key: String, value: RelayEvent) { + holder.putIfAbsent(key, mutableListOf()) + holder[key]?.add(value) + } + + fun clear() { + holder.clear() + } +} + +data class RelayEvent( + val id: Long, + val name: String, + val list: List, + val depth: Depth, +) { + + data class Depth( + val success: Boolean, + ) +} + +@TestConfiguration +internal class DeadLetterConfigurer( + private val orchestratorFactory: OrchestratorFactory, + private val deadLetterAnnotationClass: DeadLetterAnnotationClass, +) { + + @Bean + fun errorInjector() = ErrorInjector().apply { + deadLetterAnnotationClass.errorInjector = this + } + + @Bean + fun relayResultHolder() = RelayResultHolder().apply { + deadLetterAnnotationClass.relayResultHolder = this + } + + @Bean(name = ["relay1Depths"]) + fun relay1Depths( + relayResultHolder: RelayResultHolder, + errorInjector: ErrorInjector, + ): Orchestrator { + return orchestratorFactory.create("relay1Depths") + .start( + orchestrate = { + it + }, + rollback = { + if (errorInjector.doError) { + throw IllegalStateException("Error on rollback") + } + relayResultHolder.hold("relay1Depths", it) + } + ) + .commit { + throw IllegalStateException("") + } + } + + @Bean(name = ["relay2Depths"]) + fun relay2Depths( + relayResultHolder: RelayResultHolder, + errorInjector: ErrorInjector, + ): Orchestrator { + return orchestratorFactory.create("relay2Depths") + .start( + orchestrate = { + it + }, + rollback = { + if (errorInjector.doError) { + throw IllegalStateException("Error on rollback") + } + relayResultHolder.hold("relay2Depths", it) + } + ) + .joinWithContext( + contextOrchestrate = { _, request -> + request + }, + contextRollback = { _, request -> + if (errorInjector.doError) { + throw IllegalStateException("Error on rollback") + } + relayResultHolder.hold("relay2Depths", request) + } + ) + .commit { + throw IllegalStateException("") + } + } +} diff --git a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt index 2d8ed38..f4d6ca2 100644 --- a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt +++ b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt @@ -329,6 +329,65 @@ internal class OrchestratorConfigurer( .commitWithContext({ _, _ -> "" }) } + @Bean(name = ["whenErrorOccurredRollbackThenAddDeadLetterOrchestrator"]) + fun whenErrorOccurredRollbackThenAddDeadLetterOrchestrator(): Orchestrator { + return OrchestratorFactory.instance() + .create("whenErrorOccurredRollbackThenAddDeadLetterOrchestrator") + .start( + orchestrate = { "" }, + rollback = { + throw IllegalStateException("Add dead letter") + } + ) + .commit( + orchestrate = { + throw IllegalStateException("Throw error") + } + ) + } + + @Bean(name = ["whenErrorOccurredRollbackThenAddDeadLetterContextOrchestrator"]) + fun whenErrorOccurredRollbackThenAddDeadLetterContextOrchestrator(): Orchestrator { + return OrchestratorFactory.instance() + .create("whenErrorOccurredRollbackThenAddDeadLetterContextOrchestrator") + .startWithContext( + contextOrchestrate = { _, _ -> "" }, + contextRollback = { _, _ -> + throw IllegalStateException("Add dead letter") + } + ) + .joinWithContext( + contextOrchestrate = { _, _ -> "" }, + contextRollback = { _, _ -> + throw IllegalStateException("Add dead letter") + } + ) + .commitWithContext { _, _ -> + throw IllegalStateException("Throw error") + } + } + + @Bean(name = ["whenErrorOccurredRollbackThenAddDeadLetterReactiveOrchestrator"]) + fun whenErrorOccurredRollbackThenAddDeadLetterReactiveOrchestrator(): Orchestrator { + return OrchestratorFactory.instance() + .create("whenErrorOccurredRollbackThenAddDeadLetterContextOrchestrator") + .startReactive( + orchestrate = { Mono.just("") }, + rollback = { + throw IllegalStateException("Add dead letter") + } + ) + .joinReactive( + orchestrate = { Mono.just("") }, + rollback = { + throw IllegalStateException("Add dead letter") + } + ) + .commitReactive { + throw IllegalStateException("Throw error") + } + } + fun interface ListOrchestrate : ContextOrchestrate, List> { diff --git a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt index a0faa90..b9ae596 100644 --- a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt +++ b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt @@ -8,15 +8,14 @@ import io.kotest.core.annotation.DisplayName import io.kotest.core.spec.style.DescribeSpec import io.kotest.matchers.equality.shouldBeEqualToComparingFields import io.kotest.matchers.equals.shouldBeEqual -import org.rooftop.netx.api.Orchestrator -import org.rooftop.netx.api.ResultException -import org.rooftop.netx.api.TypeReference +import io.kotest.matchers.shouldNotBe +import org.rooftop.netx.api.* import org.rooftop.netx.meta.EnableSaga import org.rooftop.netx.redis.RedisContainer +import org.rooftop.netx.spi.DeadLetterRegistry import org.springframework.beans.factory.annotation.Qualifier import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource -import org.springframework.web.client.HttpClientErrorException import java.time.Instant import kotlin.time.Duration.Companion.seconds @@ -34,6 +33,7 @@ internal class OrchestratorTest( private val homeOrchestrator: Orchestrator, private val instantOrchestrator: Orchestrator, private val manyTypeOrchestrator: Orchestrator, + private val deadLetterRegistry: DeadLetterRegistry, @Qualifier("rollbackOrchestrator") private val rollbackOrchestrator: Orchestrator, @Qualifier("upChainRollbackOrchestrator") private val upChainRollbackOrchestrator: Orchestrator, @Qualifier("monoRollbackOrchestrator") private val monoRollbackOrchestrator: Orchestrator, @@ -49,6 +49,9 @@ internal class OrchestratorTest( @Qualifier("throwOnCommitWithContextOrchestrator") private val throwOnCommitWithContextOrchestrator: Orchestrator, List>, @Qualifier("throwJwtExceptionOnStartOrchestrator") private val throwJwtExceptionOnStartOrchestrator: Orchestrator, @Qualifier("throwHttpClientErrorExceptionOnStartOrchestrator") private val throwHttpClientErrorExceptionOnStartOrchestrator: Orchestrator, + @Qualifier("whenErrorOccurredRollbackThenAddDeadLetterOrchestrator") private val whenErrorOccurredRollbackThenAddDeadLetterOrchestrator: Orchestrator, + @Qualifier("whenErrorOccurredRollbackThenAddDeadLetterContextOrchestrator") private val whenErrorOccurredRollbackThenAddDeadLetterContextOrchestrator: Orchestrator, + @Qualifier("whenErrorOccurredRollbackThenAddDeadLetterReactiveOrchestrator") private val whenErrorOccurredRollbackThenAddDeadLetterReactiveOrchestrator: Orchestrator, ) : DescribeSpec({ describe("numberOrchestrator 구현채는") { @@ -312,6 +315,57 @@ internal class OrchestratorTest( } } } + + describe("whenErrorOccurredRollbackThenAddDeadLetterOrchestrator 구현채는") { + context("롤백 파이프라인에서 에러가 던져지면,") { + var result: SagaEvent? = null + deadLetterRegistry.addListener { _, event -> + result = event + } + + it("DeadLetterQueue 에 메시지를 저장한다") { + whenErrorOccurredRollbackThenAddDeadLetterOrchestrator.sagaSync("") + + eventually(5.seconds) { + result shouldNotBe null + } + } + } + } + + describe("whenErrorOccurredRollbackThenAddDeadLetterContextOrchestrator 구현채는") { + context("롤백 파이프라인에서 에러가 던져지면,") { + var result: SagaEvent? = null + deadLetterRegistry.addListener { _, event -> + result = event + } + + it("DeadLetterQueue 에 메시지를 저장한다") { + whenErrorOccurredRollbackThenAddDeadLetterContextOrchestrator.sagaSync("") + + eventually(5.seconds) { + result shouldNotBe null + } + } + } + } + + describe("whenErrorOccurredRollbackThenAddDeadLetterReactiveOrchestrator 구현채는") { + context("롤백 파이프라인에서 에러가 던져지면,") { + var result: SagaEvent? = null + deadLetterRegistry.addListener { _, event -> + result = event + } + + it("DeadLetterQueue 에 메시지를 저장한다") { + whenErrorOccurredRollbackThenAddDeadLetterReactiveOrchestrator.sagaSync("") + + eventually(5.seconds) { + result shouldNotBe null + } + } + } + } }) { data class Home( val address: String, diff --git a/src/test/kotlin/org/rooftop/netx/redis/RedisDeadLetterManagerTest.kt b/src/test/kotlin/org/rooftop/netx/redis/RedisDeadLetterManagerTest.kt new file mode 100644 index 0000000..b56fc3e --- /dev/null +++ b/src/test/kotlin/org/rooftop/netx/redis/RedisDeadLetterManagerTest.kt @@ -0,0 +1,153 @@ +package org.rooftop.netx.redis + +import io.kotest.assertions.nondeterministic.eventually +import io.kotest.core.annotation.DisplayName +import io.kotest.core.spec.style.DescribeSpec +import io.kotest.matchers.booleans.shouldBeTrue +import io.kotest.matchers.shouldBe +import org.rooftop.netx.api.DeadLetterRelay +import org.rooftop.netx.api.Orchestrator +import org.rooftop.netx.api.SagaManager +import org.rooftop.netx.engine.* +import org.rooftop.netx.meta.EnableSaga +import org.rooftop.netx.spi.DeadLetterRegistry +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import kotlin.time.Duration.Companion.seconds + + +@EnableSaga +@ContextConfiguration( + classes = [ + RedisContainer::class, + DeadLetterConfigurer::class, + DeadLetterAnnotationClass::class, + ] +) +@DisplayName("DeadLetterRelay 클래스의") +@TestPropertySource("classpath:application.properties") +internal class RedisDeadLetterManagerTest( + private val sagaManager: SagaManager, + private val errorInjector: ErrorInjector, + private val relayResultHolder: RelayResultHolder, + private val deadLetterRelay: DeadLetterRelay, + private val deadLetterRegistry: DeadLetterRegistry, + @Qualifier("relay1Depths") private val relay1Depths: Orchestrator, + @Qualifier("relay2Depths") private val relay2Depths: Orchestrator, +) : DescribeSpec({ + + beforeEach { + errorInjector.doError = true + relayResultHolder.clear() + } + + describe("relay1Depths 구현채는") { + context("rollback중 예외가 발생시, relay를 호출하면") { + val relayEvent = RelayEvent(1, "1", listOf("1", "1"), RelayEvent.Depth(true)) + + var isSuccessToAddDeadLetter = false + deadLetterRegistry.addListener { _, _ -> + isSuccessToAddDeadLetter = true + } + + it("실패한 곳 부터 재시도 한다") { + relay1Depths.sagaSync(relayEvent) + + eventually(5.seconds) { + isSuccessToAddDeadLetter.shouldBeTrue() + } + + errorInjector.doError = false + + deadLetterRelay.relaySync() + + eventually(5.seconds) { + relayResultHolder["relay1Depths"] shouldBe listOf(relayEvent) + } + } + } + } + + describe("relay2Depths 구현채는") { + context("rollback중 예외가 발생시, relay를 호출하면") { + val relayEvent = RelayEvent(2, "2", listOf("2", "2", "2"), RelayEvent.Depth(true)) + + var isSuccessToAddDeadLetter = false + deadLetterRegistry.addListener { _, _ -> + isSuccessToAddDeadLetter = true + } + + it("실패한 곳 부터 재시도 한다") { + relay2Depths.sagaSync(relayEvent) + + eventually(5.seconds) { + isSuccessToAddDeadLetter.shouldBeTrue() + } + + errorInjector.doError = false + deadLetterRelay.relaySync() + + eventually(5.seconds) { + relayResultHolder["relay2Depths"] shouldBe listOf(relayEvent, relayEvent) + } + } + } + } + + describe("DeadLetterAnnotationClass 구현채는") { + context("SagaRollbackListener에서 에러가 발생시 relay를 호출하면") { + val relayEvent = RelayEvent(3, "3", listOf("3", "3"), RelayEvent.Depth(true)) + + var isSuccessToAddDeadLetter = false + deadLetterRegistry.addListener { _, _ -> + isSuccessToAddDeadLetter = true + } + + it("실패한 RollbackListener를 재시도한다") { + sagaManager.startSync(relayEvent) + + eventually(5.seconds) { + isSuccessToAddDeadLetter.shouldBeTrue() + } + + errorInjector.doError = false + deadLetterRelay.relaySync() + + eventually(5.seconds) { + relayResultHolder["DeadLetterAnnotationClass"] shouldBe listOf(relayEvent) + } + } + } + } + + describe("relay(deadLetterId: String) 메소드는") { + context("특정 deadLetterId를 입력받으면,") { + val relay1Event = RelayEvent(1, "1", listOf("1"), RelayEvent.Depth(false)) + val relay2Event = RelayEvent(2, "2", listOf("2"), RelayEvent.Depth(false)) + + val deadLetterIds: MutableMap = mutableMapOf() + deadLetterRegistry.addListener { deadLetterId, sagaEvent -> + val relayEvent = sagaEvent.decodeOrchestrateEvent(RelayEvent::class) + deadLetterIds[relayEvent.id] = deadLetterId + } + + it("해당하는 deadLetter 를 retry한다.") { + relay1Depths.sagaSync(relay1Event) + relay2Depths.sagaSync(relay2Event) + + eventually(5.seconds) { + deadLetterIds.size shouldBe 2 + } + + errorInjector.doError = false + deadLetterRelay.relaySync(deadLetterIds[1]!!) + + eventually(5.seconds) { + relayResultHolder["relay1Depths"] shouldBe listOf(relay1Event) + relayResultHolder["relay2Depths"] shouldBe null + } + } + } + } +})