From 1341b46f7443ac93e5f54987b7536926bfd9f320 Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Wed, 11 Feb 2026 23:41:45 +0100 Subject: [PATCH 1/2] Improve duplex network wait conditions and ports --- .../MessageDestinationVirtualTopicTest.java | 93 +++++++++++----- .../activemq/network/BaseNetworkTest.java | 101 ++++++++++++++++-- .../network/DynamicNetworkTestSupport.java | 5 +- ...IncludedDestinationsDuplexNetworkTest.java | 76 ++++++++----- ...eZeroPrefetchLazyDispatchPriorityTest.java | 15 ++- .../duplexDynamicIncludedDestLocalBroker.xml | 26 +---- 6 files changed, 223 insertions(+), 93 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java index cf0ae79cf3c..5ac3495e2d9 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java @@ -18,6 +18,9 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.util.Wait; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; @@ -27,9 +30,12 @@ import jakarta.annotation.Resource; import jakarta.jms.*; +import java.net.URI; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration({ "virtual-topic-network-test.xml" }) @@ -53,45 +59,64 @@ public class MessageDestinationVirtualTopicTest { private Session session1; - public void init() throws JMSException { + public void init() throws Exception { + // Get actual assigned ephemeral ports + final String broker1URL = broker1.getTransportConnectors().get(0).getConnectUri().toString(); + final String broker2URL = broker2.getTransportConnectors().get(0).getConnectUri().toString(); + LOG.info("Broker1 URL: {}", broker1URL); + LOG.info("Broker2 URL: {}", broker2URL); + + // Add network connector from broker2 to broker1 programmatically using actual port + final DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector( + new URI("static://(" + broker1URL + ")")); + nc.setName("linkToBrokerB1"); + nc.setNetworkTTL(1); + nc.setDuplex(true); + broker2.addNetworkConnector(nc); + nc.start(); + + // Wait for bridge to be established + assertTrue("Network bridge should be established", + Wait.waitFor(() -> nc.activeBridges().size() == 1, 10_000, 500)); + // Create connection on Broker B2 - ConnectionFactory broker2ConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:62616"); - Connection connection2 = broker2ConnectionFactory.createConnection(); + final ConnectionFactory broker2ConnectionFactory = new ActiveMQConnectionFactory(broker2URL); + final Connection connection2 = broker2ConnectionFactory.createConnection(); connection2.start(); - Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue consumerDQueue = session2.createQueue("Consumer.D.VirtualTopic.T1"); + final Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue consumerDQueue = session2.createQueue("Consumer.D.VirtualTopic.T1"); // Bind listener on queue for consumer D - MessageConsumer consumer = session2.createConsumer(consumerDQueue); + final MessageConsumer consumer = session2.createConsumer(consumerDQueue); listener2 = new SimpleMessageListener(); consumer.setMessageListener(listener2); // Create connection on Broker B1 - ConnectionFactory broker1ConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); - Connection connection1 = broker1ConnectionFactory.createConnection(); + final ConnectionFactory broker1ConnectionFactory = new ActiveMQConnectionFactory(broker1URL); + final Connection connection1 = broker1ConnectionFactory.createConnection(); connection1.start(); session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue consumerCQueue = session1.createQueue("Consumer.C.VirtualTopic.T1"); + final Queue consumerCQueue = session1.createQueue("Consumer.C.VirtualTopic.T1"); - // Bind listener on queue for consumer D - MessageConsumer consumer1 = session1.createConsumer(consumerCQueue); + // Bind listener on queue for consumer C + final MessageConsumer consumer1 = session1.createConsumer(consumerCQueue); listener1 = new SimpleMessageListener(); consumer1.setMessageListener(listener1); - // Create listener on Broker B1 for VT T2 witout setOriginalDest - Queue consumer3Queue = session1.createQueue("Consumer.A.VirtualTopic.T2"); + // Create listener on Broker B1 for VT T2 without setOriginalDest + final Queue consumer3Queue = session1.createQueue("Consumer.A.VirtualTopic.T2"); - // Bind listener on queue for consumer D - MessageConsumer consumerD = session1.createConsumer(consumer3Queue); + // Bind listener on queue for consumer A + final MessageConsumer consumerD = session1.createConsumer(consumer3Queue); listener3 = new SimpleMessageListener(); consumerD.setMessageListener(listener3); // Create producer for topic, on B1 - Topic virtualTopicT1 = session1.createTopic("VirtualTopic.T1,VirtualTopic.T2"); + final Topic virtualTopicT1 = session1.createTopic("VirtualTopic.T1,VirtualTopic.T2"); producer = session1.createProducer(virtualTopicT1); } - @Test + @Test(timeout = 30_000) public void testDestinationNames() throws Exception { LOG.info("Started waiting for broker 1 and 2"); @@ -102,27 +127,41 @@ public void testDestinationNames() throws Exception { init(); // Create a monitor - CountDownLatch monitor = new CountDownLatch(3); + final CountDownLatch monitor = new CountDownLatch(3); listener1.setCountDown(monitor); listener2.setCountDown(monitor); listener3.setCountDown(monitor); + // Wait for the consumer on broker2 to be visible on broker1 via the network bridge. + // The virtual topic Consumer.D.VirtualTopic.T1 on broker2 must be forwarded to broker1 + // before sending, otherwise the message won't reach listener2. + assertTrue("Consumer.D queue should exist on broker1 via network bridge", + Wait.waitFor(() -> { + try { + final org.apache.activemq.broker.region.Destination dest = + broker1.getDestination(new ActiveMQQueue("Consumer.D.VirtualTopic.T1")); + return dest != null && dest.getConsumers().size() >= 1; + } catch (final Exception e) { + return false; + } + }, 10_000, 200)); + LOG.info("Sending message"); // Send a message on the topic - TextMessage message = session1.createTextMessage("Hello World !"); + final TextMessage message = session1.createTextMessage("Hello World !"); producer.send(message); LOG.info("Waiting for message reception"); // Wait the two messages in the related queues - monitor.await(); + assertTrue("All 3 listeners should receive messages", monitor.await(15, TimeUnit.SECONDS)); // Get the message destinations - String lastJMSDestination2 = listener2.getLastJMSDestination(); - System.err.println(lastJMSDestination2); - String lastJMSDestination1 = listener1.getLastJMSDestination(); - System.err.println(lastJMSDestination1); + final String lastJMSDestination2 = listener2.getLastJMSDestination(); + LOG.info("Listener2 destination: {}", lastJMSDestination2); + final String lastJMSDestination1 = listener1.getLastJMSDestination(); + LOG.info("Listener1 destination: {}", lastJMSDestination1); - String lastJMSDestination3 = listener3.getLastJMSDestination(); - System.err.println(lastJMSDestination3); + final String lastJMSDestination3 = listener3.getLastJMSDestination(); + LOG.info("Listener3 destination: {}", lastJMSDestination3); // The destination names assertEquals("queue://Consumer.D.VirtualTopic.T1", lastJMSDestination2); @@ -130,4 +169,4 @@ public void testDestinationNames() throws Exception { assertEquals("topic://VirtualTopic.T2", lastJMSDestination3); } -} \ No newline at end of file +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java index 1af6636e928..1faac2e2064 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java @@ -16,13 +16,22 @@ */ package org.apache.activemq.network; +import static org.junit.Assert.assertTrue; + import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; import jakarta.jms.Connection; import jakarta.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; import org.apache.activemq.xbean.BrokerFactoryBean; import org.junit.After; import org.junit.Before; @@ -59,11 +68,15 @@ protected void doTearDown() throws Exception { if(remoteConnection != null) remoteConnection.close(); - if(localBroker != null) + if(localBroker != null) { localBroker.stop(); + localBroker.waitUntilStopped(); + } - if(remoteBroker != null) + if(remoteBroker != null) { remoteBroker.stop(); + remoteBroker.waitUntilStopped(); + } } protected void doSetUp(boolean deleteAllMessages) throws Exception { @@ -75,14 +88,29 @@ protected void doSetUp(boolean deleteAllMessages) throws Exception { localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); localBroker.start(); localBroker.waitUntilStarted(); - URI localURI = localBroker.getVmConnectorURI(); + + // Programmatically add network connectors using the actual assigned ephemeral ports. + // Use startNetworkConnector() instead of connector.start() to ensure proper JMX MBean registration. + addNetworkConnectors(); + + // Wait for network bridges to be FULLY started (advisory consumers registered). + // activeBridges().isEmpty() is NOT sufficient because bridges are added to the map + // before start() completes asynchronously. We must wait for the startedLatch. + waitForBridgeFullyStarted(localBroker, "Local"); + // Only wait for remote bridge if the remote broker has its own network connector + // (duplex bridges don't add a separate connector on the remote side) + if (!remoteBroker.getNetworkConnectors().isEmpty()) { + waitForBridgeFullyStarted(remoteBroker, "Remote"); + } + + final URI localURI = localBroker.getVmConnectorURI(); ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI); fac.setAlwaysSyncSend(true); fac.setDispatchAsync(false); localConnection = fac.createConnection(); localConnection.setClientID("clientId"); localConnection.start(); - URI remoteURI = remoteBroker.getVmConnectorURI(); + final URI remoteURI = remoteBroker.getVmConnectorURI(); fac = new ActiveMQConnectionFactory(remoteURI); remoteConnection = fac.createConnection(); remoteConnection.setClientID("clientId"); @@ -91,21 +119,72 @@ protected void doSetUp(boolean deleteAllMessages) throws Exception { remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); } + /** + * Programmatically adds network connectors between the local and remote brokers + * using the actual assigned ephemeral ports. This avoids hardcoding ports in XML + * config files which causes port conflicts on CI. + */ + protected void addNetworkConnectors() throws Exception { + final URI remoteConnectURI = remoteBroker.getTransportConnectors().get(0).getConnectUri(); + final URI localConnectURI = localBroker.getTransportConnectors().get(0).getConnectUri(); + + // Local -> Remote network connector (matches the original localBroker.xml config) + final DiscoveryNetworkConnector localToRemote = new DiscoveryNetworkConnector( + new URI("static:(" + remoteConnectURI + ")")); + localToRemote.setName("networkConnector"); + localToRemote.setDynamicOnly(false); + localToRemote.setConduitSubscriptions(true); + localToRemote.setDecreaseNetworkConsumerPriority(false); + + final List dynamicallyIncluded = new ArrayList<>(); + dynamicallyIncluded.add(new ActiveMQQueue("include.test.foo")); + dynamicallyIncluded.add(new ActiveMQTopic("include.test.bar")); + localToRemote.setDynamicallyIncludedDestinations(dynamicallyIncluded); + + final List excluded = new ArrayList<>(); + excluded.add(new ActiveMQQueue("exclude.test.foo")); + excluded.add(new ActiveMQTopic("exclude.test.bar")); + localToRemote.setExcludedDestinations(excluded); + + localBroker.addNetworkConnector(localToRemote); + // startNetworkConnector handles JMX MBean registration and connector startup + localBroker.startNetworkConnector(localToRemote, null); + + // Remote -> Local network connector (matches the original remoteBroker.xml config) + final DiscoveryNetworkConnector remoteToLocal = new DiscoveryNetworkConnector( + new URI("static:(" + localConnectURI + ")")); + remoteToLocal.setName("networkConnector"); + remoteBroker.addNetworkConnector(remoteToLocal); + remoteBroker.startNetworkConnector(remoteToLocal, null); + } + + protected void waitForBridgeFullyStarted(final BrokerService broker, final String label) throws Exception { + assertTrue(label + " broker bridge should be fully started", Wait.waitFor(() -> { + if (broker.getNetworkConnectors().isEmpty() + || broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) { + return false; + } + final NetworkBridge bridge = broker.getNetworkConnectors().get(0).activeBridges().iterator().next(); + if (bridge instanceof DemandForwardingBridgeSupport) { + return ((DemandForwardingBridgeSupport) bridge).startedLatch.getCount() == 0; + } + return true; + }, TimeUnit.SECONDS.toMillis(10), 100)); + } + protected String getRemoteBrokerURI() { - return "org/apache/activemq/network/remoteBroker.xml"; + return "org/apache/activemq/network/remoteBroker-ephemeral.xml"; } protected String getLocalBrokerURI() { - return "org/apache/activemq/network/localBroker.xml"; + return "org/apache/activemq/network/localBroker-ephemeral.xml"; } protected BrokerService createBroker(String uri) throws Exception { - Resource resource = new ClassPathResource(uri); - BrokerFactoryBean factory = new BrokerFactoryBean(resource); - resource = new ClassPathResource(uri); - factory = new BrokerFactoryBean(resource); + final Resource resource = new ClassPathResource(uri); + final BrokerFactoryBean factory = new BrokerFactoryBean(resource); factory.afterPropertiesSet(); - BrokerService result = factory.getBroker(); + final BrokerService result = factory.getBroker(); return result; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java index 87ee09040e8..7dcb2530e68 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java @@ -226,8 +226,9 @@ protected void assertSubscriptionMapCounts(NetworkBridge networkBridge, final in protected DemandForwardingBridge findDuplexBridge(final TransportConnector connector) throws Exception { assertNotNull(connector); - for (TransportConnection tc : connector.getConnections()) { - if (tc.getConnectionId().startsWith("networkConnector_")) { + for (final TransportConnection tc : connector.getConnections()) { + final String connectionId = tc.getConnectionId(); + if (connectionId != null && connectionId.startsWith("networkConnector_")) { final Field bridgeField = TransportConnection.class.getDeclaredField("duplexBridge"); bridgeField.setAccessible(true); return (DemandForwardingBridge) bridgeField.get(tc); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java index 39018a18fc0..ed60cc60767 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java @@ -22,7 +22,9 @@ import static org.junit.Assert.assertTrue; import java.lang.reflect.Field; -import java.util.concurrent.CopyOnWriteArrayList; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; import jakarta.jms.MessageProducer; import jakarta.jms.TemporaryQueue; @@ -31,6 +33,9 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnection; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.util.Wait; import org.junit.Test; @@ -39,8 +44,6 @@ */ public class DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetworkTest { - private static final int REMOTE_BROKER_TCP_PORT = 61617; - @Override protected String getLocalBrokerURI() { return "org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml"; @@ -48,12 +51,39 @@ protected String getLocalBrokerURI() { @Override protected BrokerService createRemoteBroker() throws Exception { - BrokerService broker = new BrokerService(); + final BrokerService broker = new BrokerService(); broker.setBrokerName("remoteBroker"); - broker.addConnector("tcp://localhost:" + REMOTE_BROKER_TCP_PORT); + broker.addConnector("tcp://localhost:0"); return broker; } + @Override + protected void addNetworkConnectors() throws Exception { + // For duplex test: only one connector from local to remote, with duplex=true + final URI remoteConnectURI = remoteBroker.getTransportConnectors().get(0).getConnectUri(); + + final DiscoveryNetworkConnector localToRemote = new DiscoveryNetworkConnector( + new URI("static:(" + remoteConnectURI + ")")); + localToRemote.setName("networkConnector"); + localToRemote.setDuplex(true); + localToRemote.setDynamicOnly(false); + localToRemote.setConduitSubscriptions(true); + localToRemote.setDecreaseNetworkConsumerPriority(false); + + final List dynamicallyIncluded = new ArrayList<>(); + dynamicallyIncluded.add(new ActiveMQQueue("include.test.foo")); + dynamicallyIncluded.add(new ActiveMQTopic("include.test.bar")); + localToRemote.setDynamicallyIncludedDestinations(dynamicallyIncluded); + + final List excluded = new ArrayList<>(); + excluded.add(new ActiveMQQueue("exclude.test.foo")); + excluded.add(new ActiveMQTopic("exclude.test.bar")); + localToRemote.setExcludedDestinations(excluded); + + localBroker.addNetworkConnector(localToRemote); + localBroker.startNetworkConnector(localToRemote, null); + } + // we have to override this, because with dynamicallyIncludedDestinations working properly // (see https://issues.apache.org/jira/browse/AMQ-4209) you can't get request/response // with temps working (there is no wild card like there is for staticallyIncludedDest) @@ -65,14 +95,14 @@ public void testRequestReply() throws Exception { @Test public void testTempQueues() throws Exception { - TemporaryQueue temp = localSession.createTemporaryQueue(); - MessageProducer producer = localSession.createProducer(temp); + final TemporaryQueue temp = localSession.createTemporaryQueue(); + final MessageProducer producer = localSession.createProducer(temp); producer.send(localSession.createTextMessage("test")); - Thread.sleep(100); - assertEquals("Destination not created", 1, remoteBroker.getAdminView().getTemporaryQueues().length); + assertTrue("Destination created on remote", + Wait.waitFor(() -> remoteBroker.getAdminView().getTemporaryQueues().length == 1, 5000, 100)); temp.delete(); - Thread.sleep(100); - assertEquals("Destination not deleted", 0, remoteBroker.getAdminView().getTemporaryQueues().length); + assertTrue("Destination deleted on remote", + Wait.waitFor(() -> remoteBroker.getAdminView().getTemporaryQueues().length == 0, 5000, 100)); } @Test @@ -96,18 +126,16 @@ public void testDynamicallyIncludedDestinationsForDuplex() throws Exception{ configuration.getDestinationFilter()); } - private NetworkBridgeConfiguration getConfigurationFromNetworkBridge(DemandForwardingBridgeSupport duplexBridge) throws NoSuchFieldException, IllegalAccessException { - Field f = DemandForwardingBridgeSupport.class.getDeclaredField("configuration"); + private NetworkBridgeConfiguration getConfigurationFromNetworkBridge(final DemandForwardingBridgeSupport duplexBridge) throws NoSuchFieldException, IllegalAccessException { + final Field f = DemandForwardingBridgeSupport.class.getDeclaredField("configuration"); f.setAccessible(true); - NetworkBridgeConfiguration configuration = (NetworkBridgeConfiguration) f.get(duplexBridge); - return configuration; + return (NetworkBridgeConfiguration) f.get(duplexBridge); } - private DemandForwardingBridge getDuplexBridgeFromConnection(TransportConnection bridgeConnection) throws NoSuchFieldException, IllegalAccessException { - Field f = TransportConnection.class.getDeclaredField("duplexBridge"); + private DemandForwardingBridge getDuplexBridgeFromConnection(final TransportConnection bridgeConnection) throws NoSuchFieldException, IllegalAccessException { + final Field f = TransportConnection.class.getDeclaredField("duplexBridge"); f.setAccessible(true); - DemandForwardingBridge bridge = (DemandForwardingBridge) f.get(bridgeConnection); - return bridge; + return (DemandForwardingBridge) f.get(bridgeConnection); } public TransportConnection getDuplexBridgeConnectionFromRemote() throws Exception { @@ -128,13 +156,9 @@ protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() && - expectedRemoteSent == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount(); - } - })); + assertTrue(Wait.waitFor(() -> + expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() && + expectedRemoteSent == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount())); } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java index 880b394153e..153794f8156 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java @@ -232,11 +232,16 @@ public void testJmsBrowserGetsPagedIn() throws Exception { assertEquals("see only the paged in for pull", 1, browsed.size()); - // Wait for all messages to be available (including redelivery of unacked message) - assertTrue("All messages available for consumption", Wait.waitFor(() -> { + // Wait for the unacked message to be redelivered after connection close. + // With zero prefetch + lazy dispatch, the broker needs time to process the + // connection close, redeliver the unacked message, and make it available again. + // Check that inflight count drops to 0, meaning the message has been returned. + assertTrue("Unacked message redelivered after connection close", Wait.waitFor(() -> { final Queue queue = (Queue) broker.getDestination(destination); - return queue != null && queue.getDestinationStatistics().getMessages().getCount() == numToSend; - }, 5000, 100)); + return queue != null + && queue.getDestinationStatistics().getMessages().getCount() == numToSend + && queue.getDestinationStatistics().getInflight().getCount() == 0; + }, TimeUnit.SECONDS.toMillis(20), 100)); // consume messages ArrayList consumeList = consumeMessages("TestQ"); @@ -280,7 +285,7 @@ private ArrayList consumeMessages(String queueName) throws Exception { boolean finished = false; while (!finished) { - Message message = consumer.receive(returnedMessages.isEmpty() ? 5000 : 1000); + Message message = consumer.receive(returnedMessages.isEmpty() ? TimeUnit.SECONDS.toMillis(30) : 1000); if (message == null) { finished = true; } diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml index 70f6da82152..4346bcfae4c 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml @@ -6,9 +6,9 @@ The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -25,28 +25,10 @@ - - - - - - - - - - - - - - - + - + From 127a88d384704b73b3a85b9ff9f68d3a60ca8547 Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Thu, 12 Feb 2026 00:02:54 +0100 Subject: [PATCH 2/2] Add CopyOnWriteArrayList import to support thread-safe list operations --- .../DynamicallyIncludedDestinationsDuplexNetworkTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java index ed60cc60767..4470fa80177 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java @@ -25,6 +25,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import jakarta.jms.MessageProducer; import jakarta.jms.TemporaryQueue;