Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<br>

![version 0.4.9](https://img.shields.io/badge/version-0.4.9-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![version 0.5.0](https://img.shields.io/badge/version-0.4.9-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white)

**TPS(6,000)** on my Macbook air m2(default options). _[link](#Test1-TPS)_
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ kotlin.code.style=official

### Project ###
group=org.rooftopmsa
version=0.4.9
version=0.5.0
compatibility=17

### Sonarcloud ###
Expand Down
87 changes: 54 additions & 33 deletions src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import org.rooftop.netx.engine.listen.*
import reactor.core.publisher.Mono

internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> private constructor(
private val group: String,
private val orchestratorId: String,
private val orchestrateSequence: Int,
private val chainContainer: ChainContainer,
Expand Down Expand Up @@ -56,19 +57,21 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
requestHolder = chainContainer.requestHolder,
resultHolder = chainContainer.resultHolder,
typeReference = function.reified(),
group = group,
)

private fun <S : Any> nextOrchestrateChain(
nextJoinOrchestrateListener: JoinOrchestrateListener<V, S>,
nextRollbackOrchestrateListener: RollbackOrchestrateListener<V, S>?
): OrchestrateChain<OriginReq, V, S> {
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
orchestratorId,
orchestrateSequence + 1,
chainContainer,
nextJoinOrchestrateListener,
nextRollbackOrchestrateListener,
this,
group = group,
orchestratorId = orchestratorId,
orchestrateSequence = orchestrateSequence + 1,
chainContainer = chainContainer,
orchestrateListener = nextJoinOrchestrateListener,
rollbackOrchestrateListener = nextRollbackOrchestrateListener,
beforeDefaultOrchestrateChain = this,
)
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain

Expand Down Expand Up @@ -97,12 +100,13 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
getMonoRollbackOrchestrateListener<V, S>(CommandType.CONTEXT, contextRollback)

val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
orchestratorId,
orchestrateSequence + 1,
chainContainer,
nextJoinOrchestrateListener,
nextRollbackOrchestrateListener,
this,
group = group,
orchestratorId = orchestratorId,
orchestrateSequence = orchestrateSequence + 1,
chainContainer = chainContainer,
orchestrateListener = nextJoinOrchestrateListener,
rollbackOrchestrateListener = nextRollbackOrchestrateListener,
beforeDefaultOrchestrateChain = this,
)
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain

Expand All @@ -124,20 +128,22 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
),
requestHolder = chainContainer.requestHolder,
resultHolder = chainContainer.resultHolder,
function.reified(),
typeReference = function.reified(),
group = group,
)

private fun <S : Any> nextOrchestrateChain(
nextJoinOrchestrateListener: MonoJoinOrchestrateListener<V, S>,
nextRollbackOrchestrateListener: MonoRollbackOrchestrateListener<V, S>?
): OrchestrateChain<OriginReq, V, S> {
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
orchestratorId,
orchestrateSequence + 1,
chainContainer,
nextJoinOrchestrateListener,
nextRollbackOrchestrateListener,
this,
group = group,
orchestratorId = orchestratorId,
orchestrateSequence = orchestrateSequence + 1,
chainContainer = chainContainer,
orchestrateListener = nextJoinOrchestrateListener,
rollbackOrchestrateListener = nextRollbackOrchestrateListener,
beforeDefaultOrchestrateChain = this,
)
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain

Expand Down Expand Up @@ -173,7 +179,8 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
orchestrateCommand = OrchestrateCommand<T, V>(commandType, chainContainer.codec, function),
resultHolder = chainContainer.resultHolder,
requestHolder = chainContainer.requestHolder,
function.reified(),
typeReference = function.reified(),
group = group,
)

private fun <T : Any, V : Any> getRollbackOrchestrateListener(
Expand All @@ -189,6 +196,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
requestHolder = chainContainer.requestHolder,
resultHolder = chainContainer.resultHolder,
typeReference = it.reified(),
group = group,
)
}

