diff --git a/README.md b/README.md index 8809c11..e742098 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@
-![version 0.4.9](https://img.shields.io/badge/version-0.4.9-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square) +![version 0.5.0](https://img.shields.io/badge/version-0.4.9-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square) ![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white) **TPS(6,000)** on my Macbook air m2(default options). _[link](#Test1-TPS)_ diff --git a/gradle.properties b/gradle.properties index be4c05b..c161c34 100644 --- a/gradle.properties +++ b/gradle.properties @@ -2,7 +2,7 @@ kotlin.code.style=official ### Project ### group=org.rooftopmsa -version=0.4.9 +version=0.5.0 compatibility=17 ### Sonarcloud ### diff --git a/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt b/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt index 0ff33a0..b38b28e 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt @@ -6,6 +6,7 @@ import org.rooftop.netx.engine.listen.* import reactor.core.publisher.Mono internal class DefaultOrchestrateChain private constructor( + private val group: String, private val orchestratorId: String, private val orchestrateSequence: Int, private val chainContainer: ChainContainer, @@ -56,6 +57,7 @@ internal class DefaultOrchestrateChain privat requestHolder = chainContainer.requestHolder, resultHolder = chainContainer.resultHolder, typeReference = function.reified(), + group = group, ) private fun nextOrchestrateChain( @@ -63,12 +65,13 @@ internal class DefaultOrchestrateChain privat nextRollbackOrchestrateListener: RollbackOrchestrateListener? ): OrchestrateChain { val nextDefaultOrchestrateChain = DefaultOrchestrateChain( - orchestratorId, - orchestrateSequence + 1, - chainContainer, - nextJoinOrchestrateListener, - nextRollbackOrchestrateListener, - this, + group = group, + orchestratorId = orchestratorId, + orchestrateSequence = orchestrateSequence + 1, + chainContainer = chainContainer, + orchestrateListener = nextJoinOrchestrateListener, + rollbackOrchestrateListener = nextRollbackOrchestrateListener, + beforeDefaultOrchestrateChain = this, ) this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain @@ -97,12 +100,13 @@ internal class DefaultOrchestrateChain privat getMonoRollbackOrchestrateListener(CommandType.CONTEXT, contextRollback) val nextDefaultOrchestrateChain = DefaultOrchestrateChain( - orchestratorId, - orchestrateSequence + 1, - chainContainer, - nextJoinOrchestrateListener, - nextRollbackOrchestrateListener, - this, + group = group, + orchestratorId = orchestratorId, + orchestrateSequence = orchestrateSequence + 1, + chainContainer = chainContainer, + orchestrateListener = nextJoinOrchestrateListener, + rollbackOrchestrateListener = nextRollbackOrchestrateListener, + beforeDefaultOrchestrateChain = this, ) this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain @@ -124,7 +128,8 @@ internal class DefaultOrchestrateChain privat ), requestHolder = chainContainer.requestHolder, resultHolder = chainContainer.resultHolder, - function.reified(), + typeReference = function.reified(), + group = group, ) private fun nextOrchestrateChain( @@ -132,12 +137,13 @@ internal class DefaultOrchestrateChain privat nextRollbackOrchestrateListener: MonoRollbackOrchestrateListener? ): OrchestrateChain { val nextDefaultOrchestrateChain = DefaultOrchestrateChain( - orchestratorId, - orchestrateSequence + 1, - chainContainer, - nextJoinOrchestrateListener, - nextRollbackOrchestrateListener, - this, + group = group, + orchestratorId = orchestratorId, + orchestrateSequence = orchestrateSequence + 1, + chainContainer = chainContainer, + orchestrateListener = nextJoinOrchestrateListener, + rollbackOrchestrateListener = nextRollbackOrchestrateListener, + beforeDefaultOrchestrateChain = this, ) this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain @@ -173,7 +179,8 @@ internal class DefaultOrchestrateChain privat orchestrateCommand = OrchestrateCommand(commandType, chainContainer.codec, function), resultHolder = chainContainer.resultHolder, requestHolder = chainContainer.requestHolder, - function.reified(), + typeReference = function.reified(), + group = group, ) private fun getRollbackOrchestrateListener( @@ -189,6 +196,7 @@ internal class DefaultOrchestrateChain privat requestHolder = chainContainer.requestHolder, resultHolder = chainContainer.resultHolder, typeReference = it.reified(), + group = group, ) } @@ -197,12 +205,13 @@ internal class DefaultOrchestrateChain privat ): Orchestrator { return chainContainer.orchestratorCache.cache(orchestratorId) { val nextDefaultOrchestrateChain = DefaultOrchestrateChain( - orchestratorId, - orchestrateSequence + 1, - chainContainer, - nextCommitOrchestrateListener, - null, - this, + orchestratorId = orchestratorId, + orchestrateSequence = orchestrateSequence + 1, + chainContainer = chainContainer, + orchestrateListener = nextCommitOrchestrateListener, + rollbackOrchestrateListener = null, + beforeDefaultOrchestrateChain = this, + group = group, ) this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain val firstOrchestrators = nextDefaultOrchestrateChain.initOrchestrateListeners() @@ -241,12 +250,13 @@ internal class DefaultOrchestrateChain privat ): Orchestrator { return chainContainer.orchestratorCache.cache(orchestratorId) { val nextDefaultOrchestrateChain = DefaultOrchestrateChain( - orchestratorId, - orchestrateSequence + 1, - chainContainer, - nextJoinOrchestrateListener, - null, - this, + group = group, + orchestratorId = orchestratorId, + orchestrateSequence = orchestrateSequence + 1, + chainContainer = chainContainer, + orchestrateListener = nextJoinOrchestrateListener, + rollbackOrchestrateListener = null, + beforeDefaultOrchestrateChain = this, ) this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain @@ -385,6 +395,7 @@ internal class DefaultOrchestrateChain privat resultHolder = chainContainer.resultHolder, requestHolder = chainContainer.requestHolder, typeReference = function.reified(), + group = group, ) private fun getMonoRollbackOrchestrateListener( @@ -404,10 +415,12 @@ internal class DefaultOrchestrateChain privat requestHolder = chainContainer.requestHolder, resultHolder = chainContainer.resultHolder, typeReference = it.reified(), + group = group, ) } internal class Pre internal constructor( + private val group: String, private val orchestratorId: String, private val sagaManager: SagaManager, private val sagaDispatcher: AbstractSagaDispatcher, @@ -427,6 +440,7 @@ internal class DefaultOrchestrateChain privat getRollbackOrchestrateListener(CommandType.DEFAULT, rollback) return DefaultOrchestrateChain( + group = group, orchestratorId = orchestratorId, orchestrateSequence = 0, chainContainer = getStreamContainer(), @@ -445,6 +459,7 @@ internal class DefaultOrchestrateChain privat getRollbackOrchestrateListener(CommandType.CONTEXT, contextRollback) return DefaultOrchestrateChain( + group = group, orchestratorId = orchestratorId, orchestrateSequence = 0, chainContainer = getStreamContainer(), @@ -469,6 +484,7 @@ internal class DefaultOrchestrateChain privat requestHolder = requestHolder, resultHolder = resultHolder, typeReference = function.reified(), + group = group, ) private fun getRollbackOrchestrateListener( @@ -487,7 +503,8 @@ internal class DefaultOrchestrateChain privat ), requestHolder = requestHolder, resultHolder = resultHolder, - typeReference = it.reified() + typeReference = it.reified(), + group = group, ) } @@ -501,6 +518,7 @@ internal class DefaultOrchestrateChain privat getMonoRollbackOrchestrateListener(CommandType.DEFAULT, rollback) return DefaultOrchestrateChain( + group = group, orchestratorId = orchestratorId, orchestrateSequence = 0, chainContainer = getStreamContainer(), @@ -519,6 +537,7 @@ internal class DefaultOrchestrateChain privat getMonoRollbackOrchestrateListener(CommandType.CONTEXT, contextRollback) return DefaultOrchestrateChain( + group = group, orchestratorId = orchestratorId, orchestrateSequence = 0, chainContainer = getStreamContainer(), @@ -543,6 +562,7 @@ internal class DefaultOrchestrateChain privat requestHolder = requestHolder, resultHolder = resultHolder, typeReference = function.reified(), + group = group, ) private fun getMonoRollbackOrchestrateListener( @@ -562,6 +582,7 @@ internal class DefaultOrchestrateChain privat requestHolder = requestHolder, resultHolder = resultHolder, typeReference = it.reified(), + group = group, ) } diff --git a/src/main/kotlin/org/rooftop/netx/engine/OrchestratorFactory.kt b/src/main/kotlin/org/rooftop/netx/engine/OrchestratorFactory.kt index c3d8bdc..7e0d51e 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/OrchestratorFactory.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/OrchestratorFactory.kt @@ -5,6 +5,7 @@ import org.rooftop.netx.api.OrchestratorFactory import org.rooftop.netx.core.Codec internal class OrchestratorFactory internal constructor( + private val group: String, private val sagaManager: SagaManager, private val sagaDispatcher: AbstractSagaDispatcher, private val codec: Codec, @@ -26,6 +27,7 @@ internal class OrchestratorFactory internal constructor( resultHolder = resultHolder, requestHolder = requestHolder, orchestratorCache = orchestratorCache, + group = group, ) } } diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt index 860dd37..5b4777e 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt @@ -20,6 +20,7 @@ internal abstract class AbstractOrchestrateListener internal c private val requestHolder: RequestHolder, private val resultHolder: ResultHolder, private val typeReference: TypeReference?, + private val group: String, ) { var isFirst: Boolean = true @@ -58,7 +59,7 @@ internal abstract class AbstractOrchestrateListener internal c protected fun orchestrate(sagaEvent: SagaEvent): Mono { return sagaEvent.startWithOrchestrateEvent() .filter { - it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId + it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId && sagaEvent.group == group } .mapReifiedRequest() .flatMap { (request, event) -> diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt index f0e2a7e..70a380e 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt @@ -16,6 +16,7 @@ internal class CommitOrchestrateListener internal constructor( private val resultHolder: ResultHolder, requestHolder: RequestHolder, typeReference: TypeReference?, + private val group: String, ) : AbstractOrchestrateListener( orchestratorId, orchestrateSequence, @@ -24,12 +25,13 @@ internal class CommitOrchestrateListener internal constructor( requestHolder, resultHolder, typeReference, + group ) { @SagaCommitListener(OrchestrateEvent::class) fun listenCommitOrchestrateEvent(sagaCommitEvent: SagaCommitEvent): Mono { return sagaCommitEvent.startWithOrchestrateEvent() - .filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId } + .filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId && sagaCommitEvent.group == this.group } .mapReifiedRequest() .flatMap { (request, event) -> holdRequestIfRollbackable(request, sagaCommitEvent.id) diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/JoinOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/JoinOrchestrateListener.kt index 1f37c61..47bde6d 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/JoinOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/JoinOrchestrateListener.kt @@ -16,6 +16,7 @@ internal class JoinOrchestrateListener( private val requestHolder: RequestHolder, private val resultHolder: ResultHolder, private val typeReference: TypeReference?, + private val group: String, ) : AbstractOrchestrateListener( orchestratorId, orchestrateSequence, @@ -24,6 +25,7 @@ internal class JoinOrchestrateListener( requestHolder, resultHolder, typeReference, + group, ) { override fun withAnnotated(): AbstractOrchestrateListener { @@ -43,6 +45,7 @@ internal class JoinOrchestrateListener( requestHolder, resultHolder, typeReference, + group, ) { @SagaJoinListener( event = OrchestrateEvent::class, @@ -67,6 +70,7 @@ internal class JoinOrchestrateListener( requestHolder, resultHolder, typeReference, + group, ) { @SagaJoinListener( event = OrchestrateEvent::class, diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoCommitOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoCommitOrchestrateListener.kt index 664d930..60c15f5 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoCommitOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoCommitOrchestrateListener.kt @@ -16,6 +16,7 @@ internal class MonoCommitOrchestrateListener internal construc requestHolder: RequestHolder, private val resultHolder: ResultHolder, typeReference: TypeReference?, + private val group: String, ) : AbstractOrchestrateListener( orchestratorId, orchestrateSequence, @@ -24,11 +25,12 @@ internal class MonoCommitOrchestrateListener internal construc requestHolder, resultHolder, typeReference, + group, ) { @SagaCommitListener(OrchestrateEvent::class) fun listenCommitOrchestrateEvent(sagaCommitEvent: SagaCommitEvent): Mono { return sagaCommitEvent.startWithOrchestrateEvent() - .filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId } + .filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId && sagaCommitEvent.group == this.group} .mapReifiedRequest() .flatMap { (request, event) -> holdRequestIfRollbackable(request, sagaCommitEvent.id) diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoJoinOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoJoinOrchestrateListener.kt index 8a7c5df..dbc09d7 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoJoinOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoJoinOrchestrateListener.kt @@ -16,6 +16,7 @@ internal class MonoJoinOrchestrateListener( private val requestHolder: RequestHolder, private val resultHolder: ResultHolder, private val typeReference: TypeReference?, + private val group: String, ) : AbstractOrchestrateListener( orchestratorId, orchestrateSequence, @@ -24,6 +25,7 @@ internal class MonoJoinOrchestrateListener( requestHolder, resultHolder, typeReference, + group, ) { override fun withAnnotated(): AbstractOrchestrateListener { @@ -43,6 +45,7 @@ internal class MonoJoinOrchestrateListener( requestHolder, resultHolder, typeReference, + group, ) { @SagaJoinListener( event = OrchestrateEvent::class, @@ -67,6 +70,7 @@ internal class MonoJoinOrchestrateListener( requestHolder, resultHolder, typeReference, + group, ) { @SagaJoinListener( event = OrchestrateEvent::class, diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackOrchestrateListener.kt index 82d5f95..9d0eb97 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackOrchestrateListener.kt @@ -16,6 +16,7 @@ internal class MonoRollbackOrchestrateListener( requestHolder: RequestHolder, resultHolder: ResultHolder, typeReference: TypeReference?, + private val group: String, ) : AbstractOrchestrateListener( orchestratorId, orchestrateSequence, @@ -24,12 +25,13 @@ internal class MonoRollbackOrchestrateListener( requestHolder, resultHolder, typeReference, + group, ) { @SagaRollbackListener(OrchestrateEvent::class) fun listenRollbackOrchestrateEvent(sagaRollbackEvent: SagaRollbackEvent): Mono { return sagaRollbackEvent.startWithOrchestrateEvent() - .filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence } + .filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence && sagaRollbackEvent.group == this.group } .getHeldRequest(sagaRollbackEvent) .flatMap { (request, event) -> monoRollbackCommand.command(request, event.context) diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoStartOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoStartOrchestrateListener.kt index fbeb643..9892738 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoStartOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoStartOrchestrateListener.kt @@ -16,6 +16,7 @@ internal class MonoStartOrchestrateListener( private val requestHolder: RequestHolder, private val resultHolder: ResultHolder, private val typeReference: TypeReference?, + private val group: String, ) : AbstractOrchestrateListener( orchestratorId, orchestrateSequence, @@ -24,6 +25,7 @@ internal class MonoStartOrchestrateListener( requestHolder, resultHolder, typeReference, + group, ) { override fun withAnnotated(): AbstractOrchestrateListener { @@ -43,6 +45,7 @@ internal class MonoStartOrchestrateListener( requestHolder, resultHolder, typeReference, + group, ) { @SagaStartListener( event = OrchestrateEvent::class, @@ -67,6 +70,7 @@ internal class MonoStartOrchestrateListener( requestHolder, resultHolder, typeReference, + group, ) { @SagaStartListener( event = OrchestrateEvent::class, diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackOrchestrateListener.kt index bc1c41e..6e18941 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackOrchestrateListener.kt @@ -16,6 +16,7 @@ internal class RollbackOrchestrateListener( requestHolder: RequestHolder, resultHolder: ResultHolder, typeReference: TypeReference?, + private val group: String, ) : AbstractOrchestrateListener( orchestratorId, orchestrateSequence, @@ -24,12 +25,13 @@ internal class RollbackOrchestrateListener( requestHolder, resultHolder, typeReference, + group, ) { @SagaRollbackListener(OrchestrateEvent::class) fun listenRollbackOrchestrateEvent(sagaRollbackEvent: SagaRollbackEvent): Mono { return sagaRollbackEvent.startWithOrchestrateEvent() - .filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence } + .filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence && sagaRollbackEvent.group == group } .getHeldRequest(sagaRollbackEvent) .map { (request, event) -> rollbackCommand.command(request, event.context) diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/StartOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/StartOrchestrateListener.kt index 95b47f7..463bafd 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/StartOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/StartOrchestrateListener.kt @@ -16,6 +16,7 @@ internal class StartOrchestrateListener( private val requestHolder: RequestHolder, private val resultHolder: ResultHolder, private val typeReference: TypeReference?, + private val group: String, ) : AbstractOrchestrateListener( orchestratorId, orchestrateSequence, @@ -24,6 +25,7 @@ internal class StartOrchestrateListener( requestHolder, resultHolder, typeReference, + group, ) { override fun withAnnotated(): AbstractOrchestrateListener { @@ -43,6 +45,7 @@ internal class StartOrchestrateListener( requestHolder, resultHolder, typeReference, + group, ) { @SagaStartListener( event = OrchestrateEvent::class, @@ -67,6 +70,7 @@ internal class StartOrchestrateListener( requestHolder, resultHolder, typeReference, + group, ) { @SagaStartListener( event = OrchestrateEvent::class, diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisSagaConfigurer.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisSagaConfigurer.kt index 3072990..0f4422a 100644 --- a/src/main/kotlin/org/rooftop/netx/redis/RedisSagaConfigurer.kt +++ b/src/main/kotlin/org/rooftop/netx/redis/RedisSagaConfigurer.kt @@ -51,6 +51,7 @@ class RedisSagaConfigurer( @Bean @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis") internal fun redisStreamOrchestratorFactory(): OrchestratorFactory = OrchestratorFactory( + group = nodeGroup, sagaManager = redisStreamSagaManager(), sagaDispatcher = redisStreamSagaDispatcher(), codec = jsonCodec(),