diff --git a/README.md b/README.md
index 8809c11..e742098 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@
-  
+  

**TPS(6,000)** on my Macbook air m2(default options). _[link](#Test1-TPS)_
diff --git a/gradle.properties b/gradle.properties
index be4c05b..c161c34 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -2,7 +2,7 @@ kotlin.code.style=official
### Project ###
group=org.rooftopmsa
-version=0.4.9
+version=0.5.0
compatibility=17
### Sonarcloud ###
diff --git a/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt b/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt
index 0ff33a0..b38b28e 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt
@@ -6,6 +6,7 @@ import org.rooftop.netx.engine.listen.*
import reactor.core.publisher.Mono
internal class DefaultOrchestrateChain private constructor(
+ private val group: String,
private val orchestratorId: String,
private val orchestrateSequence: Int,
private val chainContainer: ChainContainer,
@@ -56,6 +57,7 @@ internal class DefaultOrchestrateChain privat
requestHolder = chainContainer.requestHolder,
resultHolder = chainContainer.resultHolder,
typeReference = function.reified(),
+ group = group,
)
private fun nextOrchestrateChain(
@@ -63,12 +65,13 @@ internal class DefaultOrchestrateChain privat
nextRollbackOrchestrateListener: RollbackOrchestrateListener?
): OrchestrateChain {
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
- orchestratorId,
- orchestrateSequence + 1,
- chainContainer,
- nextJoinOrchestrateListener,
- nextRollbackOrchestrateListener,
- this,
+ group = group,
+ orchestratorId = orchestratorId,
+ orchestrateSequence = orchestrateSequence + 1,
+ chainContainer = chainContainer,
+ orchestrateListener = nextJoinOrchestrateListener,
+ rollbackOrchestrateListener = nextRollbackOrchestrateListener,
+ beforeDefaultOrchestrateChain = this,
)
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain
@@ -97,12 +100,13 @@ internal class DefaultOrchestrateChain privat
getMonoRollbackOrchestrateListener(CommandType.CONTEXT, contextRollback)
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
- orchestratorId,
- orchestrateSequence + 1,
- chainContainer,
- nextJoinOrchestrateListener,
- nextRollbackOrchestrateListener,
- this,
+ group = group,
+ orchestratorId = orchestratorId,
+ orchestrateSequence = orchestrateSequence + 1,
+ chainContainer = chainContainer,
+ orchestrateListener = nextJoinOrchestrateListener,
+ rollbackOrchestrateListener = nextRollbackOrchestrateListener,
+ beforeDefaultOrchestrateChain = this,
)
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain
@@ -124,7 +128,8 @@ internal class DefaultOrchestrateChain privat
),
requestHolder = chainContainer.requestHolder,
resultHolder = chainContainer.resultHolder,
- function.reified(),
+ typeReference = function.reified(),
+ group = group,
)
private fun nextOrchestrateChain(
@@ -132,12 +137,13 @@ internal class DefaultOrchestrateChain privat
nextRollbackOrchestrateListener: MonoRollbackOrchestrateListener?
): OrchestrateChain {
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
- orchestratorId,
- orchestrateSequence + 1,
- chainContainer,
- nextJoinOrchestrateListener,
- nextRollbackOrchestrateListener,
- this,
+ group = group,
+ orchestratorId = orchestratorId,
+ orchestrateSequence = orchestrateSequence + 1,
+ chainContainer = chainContainer,
+ orchestrateListener = nextJoinOrchestrateListener,
+ rollbackOrchestrateListener = nextRollbackOrchestrateListener,
+ beforeDefaultOrchestrateChain = this,
)
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain
@@ -173,7 +179,8 @@ internal class DefaultOrchestrateChain privat
orchestrateCommand = OrchestrateCommand(commandType, chainContainer.codec, function),
resultHolder = chainContainer.resultHolder,
requestHolder = chainContainer.requestHolder,
- function.reified(),
+ typeReference = function.reified(),
+ group = group,
)
private fun getRollbackOrchestrateListener(
@@ -189,6 +196,7 @@ internal class DefaultOrchestrateChain privat
requestHolder = chainContainer.requestHolder,
resultHolder = chainContainer.resultHolder,
typeReference = it.reified(),
+ group = group,
)
}
@@ -197,12 +205,13 @@ internal class DefaultOrchestrateChain privat
): Orchestrator {
return chainContainer.orchestratorCache.cache(orchestratorId) {
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
- orchestratorId,
- orchestrateSequence + 1,
- chainContainer,
- nextCommitOrchestrateListener,
- null,
- this,
+ orchestratorId = orchestratorId,
+ orchestrateSequence = orchestrateSequence + 1,
+ chainContainer = chainContainer,
+ orchestrateListener = nextCommitOrchestrateListener,
+ rollbackOrchestrateListener = null,
+ beforeDefaultOrchestrateChain = this,
+ group = group,
)
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain
val firstOrchestrators = nextDefaultOrchestrateChain.initOrchestrateListeners()
@@ -241,12 +250,13 @@ internal class DefaultOrchestrateChain privat
): Orchestrator {
return chainContainer.orchestratorCache.cache(orchestratorId) {
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
- orchestratorId,
- orchestrateSequence + 1,
- chainContainer,
- nextJoinOrchestrateListener,
- null,
- this,
+ group = group,
+ orchestratorId = orchestratorId,
+ orchestrateSequence = orchestrateSequence + 1,
+ chainContainer = chainContainer,
+ orchestrateListener = nextJoinOrchestrateListener,
+ rollbackOrchestrateListener = null,
+ beforeDefaultOrchestrateChain = this,
)
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain
@@ -385,6 +395,7 @@ internal class DefaultOrchestrateChain privat
resultHolder = chainContainer.resultHolder,
requestHolder = chainContainer.requestHolder,
typeReference = function.reified(),
+ group = group,
)
private fun getMonoRollbackOrchestrateListener(
@@ -404,10 +415,12 @@ internal class DefaultOrchestrateChain privat
requestHolder = chainContainer.requestHolder,
resultHolder = chainContainer.resultHolder,
typeReference = it.reified(),
+ group = group,
)
}
internal class Pre internal constructor(
+ private val group: String,
private val orchestratorId: String,
private val sagaManager: SagaManager,
private val sagaDispatcher: AbstractSagaDispatcher,
@@ -427,6 +440,7 @@ internal class DefaultOrchestrateChain privat
getRollbackOrchestrateListener(CommandType.DEFAULT, rollback)
return DefaultOrchestrateChain(
+ group = group,
orchestratorId = orchestratorId,
orchestrateSequence = 0,
chainContainer = getStreamContainer(),
@@ -445,6 +459,7 @@ internal class DefaultOrchestrateChain privat
getRollbackOrchestrateListener(CommandType.CONTEXT, contextRollback)
return DefaultOrchestrateChain(
+ group = group,
orchestratorId = orchestratorId,
orchestrateSequence = 0,
chainContainer = getStreamContainer(),
@@ -469,6 +484,7 @@ internal class DefaultOrchestrateChain privat
requestHolder = requestHolder,
resultHolder = resultHolder,
typeReference = function.reified(),
+ group = group,
)
private fun getRollbackOrchestrateListener(
@@ -487,7 +503,8 @@ internal class DefaultOrchestrateChain privat
),
requestHolder = requestHolder,
resultHolder = resultHolder,
- typeReference = it.reified()
+ typeReference = it.reified(),
+ group = group,
)
}
@@ -501,6 +518,7 @@ internal class DefaultOrchestrateChain privat
getMonoRollbackOrchestrateListener(CommandType.DEFAULT, rollback)
return DefaultOrchestrateChain(
+ group = group,
orchestratorId = orchestratorId,
orchestrateSequence = 0,
chainContainer = getStreamContainer(),
@@ -519,6 +537,7 @@ internal class DefaultOrchestrateChain privat
getMonoRollbackOrchestrateListener(CommandType.CONTEXT, contextRollback)
return DefaultOrchestrateChain(
+ group = group,
orchestratorId = orchestratorId,
orchestrateSequence = 0,
chainContainer = getStreamContainer(),
@@ -543,6 +562,7 @@ internal class DefaultOrchestrateChain privat
requestHolder = requestHolder,
resultHolder = resultHolder,
typeReference = function.reified(),
+ group = group,
)
private fun getMonoRollbackOrchestrateListener(
@@ -562,6 +582,7 @@ internal class DefaultOrchestrateChain privat
requestHolder = requestHolder,
resultHolder = resultHolder,
typeReference = it.reified(),
+ group = group,
)
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/OrchestratorFactory.kt b/src/main/kotlin/org/rooftop/netx/engine/OrchestratorFactory.kt
index c3d8bdc..7e0d51e 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/OrchestratorFactory.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/OrchestratorFactory.kt
@@ -5,6 +5,7 @@ import org.rooftop.netx.api.OrchestratorFactory
import org.rooftop.netx.core.Codec
internal class OrchestratorFactory internal constructor(
+ private val group: String,
private val sagaManager: SagaManager,
private val sagaDispatcher: AbstractSagaDispatcher,
private val codec: Codec,
@@ -26,6 +27,7 @@ internal class OrchestratorFactory internal constructor(
resultHolder = resultHolder,
requestHolder = requestHolder,
orchestratorCache = orchestratorCache,
+ group = group,
)
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt
index 860dd37..5b4777e 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt
@@ -20,6 +20,7 @@ internal abstract class AbstractOrchestrateListener internal c
private val requestHolder: RequestHolder,
private val resultHolder: ResultHolder,
private val typeReference: TypeReference?,
+ private val group: String,
) {
var isFirst: Boolean = true
@@ -58,7 +59,7 @@ internal abstract class AbstractOrchestrateListener internal c
protected fun orchestrate(sagaEvent: SagaEvent): Mono {
return sagaEvent.startWithOrchestrateEvent()
.filter {
- it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId
+ it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId && sagaEvent.group == group
}
.mapReifiedRequest()
.flatMap { (request, event) ->
diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt
index f0e2a7e..70a380e 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt
@@ -16,6 +16,7 @@ internal class CommitOrchestrateListener internal constructor(
private val resultHolder: ResultHolder,
requestHolder: RequestHolder,
typeReference: TypeReference?,
+ private val group: String,
) : AbstractOrchestrateListener(
orchestratorId,
orchestrateSequence,
@@ -24,12 +25,13 @@ internal class CommitOrchestrateListener internal constructor(
requestHolder,
resultHolder,
typeReference,
+ group
) {
@SagaCommitListener(OrchestrateEvent::class)
fun listenCommitOrchestrateEvent(sagaCommitEvent: SagaCommitEvent): Mono {
return sagaCommitEvent.startWithOrchestrateEvent()
- .filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId }
+ .filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId && sagaCommitEvent.group == this.group }
.mapReifiedRequest()
.flatMap { (request, event) ->
holdRequestIfRollbackable(request, sagaCommitEvent.id)
diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/JoinOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/JoinOrchestrateListener.kt
index 1f37c61..47bde6d 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/listen/JoinOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/JoinOrchestrateListener.kt
@@ -16,6 +16,7 @@ internal class JoinOrchestrateListener(
private val requestHolder: RequestHolder,
private val resultHolder: ResultHolder,
private val typeReference: TypeReference?,
+ private val group: String,
) : AbstractOrchestrateListener(
orchestratorId,
orchestrateSequence,
@@ -24,6 +25,7 @@ internal class JoinOrchestrateListener(
requestHolder,
resultHolder,
typeReference,
+ group,
) {
override fun withAnnotated(): AbstractOrchestrateListener {
@@ -43,6 +45,7 @@ internal class JoinOrchestrateListener(
requestHolder,
resultHolder,
typeReference,
+ group,
) {
@SagaJoinListener(
event = OrchestrateEvent::class,
@@ -67,6 +70,7 @@ internal class JoinOrchestrateListener(
requestHolder,
resultHolder,
typeReference,
+ group,
) {
@SagaJoinListener(
event = OrchestrateEvent::class,
diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoCommitOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoCommitOrchestrateListener.kt
index 664d930..60c15f5 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoCommitOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoCommitOrchestrateListener.kt
@@ -16,6 +16,7 @@ internal class MonoCommitOrchestrateListener internal construc
requestHolder: RequestHolder,
private val resultHolder: ResultHolder,
typeReference: TypeReference?,
+ private val group: String,
) : AbstractOrchestrateListener(
orchestratorId,
orchestrateSequence,
@@ -24,11 +25,12 @@ internal class MonoCommitOrchestrateListener internal construc
requestHolder,
resultHolder,
typeReference,
+ group,
) {
@SagaCommitListener(OrchestrateEvent::class)
fun listenCommitOrchestrateEvent(sagaCommitEvent: SagaCommitEvent): Mono {
return sagaCommitEvent.startWithOrchestrateEvent()
- .filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId }
+ .filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId && sagaCommitEvent.group == this.group}
.mapReifiedRequest()
.flatMap { (request, event) ->
holdRequestIfRollbackable(request, sagaCommitEvent.id)
diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoJoinOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoJoinOrchestrateListener.kt
index 8a7c5df..dbc09d7 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoJoinOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoJoinOrchestrateListener.kt
@@ -16,6 +16,7 @@ internal class MonoJoinOrchestrateListener(
private val requestHolder: RequestHolder,
private val resultHolder: ResultHolder,
private val typeReference: TypeReference?,
+ private val group: String,
) : AbstractOrchestrateListener(
orchestratorId,
orchestrateSequence,
@@ -24,6 +25,7 @@ internal class MonoJoinOrchestrateListener(
requestHolder,
resultHolder,
typeReference,
+ group,
) {
override fun withAnnotated(): AbstractOrchestrateListener {
@@ -43,6 +45,7 @@ internal class MonoJoinOrchestrateListener(
requestHolder,
resultHolder,
typeReference,
+ group,
) {
@SagaJoinListener(
event = OrchestrateEvent::class,
@@ -67,6 +70,7 @@ internal class MonoJoinOrchestrateListener(
requestHolder,
resultHolder,
typeReference,
+ group,
) {
@SagaJoinListener(
event = OrchestrateEvent::class,
diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackOrchestrateListener.kt
index 82d5f95..9d0eb97 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackOrchestrateListener.kt
@@ -16,6 +16,7 @@ internal class MonoRollbackOrchestrateListener(
requestHolder: RequestHolder,
resultHolder: ResultHolder,
typeReference: TypeReference?,
+ private val group: String,
) : AbstractOrchestrateListener(
orchestratorId,
orchestrateSequence,
@@ -24,12 +25,13 @@ internal class MonoRollbackOrchestrateListener(
requestHolder,
resultHolder,
typeReference,
+ group,
) {
@SagaRollbackListener(OrchestrateEvent::class)
fun listenRollbackOrchestrateEvent(sagaRollbackEvent: SagaRollbackEvent): Mono {
return sagaRollbackEvent.startWithOrchestrateEvent()
- .filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence }
+ .filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence && sagaRollbackEvent.group == this.group }
.getHeldRequest(sagaRollbackEvent)
.flatMap { (request, event) ->
monoRollbackCommand.command(request, event.context)
diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoStartOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoStartOrchestrateListener.kt
index fbeb643..9892738 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoStartOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoStartOrchestrateListener.kt
@@ -16,6 +16,7 @@ internal class MonoStartOrchestrateListener(
private val requestHolder: RequestHolder,
private val resultHolder: ResultHolder,
private val typeReference: TypeReference?,
+ private val group: String,
) : AbstractOrchestrateListener(
orchestratorId,
orchestrateSequence,
@@ -24,6 +25,7 @@ internal class MonoStartOrchestrateListener(
requestHolder,
resultHolder,
typeReference,
+ group,
) {
override fun withAnnotated(): AbstractOrchestrateListener {
@@ -43,6 +45,7 @@ internal class MonoStartOrchestrateListener(
requestHolder,
resultHolder,
typeReference,
+ group,
) {
@SagaStartListener(
event = OrchestrateEvent::class,
@@ -67,6 +70,7 @@ internal class MonoStartOrchestrateListener(
requestHolder,
resultHolder,
typeReference,
+ group,
) {
@SagaStartListener(
event = OrchestrateEvent::class,
diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackOrchestrateListener.kt
index bc1c41e..6e18941 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackOrchestrateListener.kt
@@ -16,6 +16,7 @@ internal class RollbackOrchestrateListener(
requestHolder: RequestHolder,
resultHolder: ResultHolder,
typeReference: TypeReference?,
+ private val group: String,
) : AbstractOrchestrateListener(
orchestratorId,
orchestrateSequence,
@@ -24,12 +25,13 @@ internal class RollbackOrchestrateListener(
requestHolder,
resultHolder,
typeReference,
+ group,
) {
@SagaRollbackListener(OrchestrateEvent::class)
fun listenRollbackOrchestrateEvent(sagaRollbackEvent: SagaRollbackEvent): Mono {
return sagaRollbackEvent.startWithOrchestrateEvent()
- .filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence }
+ .filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence && sagaRollbackEvent.group == group }
.getHeldRequest(sagaRollbackEvent)
.map { (request, event) ->
rollbackCommand.command(request, event.context)
diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/StartOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/StartOrchestrateListener.kt
index 95b47f7..463bafd 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/listen/StartOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/StartOrchestrateListener.kt
@@ -16,6 +16,7 @@ internal class StartOrchestrateListener(
private val requestHolder: RequestHolder,
private val resultHolder: ResultHolder,
private val typeReference: TypeReference?,
+ private val group: String,
) : AbstractOrchestrateListener(
orchestratorId,
orchestrateSequence,
@@ -24,6 +25,7 @@ internal class StartOrchestrateListener(
requestHolder,
resultHolder,
typeReference,
+ group,
) {
override fun withAnnotated(): AbstractOrchestrateListener {
@@ -43,6 +45,7 @@ internal class StartOrchestrateListener(
requestHolder,
resultHolder,
typeReference,
+ group,
) {
@SagaStartListener(
event = OrchestrateEvent::class,
@@ -67,6 +70,7 @@ internal class StartOrchestrateListener(
requestHolder,
resultHolder,
typeReference,
+ group,
) {
@SagaStartListener(
event = OrchestrateEvent::class,
diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisSagaConfigurer.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisSagaConfigurer.kt
index 3072990..0f4422a 100644
--- a/src/main/kotlin/org/rooftop/netx/redis/RedisSagaConfigurer.kt
+++ b/src/main/kotlin/org/rooftop/netx/redis/RedisSagaConfigurer.kt
@@ -51,6 +51,7 @@ class RedisSagaConfigurer(
@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
internal fun redisStreamOrchestratorFactory(): OrchestratorFactory = OrchestratorFactory(
+ group = nodeGroup,
sagaManager = redisStreamSagaManager(),
sagaDispatcher = redisStreamSagaDispatcher(),
codec = jsonCodec(),