Expand All @@ -197,12 +205,13 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
): Orchestrator<OriginReq, S> {
return chainContainer.orchestratorCache.cache(orchestratorId) {
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
orchestratorId,
orchestrateSequence + 1,
chainContainer,
nextCommitOrchestrateListener,
null,
this,
orchestratorId = orchestratorId,
orchestrateSequence = orchestrateSequence + 1,
chainContainer = chainContainer,
orchestrateListener = nextCommitOrchestrateListener,
rollbackOrchestrateListener = null,
beforeDefaultOrchestrateChain = this,
group = group,
)
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain
val firstOrchestrators = nextDefaultOrchestrateChain.initOrchestrateListeners()
Expand Down Expand Up @@ -241,12 +250,13 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
): Orchestrator<OriginReq, S> {
return chainContainer.orchestratorCache.cache(orchestratorId) {
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
orchestratorId,
orchestrateSequence + 1,
chainContainer,
nextJoinOrchestrateListener,
null,
this,
group = group,
orchestratorId = orchestratorId,
orchestrateSequence = orchestrateSequence + 1,
chainContainer = chainContainer,
orchestrateListener = nextJoinOrchestrateListener,
rollbackOrchestrateListener = null,
beforeDefaultOrchestrateChain = this,
)
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain

Expand Down Expand Up @@ -385,6 +395,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
resultHolder = chainContainer.resultHolder,
requestHolder = chainContainer.requestHolder,
typeReference = function.reified(),
group = group,
)

private fun <T : Any, V : Any> getMonoRollbackOrchestrateListener(
Expand All @@ -404,10 +415,12 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
requestHolder = chainContainer.requestHolder,
resultHolder = chainContainer.resultHolder,
typeReference = it.reified(),
group = group,
)
}

internal class Pre<T : Any> internal constructor(
private val group: String,
private val orchestratorId: String,
private val sagaManager: SagaManager,
private val sagaDispatcher: AbstractSagaDispatcher,
Expand All @@ -427,6 +440,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
getRollbackOrchestrateListener<V>(CommandType.DEFAULT, rollback)

return DefaultOrchestrateChain(
group = group,
orchestratorId = orchestratorId,
orchestrateSequence = 0,
chainContainer = getStreamContainer(),
Expand All @@ -445,6 +459,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
getRollbackOrchestrateListener<V>(CommandType.CONTEXT, contextRollback)

return DefaultOrchestrateChain(
group = group,
orchestratorId = orchestratorId,
orchestrateSequence = 0,
chainContainer = getStreamContainer(),
Expand All @@ -469,6 +484,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
requestHolder = requestHolder,
resultHolder = resultHolder,
typeReference = function.reified(),
group = group,
)

private fun <V : Any> getRollbackOrchestrateListener(
Expand All @@ -487,7 +503,8 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
),
requestHolder = requestHolder,
resultHolder = resultHolder,
typeReference = it.reified()
typeReference = it.reified(),
group = group,
)
}

Expand All @@ -501,6 +518,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
getMonoRollbackOrchestrateListener<V>(CommandType.DEFAULT, rollback)

