Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1438,11 +1438,7 @@ public TransactionSendResult sendMessageInTransaction(final Message msg,
throw new MQClientException("tranExecutor is null", null);
}

// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}

ensureNotDelayedForTransactional(msg);
Validators.checkMessage(msg, this.defaultMQProducer);

SendResult sendResult = null;
Expand Down Expand Up @@ -1499,7 +1495,7 @@ public TransactionSendResult sendMessageInTransaction(final Message msg,
try {
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
log.warn("local transaction execute {}, but end broker transaction failed", localTransactionState, e);
}

TransactionSendResult transactionSendResult = new TransactionSendResult();
Expand All @@ -1512,6 +1508,15 @@ public TransactionSendResult sendMessageInTransaction(final Message msg,
return transactionSendResult;
}

private void ensureNotDelayedForTransactional(final Message msg) throws MQClientException {
if (msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null
|| msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS) != null
|| msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null
|| msg.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS) != null) {
throw new MQClientException("Transactional messages do not support delayed delivery", null);
}
}

/**
* DEFAULT SYNC -------------------------------------------------------
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,18 @@ public Object answer(InvocationOnMock mock) throws Throwable {
assertThat(ctx.getMessage().getTopic()).isEqualTo(topic);
}

@Test(expected = MQClientException.class)
public void testSendMessageInTransaction_NoListener_ThrowsException() throws MQClientException {
producer.setTransactionListener(null);
producer.sendMessageInTransaction(message, null);
}

@Test(expected = MQClientException.class)
public void testSendMessageInTransaction_DelayMsg_ThrowsException() throws MQClientException {
message.setDelayTimeLevel(3);
producer.sendMessageInTransaction(message, null);
}

@After
public void terminate() {
producer.shutdown();
Expand Down
Loading