Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 63 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<br>

![version 0.4.8](https://img.shields.io/badge/version-0.4.8-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![version 0.4.9](https://img.shields.io/badge/version-0.4.9-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white)

**TPS(6,000)** on my Macbook air m2(default options). _[link](#Test1-TPS)_
Expand All @@ -16,12 +16,28 @@ Netx is a Saga framework, that provides following features.
3. Supports both Orchestration and Choreograph.
4. Automatically reruns loss events.
5. Automatically applies **`Transactional messaging pattern`**.
6. Supports backpressure to control the number of events that can be processed per node.
7. Prevents multiple nodes in the same group from receiving duplicate events.
8. Ensures message delivery using the `At Least Once` approach.
6. Supports **`Rollback Dead letter`** relay. If an exception occurs during the rollback process, saga is stored in the Dead Letter Queue, and you can relay it using DeadLetterRelay.
7. Supports backpressure to control the number of events that can be processed per node.
8. Prevents multiple nodes in the same group from receiving duplicate events.
9. Ensures message delivery using the `At Least Once` approach.

You can see the test results [here](#Test).

## Table of Contents
- [Download](#download)
- [How to use](#how-to-use)
- [Orchestrator-example.](#orchestrator-example)
- [Events-Example. Handle saga event](#events-example-handle-saga-event)
- [Events-Example. Start pay saga](#events-example-start-pay-saga)
- [Events-Example. Join order saga](#events-example-join-order-saga)
- [Events-Example. Check exists saga](#events-example-check-exists-saga)
- [Rollback DeadLetter](#rollback-deadletter)
- [Example. relay deadLetter](#example-relay-deadletter)
- [Example. handle deadLetter message](#example-handle-deadletter-message)
- [Test](#test)
- [Test1-TPS](#test1-tps)
- [Test2-Rollback](#test2-rollback)

## Download

```groovy
Expand Down Expand Up @@ -274,6 +290,49 @@ fun exists(param: Any): Mono<Any> {
}
```

### Rollback DeadLetter

#### Example. relay deadLetter

```kotlin

@Component
class SomeClass(
private val deadLetterRelay: DeadLetterRelay,
) {

fun example() {
// Relay latest dead letter
deadLetterRelay.relay()
.subscribe()

// Alternatively, you can use the …Sync method in a synchronous environment.
deadLetterRelay.relaySync()

// Relay specific dead letter by deadLetterId
deadLetterRelay.relaySync("12345-01")
}
}

```

#### Example. handle deadLetter message

```kotlin

@Configuration
class SomeClass(
private val deadLetterRegistry: DeadLetterRegistry,
) {

fun example() {
deadLetterRegistry.addListener { deadLetterId, sagaEvent ->
// do handle
}
}
}
```

## Test

### Test1-TPS
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ kotlin.code.style=official

### Project ###
group=org.rooftopmsa
version=0.4.8
version=0.4.9
compatibility=17

### Sonarcloud ###
Expand Down
37 changes: 37 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/DeadLetterRelay.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.rooftop.netx.api

import reactor.core.publisher.Mono

/**
* An interface for relay dead letters, with support for recovering those that failed during rollback.
*/
interface DeadLetterRelay {

/**
* relay dead letter and return SagaEvent
*
* @return SagaEvent | A dead letter that was successfully processed
*/
fun relay(): Mono<SagaEvent>

/**
* @see relay
* @return SagaEvent | A dead letter that was successfully processed
*/
fun relaySync(): SagaEvent


/**
* relay dead letter and return SagaEvent by specific deadLetterId (not a same SagaEvent.id)
*
* @param deadLetterId
* @return SagaEvent | A dead letter that was successfully processed
*/
fun relay(deadLetterId: String): Mono<SagaEvent>

/**
* @see relay
* @return SagaEvent | A dead letter that was successfully processed (not a same SagaEvent.id)
*/
fun relaySync(deadLetterId: String): SagaEvent
}
4 changes: 4 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/Exceptions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,9 @@ class FailedAckSagaException(message: String) : RuntimeException(message)
class ResultTimeoutException(message: String, throwable: Throwable) :
RuntimeException(message, throwable)

class DeadLetterTimeoutException(message: String): RuntimeException(message)

class DeadLetterException(message: String): RuntimeException(message)

@JsonIgnoreProperties(ignoreUnknown = true)
class ResultException(message: String) : RuntimeException(message)
23 changes: 23 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/SagaEvent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,28 @@ sealed class SagaEvent(
type
)

/**
* If you use Orchestrator and get SagaEvent that hold orchestrateEvent.
* you can get type class by using this function.
*
* @param type
* @param T
*/
fun <T: Any> decodeOrchestrateEvent(type: Class<T>): T = decodeOrchestrateEvent(type.kotlin)

/**
* @see decodeOrchestrateEvent
*/
fun <T : Any> decodeOrchestrateEvent(type: KClass<T>): T {
val orchestrateEvent = codec.decode(
event ?: throw NullPointerException("Cannot decode event cause event is null"),
object : TypeReference<Map<String, Any>>() {}
)
val clientEvent = orchestrateEvent["clientEvent"]
?: throw NullPointerException("Cannot decode event cause orchestrateEvent.clientEvent is null")

return codec.decode(clientEvent as String, type)
}

internal abstract fun copy(): SagaEvent
}
67 changes: 14 additions & 53 deletions src/main/kotlin/org/rooftop/netx/engine/AbstractSagaDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.rooftop.netx.api.*
import org.rooftop.netx.core.Codec
import org.rooftop.netx.engine.core.Saga
import org.rooftop.netx.engine.core.SagaState
import org.rooftop.netx.engine.deadletter.AbstractDeadLetterManager
import org.rooftop.netx.engine.logging.info
import org.rooftop.netx.engine.logging.warningOnError
import reactor.core.publisher.Flux
Expand All @@ -17,8 +18,13 @@ import kotlin.reflect.full.declaredMemberFunctions
internal abstract class AbstractSagaDispatcher(
private val codec: Codec,
private val sagaManager: SagaManager,
private val abstractDeadLetterManager: AbstractDeadLetterManager,
) {

init {
this.also { abstractDeadLetterManager.dispatcher = it }
}

private val functions =
mutableMapOf<SagaState, MutableList<AbstractDispatchFunction<*>>>()

Expand Down Expand Up @@ -55,7 +61,8 @@ internal abstract class AbstractSagaDispatcher(
messageId: String
): Flux<*> = this.doOnComplete {
Mono.just(saga to messageId)
.info("Ack saga \"${saga.id}\"")
.filter { messageId != DEAD_LETTER }
.info("Ack saga \"${saga.id}\" messageId: $messageId")
.flatMap {
ack(saga, messageId)
.warningOnError("Fail to ack saga \"${saga.id}\"")
Expand All @@ -65,57 +72,7 @@ internal abstract class AbstractSagaDispatcher(
}

private fun mapSagaEvent(saga: Saga): Mono<SagaEvent> {
return when (saga.state) {
SagaState.START -> Mono.just(
SagaStartEvent(
id = saga.id,
nodeName = saga.serverId,
group = saga.group,
event = extractEvent(saga),
codec = codec,
)
)

SagaState.COMMIT -> Mono.just(
SagaCommitEvent(
id = saga.id,
nodeName = saga.serverId,
group = saga.group,
event = extractEvent(saga),
codec = codec
)
)

SagaState.JOIN -> Mono.just(
SagaJoinEvent(
id = saga.id,
nodeName = saga.serverId,
group = saga.group,
event = extractEvent(saga),
codec = codec,
)
)

SagaState.ROLLBACK ->
Mono.just(
SagaRollbackEvent(
id = saga.id,
nodeName = saga.serverId,
group = saga.group,
event = extractEvent(saga),
cause = saga.cause
?: throw NullPointerException("Null value on SagaRollbackEvent's cause field"),
codec = codec,
)
)
}
}

private fun extractEvent(saga: Saga): String? {
return when (saga.event != null) {
true -> saga.event
false -> null
}
return Mono.just(saga.toEvent(codec))
}

internal fun addOrchestrate(handler: Any) {
Expand Down Expand Up @@ -145,6 +102,7 @@ internal abstract class AbstractSagaDispatcher(
noRollbackFor,
nextState,
sagaManager,
abstractDeadLetterManager,
)
)
}.onFailure {
Expand Down Expand Up @@ -190,6 +148,7 @@ internal abstract class AbstractSagaDispatcher(
noRollbackFor,
nextState,
sagaManager,
abstractDeadLetterManager,
)
)
}.onFailure {
Expand Down Expand Up @@ -225,6 +184,7 @@ internal abstract class AbstractSagaDispatcher(
noRollbackFor,
nextState,
sagaManager,
abstractDeadLetterManager,
)
)
}.onFailure {
Expand Down Expand Up @@ -288,7 +248,8 @@ internal abstract class AbstractSagaDispatcher(
messageId: String
): Mono<Pair<Saga, String>>

private companion object {
internal companion object {
internal const val DEAD_LETTER = "dead letter"
private const val DISPATCHED = "dispatched"

private val notMatchedSagaHandlerException =
Expand Down
14 changes: 13 additions & 1 deletion src/main/kotlin/org/rooftop/netx/engine/MonoDispatchFunction.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package org.rooftop.netx.engine

import org.rooftop.netx.api.SagaEvent
import org.rooftop.netx.api.SagaManager
import org.rooftop.netx.api.SagaRollbackEvent
import org.rooftop.netx.engine.deadletter.AbstractDeadLetterManager
import org.rooftop.netx.engine.logging.info
import reactor.core.publisher.Mono
import kotlin.reflect.KClass
Expand All @@ -20,6 +22,7 @@ internal class MonoDispatchFunction(
noRetryFor: Array<KClass<out Throwable>>,
nextState: NextSagaState,
sagaManager: SagaManager,
private val abstractDeadLetterManager: AbstractDeadLetterManager,
) : AbstractDispatchFunction<Mono<*>>(
eventType,
function,
Expand All @@ -33,14 +36,23 @@ internal class MonoDispatchFunction(
return Mono.just(sagaEvent)
.filter { isProcessable(sagaEvent) }
.map { sagaEvent.copy() }
.flatMap { function.call(handler, sagaEvent) }
.flatMap {
function.call(handler, sagaEvent)
}
.info("Call Mono SagaHandler \"${name()}\" with id \"${sagaEvent.id}\"")
.switchIfEmpty(`continue`)
.doOnNext {
if (isProcessable(sagaEvent)) {
publishNextSaga(sagaEvent)
}
}
.onErrorResume {
if (sagaEvent is SagaRollbackEvent) {
abstractDeadLetterManager.addDeadLetter(sagaEvent)
} else {
Mono.error(it)
}
}
.onErrorResume {
if (isNoRollbackFor(it)) {
return@onErrorResume noRollbackFor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package org.rooftop.netx.engine

import org.rooftop.netx.api.SagaEvent
import org.rooftop.netx.api.SagaManager
import org.rooftop.netx.api.SagaRollbackEvent
import org.rooftop.netx.engine.deadletter.AbstractDeadLetterManager
import org.rooftop.netx.engine.logging.info
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import kotlin.reflect.KClass
import kotlin.reflect.KFunction

Expand All @@ -18,6 +21,7 @@ internal class NotPublishDispatchFunction(
noRollbackFor: Array<KClass<out Throwable>>,
nextState: NextSagaState,
sagaManager: SagaManager,
private val abstractDeadLetterManager: AbstractDeadLetterManager,
) : AbstractDispatchFunction<Any?>(
eventType,
function,
Expand All @@ -30,11 +34,20 @@ internal class NotPublishDispatchFunction(
override fun call(sagaEvent: SagaEvent): Any {
if (isProcessable(sagaEvent)) {
return runCatching {
function.call(handler, sagaEvent)
info("Call NotPublisher SagaHandler \"${name()}\" with id \"${sagaEvent.id}\"")
val result = function.call(handler, sagaEvent)
info("Call NotPublisher SagaHandler success \\\"${name()}\\\" with id \\\"${sagaEvent.id}\\\"\"")
result
}.fold(
onSuccess = { publishNextSaga(sagaEvent) },
onFailure = {
if (sagaEvent is SagaRollbackEvent) {
abstractDeadLetterManager.addDeadLetter(sagaEvent)
.subscribeOn(Schedulers.boundedElastic())
.subscribe()
return@fold SKIP
}

if (isNoRollbackFor(it)) {
return@fold NO_ROLLBACK_FOR
}
Expand Down
Loading
Loading