return DefaultOrchestrateChain(
group = group,
orchestratorId = orchestratorId,
orchestrateSequence = 0,
chainContainer = getStreamContainer(),
Expand All @@ -519,6 +537,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
getMonoRollbackOrchestrateListener<V>(CommandType.CONTEXT, contextRollback)

return DefaultOrchestrateChain(
group = group,
orchestratorId = orchestratorId,
orchestrateSequence = 0,
chainContainer = getStreamContainer(),
Expand All @@ -543,6 +562,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
requestHolder = requestHolder,
resultHolder = resultHolder,
typeReference = function.reified(),
group = group,
)

private fun <V : Any> getMonoRollbackOrchestrateListener(
Expand All @@ -562,6 +582,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
requestHolder = requestHolder,
resultHolder = resultHolder,
typeReference = it.reified(),
group = group,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.rooftop.netx.api.OrchestratorFactory
import org.rooftop.netx.core.Codec

internal class OrchestratorFactory internal constructor(
private val group: String,
private val sagaManager: SagaManager,
private val sagaDispatcher: AbstractSagaDispatcher,
private val codec: Codec,
Expand All @@ -26,6 +27,7 @@ internal class OrchestratorFactory internal constructor(
resultHolder = resultHolder,
requestHolder = requestHolder,
orchestratorCache = orchestratorCache,
group = group,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ internal abstract class AbstractOrchestrateListener<T : Any, V : Any> internal c
private val requestHolder: RequestHolder,
private val resultHolder: ResultHolder,
private val typeReference: TypeReference<T>?,
private val group: String,
) {

var isFirst: Boolean = true
Expand Down Expand Up @@ -58,7 +59,7 @@ internal abstract class AbstractOrchestrateListener<T : Any, V : Any> internal c
protected fun orchestrate(sagaEvent: SagaEvent): Mono<OrchestrateEvent> {
return sagaEvent.startWithOrchestrateEvent()
.filter {
it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId
it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId && sagaEvent.group == group
}
.mapReifiedRequest()
.flatMap { (request, event) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ internal class CommitOrchestrateListener<T : Any, V : Any> internal constructor(
private val resultHolder: ResultHolder,
requestHolder: RequestHolder,
typeReference: TypeReference<T>?,
private val group: String,
) : AbstractOrchestrateListener<T, V>(
orchestratorId,
orchestrateSequence,
Expand All @@ -24,12 +25,13 @@ internal class CommitOrchestrateListener<T : Any, V : Any> internal constructor(
requestHolder,
resultHolder,
typeReference,
group
) {

@SagaCommitListener(OrchestrateEvent::class)
fun listenCommitOrchestrateEvent(sagaCommitEvent: SagaCommitEvent): Mono<V> {
return sagaCommitEvent.startWithOrchestrateEvent()
.filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId }
.filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId && sagaCommitEvent.group == this.group }
.mapReifiedRequest()
.flatMap { (request, event) ->
holdRequestIfRollbackable(request, sagaCommitEvent.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ internal class JoinOrchestrateListener<T : Any, V : Any>(
private val requestHolder: RequestHolder,
private val resultHolder: ResultHolder,
private val typeReference: TypeReference<T>?,
private val group: String,
) : AbstractOrchestrateListener<T, V>(
orchestratorId,
orchestrateSequence,
Expand All @@ -24,6 +25,7 @@ internal class JoinOrchestrateListener<T : Any, V : Any>(
requestHolder,
resultHolder,
typeReference,
group,
) {

override fun withAnnotated(): AbstractOrchestrateListener<T, V> {
Expand All @@ -43,6 +45,7 @@ internal class JoinOrchestrateListener<T : Any, V : Any>(
requestHolder,
resultHolder,
typeReference,
group,
) {
@SagaJoinListener(
event = OrchestrateEvent::class,
Expand All @@ -67,6 +70,7 @@ internal class JoinOrchestrateListener<T : Any, V : Any>(
requestHolder,
resultHolder,
typeReference,
group,
) {
@SagaJoinListener(
event = OrchestrateEvent::class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ internal class MonoCommitOrchestrateListener<T : Any, V : Any> internal construc
requestHolder: RequestHolder,
private val resultHolder: ResultHolder,
typeReference: TypeReference<T>?,
private val group: String,
) : AbstractOrchestrateListener<T, V>(
orchestratorId,
orchestrateSequence,
Expand All @@ -24,11 +25,12 @@ internal class MonoCommitOrchestrateListener<T : Any, V : Any> internal construc
requestHolder,
resultHolder,
typeReference,
group,
) {
@SagaCommitListener(OrchestrateEvent::class)
fun listenCommitOrchestrateEvent(sagaCommitEvent: SagaCommitEvent): Mono<V> {
return sagaCommitEvent.startWithOrchestrateEvent()
.filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId }
.filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId && sagaCommitEvent.group == this.group}
.mapReifiedRequest()
.flatMap { (request, event) ->
holdRequestIfRollbackable(request, sagaCommitEvent.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ internal class MonoJoinOrchestrateListener<T : Any, V : Any>(
private val requestHolder: RequestHolder,
private val resultHolder: ResultHolder,
private val typeReference: TypeReference<T>?,
private val group: String,
) : AbstractOrchestrateListener<T, V>(
orchestratorId,
orchestrateSequence,
Expand All @@ -24,6 +25,7 @@ internal class MonoJoinOrchestrateListener<T : Any, V : Any>(
requestHolder,
resultHolder,
typeReference,
group,
) {

override fun withAnnotated(): AbstractOrchestrateListener<T, V> {
Expand All @@ -43,6 +45,7 @@ internal class MonoJoinOrchestrateListener<T : Any, V : Any>(
requestHolder,
resultHolder,
typeReference,
group,
) {
@SagaJoinListener(
event = OrchestrateEvent::class,
Expand All @@ -67,6 +70,7 @@ internal class MonoJoinOrchestrateListener<T : Any, V : Any>(
requestHolder,
resultHolder,
typeReference,
group,
) {
@SagaJoinListener(
event = OrchestrateEvent::class,
Expand Down
Loading
Loading