serializer) {
final RetryTemplate retryTemplate = getRetryTemplate();
-
- return retryTemplate.execute(context -> {
- Throwable throwable = null;
- try {
- final KeyQueryMetadata keyQueryMetadata = this.kafkaStreamsRegistry.getKafkaStreams()
- .stream()
- .map((k) -> Optional.ofNullable(k.queryMetadataForKey(store, key, serializer)))
- .filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null);
- if (keyQueryMetadata != null) {
- return keyQueryMetadata.activeHost();
+ try {
+ return retryTemplate.execute(() -> {
+ Throwable throwable = null;
+ try {
+ final KeyQueryMetadata keyQueryMetadata = this.kafkaStreamsRegistry.getKafkaStreams()
+ .stream()
+ .map((k) -> Optional.ofNullable(k.queryMetadataForKey(store, key, serializer)))
+ .filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null);
+ if (keyQueryMetadata != null) {
+ return keyQueryMetadata.activeHost();
+ }
+ }
+ catch (Exception e) {
+ throwable = e;
}
- }
- catch (Exception e) {
- throwable = e;
- }
- throw new IllegalStateException(
- "Error when retrieving state store.", throwable != null ? throwable : new Throwable("Kafka Streams is not ready."));
- });
+ throw new IllegalStateException(
+ "Error when retrieving state store.", throwable != null ? throwable : new Throwable("Kafka Streams is not ready."));
+ });
+ }
+ catch (RetryException ex) {
+ ReflectionUtils.rethrowRuntimeException(ex.getCause());
+ return null;
+ }
}
private RetryTemplate getRetryTemplate() {
- RetryTemplate retryTemplate = new RetryTemplate();
-
- KafkaStreamsBinderConfigurationProperties.StateStoreRetry stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry();
- RetryPolicy retryPolicy = new SimpleRetryPolicy(stateStoreRetry.getMaxAttempts());
- FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
- backOffPolicy.setBackOffPeriod(stateStoreRetry.getBackoffPeriod());
- retryTemplate.setBackOffPolicy(backOffPolicy);
- retryTemplate.setRetryPolicy(retryPolicy);
+ var stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry();
+ RetryPolicy retryPolicy = RetryPolicy.builder()
+ .maxAttempts(stateStoreRetry.getMaxAttempts())
+ .delay(Duration.ofMillis(stateStoreRetry.getBackoffPeriod()))
+ .build();
- return retryTemplate;
+ return new RetryTemplate(retryPolicy);
}
/**
diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinder.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinder.java
index f2cc083e59..f7d626edb9 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinder.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinder.java
@@ -44,7 +44,7 @@
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
-import org.springframework.retry.support.RetryTemplate;
+import org.springframework.core.retry.RetryTemplate;
import org.springframework.util.StringUtils;
/**
diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinder.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinder.java
index 0b86227879..e635b98b18 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinder.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinder.java
@@ -35,14 +35,14 @@
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
-import org.springframework.retry.support.RetryTemplate;
+import org.springframework.core.retry.RetryTemplate;
import org.springframework.util.StringUtils;
/**
* {@link org.springframework.cloud.stream.binder.Binder} implementation for
* {@link KTable}. This implemenation extends from the {@link AbstractBinder} directly.
- *
+ *
* Provides only consumer binding for the bound KTable as output bindings are not allowed
* on it.
*
diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderUtils.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderUtils.java
index df2bdfaa5b..cdb5c7c41a 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderUtils.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderUtils.java
@@ -58,7 +58,7 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
-import org.springframework.retry.support.RetryTemplate;
+import org.springframework.core.retry.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java
index 31bf08696b..d1350e284f 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java
@@ -123,7 +123,7 @@ void stateStoreRetrievalRetriedOnFailure() {
catch (Exception ignored) {
}
- Mockito.verify(mockKafkaStreams, times(3))
+ Mockito.verify(mockKafkaStreams, times(4))
.store(StoreQueryParameters.fromNameAndType("foo", storeType));
}
@@ -153,7 +153,7 @@ void hostInfoRetrievalRetriedOnFailure() {
catch (Exception ignored) {
}
- Mockito.verify(mockKafkaStreams, times(3))
+ Mockito.verify(mockKafkaStreams, times(4))
.queryMetadataForKey("foo", "foobarApp-key", serializer);
}
diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsRetryTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsRetryTests.java
index e49b5f46df..9961e3bf52 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsRetryTests.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsRetryTests.java
@@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.kafka.streams.function;
+import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -38,17 +39,17 @@
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
+import org.springframework.core.retry.RetryException;
+import org.springframework.core.retry.RetryPolicy;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
-import org.springframework.retry.RetryPolicy;
-import org.springframework.retry.backoff.FixedBackOffPolicy;
-import org.springframework.retry.policy.SimpleRetryPolicy;
-import org.springframework.retry.support.RetryTemplate;
+import org.springframework.core.retry.RetryTemplate;
import org.springframework.util.Assert;
+import org.springframework.util.ReflectionUtils;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
@@ -159,10 +160,15 @@ public void init(org.apache.kafka.streams.processor.api.ProcessorContext
* You must also add {@link TestChannelBinderConfiguration} to your configuration. Below
* is the example using Spring Boot test.
@@ -101,6 +99,7 @@
*
* @author Oleg Zhurakousky
* @author Gary Russell
+ * @author Artem Bilan
*
*/
public class TestChannelBinder extends
@@ -196,7 +195,7 @@ protected MessageHandler getErrorMessageHandler(ConsumerDestination destination,
}
/**
- * Implementation of simple message listener container modeled after AMQP
+ * Implementation of a simple message listener container modeled after AMQP
* SimpleMessageListenerContainer.
*/
private static class IntegrationMessageListeningContainer implements MessageHandler {
@@ -215,18 +214,16 @@ public void setMessageListener(Consumer> listener) {
}
/**
- * Implementation of inbound channel adapter modeled after AmqpInboundChannelAdapter.
+ * Implementation of an inbound channel adapter modeled after AmqpInboundChannelAdapter.
*/
private static class IntegrationBinderInboundChannelAdapter
extends MessageProducerSupport {
- private static final ThreadLocal attributesHolder = new ThreadLocal();
-
private final IntegrationMessageListeningContainer listenerContainer;
private RetryTemplate retryTemplate;
- private RecoveryCallback extends Object> recoveryCallback;
+ private RecoveryCallback> recoveryCallback;
IntegrationBinderInboundChannelAdapter(
IntegrationMessageListeningContainer listenerContainer) {
@@ -234,8 +231,7 @@ private static class IntegrationBinderInboundChannelAdapter
}
// Temporarily unused until DLQ strategy for this binder becomes a requirement
- public void setRecoveryCallback(
- RecoveryCallback extends Object> recoveryCallback) {
+ public void setRecoveryCallback(RecoveryCallback> recoveryCallback) {
this.recoveryCallback = recoveryCallback;
}
@@ -252,33 +248,29 @@ protected void onInit() {
+ "send an error message when retries are exhausted");
}
Listener messageListener = new Listener();
- if (this.retryTemplate != null) {
- this.retryTemplate.registerListener(messageListener);
- }
this.listenerContainer.setMessageListener(messageListener);
}
- protected class Listener implements RetryListener, Consumer> {
+ protected class Listener implements Consumer> {
@Override
- @SuppressWarnings("unchecked")
public void accept(Message> message) {
try {
if (IntegrationBinderInboundChannelAdapter.this.retryTemplate == null) {
+ processMessage(message);
+ }
+ else {
try {
- processMessage(message);
+ IntegrationBinderInboundChannelAdapter.this.retryTemplate.execute(() -> {
+ processMessage(message);
+ return null;
+ });
}
- finally {
- attributesHolder.remove();
+ catch (RetryException ex) {
+ IntegrationBinderInboundChannelAdapter.this.recoveryCallback
+ .recover(ErrorMessageUtils.getAttributeAccessor(message, null), ex);
}
}
- else {
- IntegrationBinderInboundChannelAdapter.this.retryTemplate
- .execute(context -> {
- processMessage(message);
- return null;
- }, (RecoveryCallback) IntegrationBinderInboundChannelAdapter.this.recoveryCallback);
- }
}
catch (RuntimeException e) {
if (getErrorChannel() != null) {
@@ -298,27 +290,6 @@ private void processMessage(Message> message) {
sendMessage(message);
}
- @Override
- public boolean open(RetryContext context,
- RetryCallback callback) {
- if (IntegrationBinderInboundChannelAdapter.this.recoveryCallback != null) {
- attributesHolder.set(context);
- }
- return true;
- }
-
- @Override
- public void close(RetryContext context,
- RetryCallback callback, Throwable throwable) {
- attributesHolder.remove();
- }
-
- @Override
- public void onError(RetryContext context,
- RetryCallback callback, Throwable throwable) {
- // Empty
- }
-
}
}
diff --git a/core/spring-cloud-stream-test-support/src/main/java/org/springframework/cloud/stream/binder/AbstractBinderTests.java b/core/spring-cloud-stream-test-support/src/main/java/org/springframework/cloud/stream/binder/AbstractBinderTests.java
index f3f1405154..876f7bdee8 100644
--- a/core/spring-cloud-stream-test-support/src/main/java/org/springframework/cloud/stream/binder/AbstractBinderTests.java
+++ b/core/spring-cloud-stream-test-support/src/main/java/org/springframework/cloud/stream/binder/AbstractBinderTests.java
@@ -97,7 +97,7 @@ public void before() {
* {@link #timeoutMultiplier}).
*/
protected Message> receive(PollableChannel channel) {
- return receive(channel, 1);
+ return receive(channel, 10);
}
/**
diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/annotation/StreamRetryTemplate.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/annotation/StreamRetryTemplate.java
index 444aa01a22..008d019ce9 100644
--- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/annotation/StreamRetryTemplate.java
+++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/annotation/StreamRetryTemplate.java
@@ -24,7 +24,7 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
-import org.springframework.retry.support.RetryTemplate;
+import org.springframework.core.retry.RetryTemplate;
/**
* Marker to tag an instance of {@link RetryTemplate} to be used by the binder. This
diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractBinder.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractBinder.java
index 2c5b81aea3..a0f2b1d3ed 100644
--- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractBinder.java
+++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractBinder.java
@@ -16,6 +16,9 @@
package org.springframework.cloud.stream.binder;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -31,10 +34,9 @@
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.core.retry.RetryPolicy;
import org.springframework.expression.EvaluationContext;
-import org.springframework.retry.backoff.ExponentialBackOffPolicy;
-import org.springframework.retry.policy.SimpleRetryPolicy;
-import org.springframework.retry.support.RetryTemplate;
+import org.springframework.core.retry.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
@@ -104,7 +106,7 @@ public void setApplicationContext(ApplicationContext applicationContext)
Assert.isInstanceOf(GenericApplicationContext.class, applicationContext);
this.applicationContext = (GenericApplicationContext) applicationContext;
Map beansOfType = this.applicationContext.getBeansOfType(EvaluationContext.class);
- if (beansOfType.size() > 0) {
+ if (!beansOfType.isEmpty()) {
this.evaluationContext = beansOfType.values().iterator().next();
}
@@ -157,7 +159,7 @@ protected abstract Binding doBindProducer(String name, T outboundBindTarget,
P properties);
/**
- * Construct a name comprised of the name and group.
+ * Construct a name comprising the name and group.
* @param name the name.
* @param group the group.
* @return the constructed name.
@@ -192,19 +194,29 @@ protected RetryTemplate buildRetryTemplate(ConsumerProperties properties) {
RetryTemplate rt;
if (CollectionUtils.isEmpty(this.consumerBindingRetryTemplates)) {
rt = new RetryTemplate();
- SimpleRetryPolicy retryPolicy = CollectionUtils
- .isEmpty(properties.getRetryableExceptions())
- ? new SimpleRetryPolicy(properties.getMaxAttempts())
- : new SimpleRetryPolicy(properties.getMaxAttempts(),
- properties.getRetryableExceptions(), true,
- properties.isDefaultRetryable());
-
- ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
- backOffPolicy.setInitialInterval(properties.getBackOffInitialInterval());
- backOffPolicy.setMultiplier(properties.getBackOffMultiplier());
- backOffPolicy.setMaxInterval(properties.getBackOffMaxInterval());
+
+ Map, Boolean> retryableExceptionMapping = properties.getRetryableExceptions();
+ List> retryableExceptions = new ArrayList<>();
+ List> nonRetryableExceptions = new ArrayList<>();
+ for (var classBooleanEntry : retryableExceptionMapping.entrySet()) {
+ Class extends Throwable> exceptionClass = classBooleanEntry.getKey();
+ if (classBooleanEntry.getValue()) {
+ retryableExceptions.add(exceptionClass);
+ }
+ else {
+ nonRetryableExceptions.add(exceptionClass);
+ }
+ }
+ RetryPolicy retryPolicy =
+ RetryPolicy.builder()
+ .maxAttempts(properties.getMaxAttempts())
+ .delay(Duration.ofMillis(properties.getBackOffInitialInterval()))
+ .multiplier(properties.getBackOffMultiplier())
+ .maxDelay(Duration.ofMillis(properties.getBackOffMaxInterval()))
+ .includes(retryableExceptions)
+ .excludes(nonRetryableExceptions)
+ .build();
rt.setRetryPolicy(retryPolicy);
- rt.setBackOffPolicy(backOffPolicy);
}
else {
rt = StringUtils.hasText(properties.getRetryTemplateName())
diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java
index 05c1c402b7..03f0fff16f 100644
--- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java
+++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java
@@ -64,6 +64,7 @@
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.handler.BridgeHandler;
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
+import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
@@ -73,7 +74,7 @@
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ErrorMessage;
-import org.springframework.retry.RecoveryCallback;
+import org.springframework.integration.core.RecoveryCallback;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
@@ -1061,7 +1062,7 @@ protected MessageHandler getDefaultErrorMessageHandler(
* @return the implementation - may be null.
*/
protected ErrorMessageStrategy getErrorMessageStrategy() {
- return null;
+ return new DefaultErrorMessageStrategy();
}
protected String getErrorRecovererName(ConsumerDestination destination, String group,
diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultPollableMessageSource.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultPollableMessageSource.java
index b6895bc080..f91bde9520 100644
--- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultPollableMessageSource.java
+++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultPollableMessageSource.java
@@ -31,6 +31,8 @@
import org.springframework.context.Lifecycle;
import org.springframework.core.AttributeAccessor;
import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.core.retry.RetryException;
+import org.springframework.core.retry.RetryTemplate;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AckUtils;
import org.springframework.integration.acks.AcknowledgmentCallback;
@@ -50,12 +52,9 @@
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageBuilder;
-import org.springframework.retry.RecoveryCallback;
-import org.springframework.retry.RetryCallback;
-import org.springframework.retry.RetryContext;
-import org.springframework.retry.RetryListener;
-import org.springframework.retry.support.RetryTemplate;
+import org.springframework.integration.core.RecoveryCallback;
import org.springframework.util.Assert;
+import org.springframework.util.ReflectionUtils;
/**
* The default implementation of a {@link PollableMessageSource}.
@@ -67,17 +66,16 @@
* @since 2.0
*
*/
-public class DefaultPollableMessageSource
- implements PollableMessageSource, Lifecycle, RetryListener {
+public class DefaultPollableMessageSource implements PollableMessageSource, Lifecycle {
private static final Log log = LogFactory.getLog(DefaultPollableMessageSource.class);
- protected static final ThreadLocal attributesHolder = new ThreadLocal();
+ protected static final ThreadLocal ATTRIBUTES_HOLDER = new ThreadLocal<>();
- private static final DirectChannel dummyChannel = new DirectChannel();
+ private static final DirectChannel DUMMY_CHANNEL = new DirectChannel();
static {
- dummyChannel.setBeanName("dummy.required.by.nonnull.api");
+ DUMMY_CHANNEL.setBeanName("dummy.required.by.nonnull.api");
}
private final List interceptors = new ArrayList<>();
@@ -119,7 +117,7 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
Object result = invocation.proceed();
if (result instanceof Message> received) {
for (ChannelInterceptor interceptor : this.interceptors) {
- received = interceptor.preSend(received, dummyChannel);
+ received = interceptor.preSend(received, DUMMY_CHANNEL);
if (received == null) {
return null;
}
@@ -140,16 +138,15 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
}
public void setRetryTemplate(RetryTemplate retryTemplate) {
- retryTemplate.registerListener(this);
this.retryTemplate = retryTemplate;
}
public void setRecoveryCallback(RecoveryCallback recoveryCallback) {
- this.recoveryCallback = context -> {
- if (!shouldRequeue((MessagingException) context.getLastThrowable())) {
- return recoveryCallback.recover(context);
+ this.recoveryCallback = (context, cause) -> {
+ if (!shouldRequeue((MessagingException) cause)) {
+ return recoveryCallback.recover(context, cause);
}
- throw (MessagingException) context.getLastThrowable();
+ throw (MessagingException) cause;
};
}
@@ -208,30 +205,44 @@ public boolean poll(MessageHandler handler, ParameterizedTypeReference> type)
return false;
}
- AcknowledgmentCallback ackCallback = StaticMessageHeaderAccessor
- .getAcknowledgmentCallback(message);
+ AcknowledgmentCallback ackCallback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(message);
if (ackCallback == null) {
ackCallback = status -> log.warn("No AcknowledgementCallback defined. Status: " + status.name() + " " + message);
}
try {
+ setAttributesIfNecessary(message);
if (this.retryTemplate == null) {
- this.handle(message, handler);
+ handle(message, handler);
}
else {
- this.retryTemplate.execute(context -> {
- this.handle(message, handler);
- return null;
- }, this.recoveryCallback);
+ try {
+ this.retryTemplate.execute(() -> {
+ handle(message, handler);
+ return null;
+ });
+ }
+ catch (RetryException ex) {
+ if (this.recoveryCallback != null) {
+ AttributeAccessor attributeAccessor = ATTRIBUTES_HOLDER.get();
+ if (attributeAccessor == null) {
+ attributeAccessor = ErrorMessageUtils.getAttributeAccessor(message, null);
+ }
+ this.recoveryCallback.recover(attributeAccessor, ex.getCause());
+ }
+ else {
+ ReflectionUtils.rethrowRuntimeException(ex.getCause());
+ }
+ }
}
return true;
}
catch (MessagingException e) {
if (this.retryTemplate == null && !shouldRequeue(e)) {
try {
- this.messagingTemplate.send(this.errorChannel, this.errorMessageStrategy
- .buildErrorMessage(e, attributesHolder.get()));
+ this.messagingTemplate.send(this.errorChannel,
+ this.errorMessageStrategy.buildErrorMessage(e, ATTRIBUTES_HOLDER.get()));
}
catch (MessagingException e1) {
requeueOrNack(message, ackCallback, e1);
@@ -252,6 +263,7 @@ public boolean poll(MessageHandler handler, ParameterizedTypeReference> type)
throw new MessageHandlingException(message, e);
}
finally {
+ ATTRIBUTES_HOLDER.remove();
AckUtils.autoAck(ackCallback);
}
}
@@ -279,27 +291,6 @@ protected boolean shouldRequeue(Exception e) {
return requeue;
}
- @Override
- public boolean open(RetryContext context,
- RetryCallback callback) {
- if (DefaultPollableMessageSource.this.recoveryCallback != null) {
- attributesHolder.set(context);
- }
- return true;
- }
-
- @Override
- public void close(RetryContext context,
- RetryCallback callback, Throwable throwable) {
- attributesHolder.remove();
- }
-
- @Override
- public void onError(RetryContext context,
- RetryCallback callback, Throwable throwable) {
- // Empty
- }
-
/**
* Receives Message from the source and converts its payload to a provided type. Can
* return null
@@ -334,32 +325,28 @@ private void doHandleMessage(MessageHandler handler, Message> message) {
}
/**
- * If there's a retry template, it will set the attributes holder via the listener. If
- * there's no retry template, but there's an error channel, we create a new attributes
- * holder here. If an attributes holder exists (by either method), we set the
+ * If there's a retry template, it will set the attribute holder via the listener. If
+ * there's no retry template, but there's an error channel, we create a new attribute
+ * holder here. If an attribute holder exists (by either method), we set the
* attributes for use by the {@link ErrorMessageStrategy}.
* @param message the Spring Messaging message to use.
*/
private void setAttributesIfNecessary(Message> message) {
- boolean needHolder = this.errorChannel != null && this.retryTemplate == null;
- boolean needAttributes = needHolder || this.retryTemplate != null;
+ boolean needHolder = this.errorChannel != null || this.retryTemplate != null;
if (needHolder) {
- attributesHolder.set(ErrorMessageUtils.getAttributeAccessor(null, null));
- }
- if (needAttributes) {
- AttributeAccessor attributes = attributesHolder.get();
- if (attributes != null) {
- attributes.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY,
- message);
- if (this.attributesProvider != null) {
- this.attributesProvider.accept(attributes, message);
- }
+ AttributeAccessor attributes = ATTRIBUTES_HOLDER.get();
+ if (attributes == null) {
+ attributes = ErrorMessageUtils.getAttributeAccessor(null, null);
+ ATTRIBUTES_HOLDER.set(attributes);
+ }
+ attributes.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, message);
+ if (this.attributesProvider != null) {
+ this.attributesProvider.accept(attributes, message);
}
}
}
private void handle(Message> message, MessageHandler handler) {
- setAttributesIfNecessary(message);
doHandleMessage(handler, message);
}
diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java
index 8283298afd..1278952745 100644
--- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java
+++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java
@@ -329,7 +329,7 @@ private IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier> s
PollerProperties poller = producerProperties.getPoller();
PollerMetadata pm = new PollerMetadata();
- PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
+ PropertyMapper map = PropertyMapper.get();
map.from(poller::getMaxMessagesPerPoll).to(pm::setMaxMessagesPerPoll);
map.from(poller).as(this::asTrigger).to(pm::setTrigger);
pollerMetadata.set(pm);
diff --git a/pom.xml b/pom.xml
index f79c2ebdd2..827ff3ba3d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@
+ true
17
2.1
5.0.0-SNAPSHOT
@@ -227,17 +228,12 @@
spring-snapshots
Spring Snapshots
- https://repo.spring.io/libs-snapshot-local
+ https://repo.spring.io/snapshot
spring-milestones
Spring milestones
- https://repo.spring.io/libs-milestone-local
-
-
- spring-releases
- Spring Releases
- https://repo.spring.io/release
+ https://repo.spring.io/milestone
@@ -257,10 +253,5 @@
false
-
- spring-releases
- Spring Releases
- https://repo.spring.io/release
-