From add9f610e9c0e3bde9a350e188d703f5f9eba77d Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Wed, 11 Feb 2026 23:38:14 +0100 Subject: [PATCH] Relax timeouts and macOS exclusions + small fixes --- .../ConnectionFailureEvictsFromPoolTest.java | 2 +- activemq-unit-tests/pom.xml | 24 +- .../activemq/bugs/DuplicateFromStoreTest.java | 6 +- .../network/DurableSyncNetworkBridgeTest.java | 86 +++--- .../network/DynamicNetworkTestSupport.java | 36 ++- ...IncludedDestinationsDuplexNetworkTest.java | 27 +- .../MQTTNetworkOfBrokersFailoverTest.java | 37 +-- .../activemq/network/SimpleNetworkTest.java | 291 +++++++++--------- ...rictedThreadPoolInactivityTimeoutTest.java | 2 +- ...eZeroPrefetchLazyDispatchPriorityTest.java | 15 +- 10 files changed, 292 insertions(+), 234 deletions(-) diff --git a/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java b/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java index 12c46be4218..ef030ef2670 100644 --- a/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java +++ b/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java @@ -117,7 +117,7 @@ public void transportResumed() { } catch (JMSException e) { } } - TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(15, TimeUnit.SECONDS)); + TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(30, TimeUnit.SECONDS)); // If we get another connection now it should be a new connection that // works. LOG.info("expect new connection after failure"); diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml index 5355b12e6ae..aea772ae946 100644 --- a/activemq-unit-tests/pom.xml +++ b/activemq-unit-tests/pom.xml @@ -979,7 +979,6 @@ org/apache/activemq/transport/SoWriteTimeoutTest.* org/apache/activemq/transport/TopicClusterTest.* org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.* - org/apache/activemq/transport/discovery/DiscoveryTransportBrokerTest.* org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.* org/apache/activemq/transport/discovery/DiscoveryUriTest.* org/apache/activemq/transport/discovery/MasterSlaveDiscoveryTest.* @@ -1170,6 +1169,29 @@ + + activemq.tests.mac.excludes + + + mac + + + + + + maven-surefire-plugin + + + + org/apache/activemq/transport/peer/PeerTransportTest.* + org/apache/activemq/transport/multicast/MulticastTransportTest.* + org/apache/activemq/network/MulticastNetworkTest.* + + + + + + activemq.tests.aix.excludes diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java index 753e41aa2c5..04f633893df 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java @@ -123,7 +123,7 @@ public void stopBroker() throws Exception { } } - @Test + @Test(timeout = 120_000) public void testDuplicateMessage() throws Exception { LOG.info("Testing for duplicate messages."); @@ -134,10 +134,10 @@ public void testDuplicateMessage() throws Exception { createOpenwireClients(producers, consumers); LOG.info("All producers and consumers got started. Awaiting their termination"); - producersFinished.await(100, TimeUnit.MINUTES); + producersFinished.await(2, TimeUnit.MINUTES); LOG.info("All producers have terminated. remaining to send: " + totalMessagesToSend.get() + ", sent:" + totalMessagesSent.get()); - consumersFinished.await(100, TimeUnit.MINUTES); + consumersFinished.await(2, TimeUnit.MINUTES); LOG.info("All consumers have terminated."); producers.shutdownNow(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java index 9727dbc41a7..1791ec242f5 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java @@ -49,7 +49,6 @@ import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; import org.apache.activemq.util.Wait; -import org.apache.activemq.util.Wait.Condition; import org.junit.After; import org.junit.Assume; import org.junit.Before; @@ -83,7 +82,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { private final FLOW flow; @Rule - public Timeout globalTimeout = new Timeout(30, TimeUnit.SECONDS); + public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS); @Parameters public static Collection data() { @@ -531,7 +530,6 @@ public void testAddOnlineSubscriptionsWithBridgeOffline() throws Exception { session1.createDurableSubscriber(topic, "sub3"); session1.createDurableSubscriber(excludeTopic, "sub-exclude"); - Thread.sleep(1000); assertNCDurableSubsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, excludeTopic, 0); @@ -570,13 +568,10 @@ public void testAddOnlineSubscriptionsTwoBridges() throws Exception { secondConnector.start(); //Make sure both bridges are connected - assertTrue(Wait.waitFor(new Condition() { - @Override - public boolean isSatisified() throws Exception { - return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 && - localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1; - } - }, 10000, 500)); + assertTrue(Wait.waitFor(() -> + localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 && + localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1, + TimeUnit.SECONDS.toMillis(10), 500)); //Make sure NC durables exist for both bridges assertNCDurableSubsCount(broker2, topic2, 1); @@ -637,13 +632,7 @@ public void testVirtualDestSubForceDurableSync() throws Exception { final DestinationStatistics remoteDestStatistics2 = remoteBroker.getDestination( new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics(); - assertTrue(Wait.waitFor(new Condition() { - - @Override - public boolean isSatisified() throws Exception { - return remoteDestStatistics2.getMessages().getCount() == 501; - } - })); + assertTrue(Wait.waitFor(() -> remoteDestStatistics2.getMessages().getCount() == 501)); } @@ -723,8 +712,36 @@ protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector, included = new ActiveMQTopic(testTopicName); doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0); doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, localDataDir); - //Give time for advisories to propagate - Thread.sleep(1000); + //Wait for the bridge to be fully started (advisory consumers registered). + //Note: activeBridges().size() == 1 is NOT sufficient because bridges are added + //to the map before start() completes asynchronously. We must wait for the + //startedLatch which counts down after advisory consumers are registered. + if (startNetworkConnector) { + waitForBridgeFullyStarted(); + } + } + + private void waitForBridgeFullyStarted() throws Exception { + // Wait for the local bridge to be fully started (advisory consumers registered) + assertTrue("Local bridge should be fully started", Wait.waitFor(() -> { + if (localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty()) { + return false; + } + final NetworkBridge bridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); + if (bridge instanceof DemandForwardingBridgeSupport) { + return ((DemandForwardingBridgeSupport) bridge).startedLatch.getCount() == 0; + } + return true; + }, TimeUnit.SECONDS.toMillis(10), 100)); + + // Also wait for the duplex bridge on the remote broker to be fully started. + // The duplex connector creates a separate DemandForwardingBridge on the remote side + // that also needs its advisory consumers registered before it can process events. + assertTrue("Duplex bridge should be fully started", Wait.waitFor(() -> { + final DemandForwardingBridge duplexBridge = findDuplexBridge( + remoteBroker.getTransportConnectors().get(0)); + return duplexBridge != null && duplexBridge.startedLatch.getCount() == 0; + }, TimeUnit.SECONDS.toMillis(10), 100)); } protected void restartLocalBroker(boolean startNetworkConnector) throws Exception { @@ -757,12 +774,12 @@ protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetwor localConnection.start(); if (startNetworkConnector) { - Wait.waitFor(new Condition() { - @Override - public boolean isSatisified() throws Exception { - return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1; - } - }, 5000, 500); + // Best-effort wait for the bridge to appear. Do NOT use assertTrue here + // because some tests restart localBroker before remoteBroker is running, + // relying on the bridge connecting later when remoteBroker restarts. + // Tests that need the bridge to be fully started call assertBridgeStarted() explicitly. + Wait.waitFor(() -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, + TimeUnit.SECONDS.toMillis(10), 500); } localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -881,19 +898,16 @@ protected BrokerService createRemoteBroker(File dataDir, int port) throws Except protected void waitForSubscriptionInactive(final BrokerService brokerService, final ActiveMQTopic topic, final String subName) throws Exception { - assertTrue("Subscription should become inactive", Wait.waitFor(new Condition() { - @Override - public boolean isSatisified() throws Exception { - List subs = getSubscriptions(brokerService, topic); - for (org.apache.activemq.broker.region.DurableTopicSubscription sub : subs) { - if (sub.getSubscriptionKey().getSubscriptionName().equals(subName)) { - return !sub.isActive(); - } + assertTrue("Subscription should become inactive", Wait.waitFor(() -> { + final List subs = getSubscriptions(brokerService, topic); + for (final org.apache.activemq.broker.region.DurableTopicSubscription sub : subs) { + if (sub.getSubscriptionKey().getSubscriptionName().equals(subName)) { + return !sub.isActive(); } - // If subscription doesn't exist, it's considered inactive - return true; } - }, 5000, 100)); + // If subscription doesn't exist, it's considered inactive + return true; + }, TimeUnit.SECONDS.toMillis(10), 100)); } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java index 87ee09040e8..a347ffe3b5b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java @@ -24,6 +24,7 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import jakarta.jms.Connection; import jakarta.jms.JMSException; @@ -45,7 +46,6 @@ import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.util.SubscriptionKey; import org.apache.activemq.util.Wait; -import org.apache.activemq.util.Wait.Condition; import org.junit.Rule; import org.junit.rules.TemporaryFolder; @@ -95,9 +95,16 @@ protected void stopRemoteBroker() throws Exception { } protected void assertBridgeStarted() throws Exception { - assertTrue(Wait.waitFor( - () -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, - 10000, 500)); + assertTrue("Bridge should be fully started", Wait.waitFor(() -> { + if (localBroker.getNetworkConnectors().get(0).activeBridges().size() != 1) { + return false; + } + final NetworkBridge bridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); + if (bridge instanceof DemandForwardingBridgeSupport) { + return ((DemandForwardingBridgeSupport) bridge).startedLatch.getCount() == 0; + } + return true; + }, TimeUnit.SECONDS.toMillis(10), 500)); } protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final ConnectionContext context, @@ -135,17 +142,19 @@ protected interface ConsumerCreator { protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest, final int count) throws Exception { - assertTrue(Wait.waitFor(() -> count == getNCDurableSubs(brokerService, dest).size(), - 10000, 500)); + assertTrue("Expected " + count + " NC durable subs on " + dest, + Wait.waitFor(() -> count == getNCDurableSubs(brokerService, dest).size(), + TimeUnit.SECONDS.toMillis(30), 500)); } protected void assertConsumersCount(final BrokerService brokerService, final ActiveMQDestination dest, final int count) throws Exception { assertTrue(Wait.waitFor(() -> count == getConsumers(brokerService, dest).size(), 10000, 500)); - Thread.sleep(1000); - // Check one more time after a short pause to make sure the count didn't increase past what we wanted - assertEquals(count, getConsumers(brokerService, dest).size()); + // Wait a bit longer and verify the count is stable (didn't increase past what we wanted) + assertTrue("Consumer count should remain stable at " + count, + Wait.waitFor(() -> count == getConsumers(brokerService, dest).size(), + TimeUnit.SECONDS.toMillis(5), 500)); } protected List getConsumers(final BrokerService brokerService, @@ -208,12 +217,9 @@ protected void removeSubscription(final BrokerService brokerService, protected void assertSubscriptionsCount(final BrokerService brokerService, final ActiveMQTopic dest, final int count) throws Exception { - assertTrue(Wait.waitFor(new Condition() { - @Override - public boolean isSatisified() throws Exception { - return count == getSubscriptions(brokerService, dest).size(); - } - }, 10000, 500)); + assertTrue("Expected " + count + " subscriptions on " + dest, + Wait.waitFor(() -> count == getSubscriptions(brokerService, dest).size(), + TimeUnit.SECONDS.toMillis(30), 500)); } protected void assertSubscriptionMapCounts(NetworkBridge networkBridge, final int count) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java index 39018a18fc0..f5bc8270ae3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java @@ -75,7 +75,7 @@ public void testTempQueues() throws Exception { assertEquals("Destination not deleted", 0, remoteBroker.getAdminView().getTemporaryQueues().length); } - @Test + @Test(timeout = 60 * 1000) public void testDynamicallyIncludedDestinationsForDuplex() throws Exception{ // Once the bridge is set up, we should see the filter used for the duplex end of the bridge // only subscribe to the specific destinations included in the list @@ -83,11 +83,11 @@ public void testDynamicallyIncludedDestinationsForDuplex() throws Exception{ // is correct // the bridge on the remote broker has the correct filter - TransportConnection bridgeConnection = getDuplexBridgeConnectionFromRemote(); + final TransportConnection bridgeConnection = getDuplexBridgeConnectionFromRemote(); assertNotNull(bridgeConnection); - DemandForwardingBridge duplexBridge = getDuplexBridgeFromConnection(bridgeConnection); + final DemandForwardingBridge duplexBridge = waitForDuplexBridge(bridgeConnection); assertNotNull(duplexBridge); - NetworkBridgeConfiguration configuration = getConfigurationFromNetworkBridge(duplexBridge); + final NetworkBridgeConfiguration configuration = getConfigurationFromNetworkBridge(duplexBridge); assertNotNull(configuration); assertFalse("This destinationFilter does not include ONLY the destinations specified in dynamicallyIncludedDestinations", configuration.getDestinationFilter().equals(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">")); @@ -110,17 +110,20 @@ private DemandForwardingBridge getDuplexBridgeFromConnection(TransportConnection return bridge; } + private DemandForwardingBridge waitForDuplexBridge(final TransportConnection bridgeConnection) throws Exception { + assertTrue("Timed out waiting for duplex bridge to be fully started", + Wait.waitFor(() -> { + final DemandForwardingBridge bridge = getDuplexBridgeFromConnection(bridgeConnection); + return bridge != null && bridge.getRemoteBrokerName() != null; + })); + return getDuplexBridgeFromConnection(bridgeConnection); + } + public TransportConnection getDuplexBridgeConnectionFromRemote() throws Exception { final TransportConnector transportConnector = remoteBroker.getTransportConnectorByScheme("tcp"); assertTrue("Timed out waiting for duplex bridge connection", - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() { - return !transportConnector.getConnections().isEmpty(); - } - })); - CopyOnWriteArrayList transportConnections = transportConnector.getConnections(); - return transportConnections.get(0); + Wait.waitFor(() -> !transportConnector.getConnections().isEmpty())); + return transportConnector.getConnections().get(0); } @Override diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java index ad52fecf702..bc6c985e247 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java @@ -23,9 +23,7 @@ import jakarta.jms.Connection; import jakarta.jms.Destination; import jakarta.jms.JMSException; -import jakarta.jms.Message; import jakarta.jms.MessageConsumer; -import jakarta.jms.MessageListener; import jakarta.jms.Session; import javax.management.ObjectName; @@ -35,6 +33,7 @@ import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.util.Wait; import org.apache.commons.lang.ArrayUtils; import org.fusesource.hawtdispatch.Dispatch; import org.fusesource.mqtt.client.BlockingConnection; @@ -69,6 +68,10 @@ protected void setUp() throws Exception { remoteBroker.addNetworkConnector(nc); nc.start(); + // Wait for the network bridge to be established before proceeding + assertTrue("Network bridge should be established", + Wait.waitFor(() -> nc.activeBridges().size() == 1, TimeUnit.SECONDS.toMillis(10), 500)); + // mqtt port should have been assigned by now assertFalse(localBrokerMQTTPort == -1); assertFalse(remoteBrokerMQTTPort == -1); @@ -104,7 +107,7 @@ public void testNoStaleSubscriptionAcrossNetwork() throws Exception { remoteConn.connect(); remoteConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}); - assertTrue("No destination detected!", consumerNetworked.await(1, TimeUnit.SECONDS)); + assertTrue("No destination detected!", consumerNetworked.await(5, TimeUnit.SECONDS)); assertQueueExistsOn(remoteBroker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar"); assertQueueExistsOn(broker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar"); remoteConn.disconnect(); @@ -149,23 +152,17 @@ private CountDownLatch listenForConsumersOn(BrokerService broker) throws Excepti final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination dest = session.createTopic("ActiveMQ.Advisory.Consumer.Queue.Consumer.foo:AT_LEAST_ONCE.VirtualTopic.foo.bar"); MessageConsumer consumer = session.createConsumer(dest); - consumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - latch.countDown(); - // shutdown this connection - Dispatch.getGlobalQueue().execute(new Runnable() { - @Override - public void run() { - try { - session.close(); - connection.close(); - } catch (JMSException e) { - e.printStackTrace(); - } - } - }); - } + consumer.setMessageListener(message -> { + latch.countDown(); + // shutdown this connection + Dispatch.getGlobalQueue().execute(() -> { + try { + session.close(); + connection.close(); + } catch (JMSException e) { + e.printStackTrace(); + } + }); }); return latch; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java index 6ccf0137b69..08ec477e8db 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java @@ -16,20 +16,15 @@ */ package org.apache.activemq.network; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import java.util.Arrays; -import java.util.concurrent.ConcurrentMap; - import jakarta.jms.DeliveryMode; import jakarta.jms.Destination; import jakarta.jms.JMSException; import jakarta.jms.Message; import jakarta.jms.MessageConsumer; -import jakarta.jms.MessageListener; import jakarta.jms.MessageProducer; import jakarta.jms.TextMessage; import jakarta.jms.TopicRequestor; @@ -41,13 +36,13 @@ import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.ConsumerId; import org.apache.activemq.util.Wait; -import org.apache.activemq.util.Wait.Condition; import org.junit.Ignore; import org.junit.Test; import org.springframework.context.support.AbstractApplicationContext; +import java.util.concurrent.TimeUnit; + public class SimpleNetworkTest extends BaseNetworkTest { protected static final int MESSAGE_COUNT = 10; @@ -70,21 +65,21 @@ protected void doSetUp(boolean deleteAllMessages) throws Exception { @Test(timeout = 60 * 1000) public void testMessageCompression() throws Exception { - ActiveMQConnection localAmqConnection = (ActiveMQConnection) localConnection; + final ActiveMQConnection localAmqConnection = (ActiveMQConnection) localConnection; localAmqConnection.setUseCompression(true); - MessageConsumer consumer1 = remoteSession.createConsumer(included); - MessageProducer producer = localSession.createProducer(included); + final MessageConsumer consumer1 = remoteSession.createConsumer(included); + final MessageProducer producer = localSession.createProducer(included); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); waitForConsumerRegistration(localBroker, 1, included); for (int i = 0; i < MESSAGE_COUNT; i++) { - Message test = localSession.createTextMessage("test-" + i); + final Message test = localSession.createTextMessage("test-" + i); producer.send(test); - Message msg = consumer1.receive(3000); + final Message msg = consumer1.receive(3000); assertNotNull("not null? message: " + i, msg); - ActiveMQMessage amqMessage = (ActiveMQMessage) msg; + final ActiveMQMessage amqMessage = (ActiveMQMessage) msg; assertTrue(amqMessage.isCompressed()); } // ensure no more messages received @@ -96,30 +91,27 @@ public void testMessageCompression() throws Exception { @Test(timeout = 60 * 1000) public void testRequestReply() throws Exception { final MessageProducer remoteProducer = remoteSession.createProducer(null); - MessageConsumer remoteConsumer = remoteSession.createConsumer(included); - remoteConsumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message msg) { - try { - TextMessage textMsg = (TextMessage)msg; - String payload = "REPLY: " + textMsg.getText(); - Destination replyTo; - replyTo = msg.getJMSReplyTo(); - textMsg.clearBody(); - textMsg.setText(payload); - remoteProducer.send(replyTo, textMsg); - } catch (JMSException e) { - e.printStackTrace(); - } + final MessageConsumer remoteConsumer = remoteSession.createConsumer(included); + remoteConsumer.setMessageListener(msg -> { + try { + final TextMessage textMsg = (TextMessage) msg; + final String payload = "REPLY: " + textMsg.getText(); + final Destination replyTo = msg.getJMSReplyTo(); + textMsg.clearBody(); + textMsg.setText(payload); + remoteProducer.send(replyTo, textMsg); + } catch (JMSException e) { + e.printStackTrace(); } }); - TopicRequestor requestor = new TopicRequestor((TopicSession)localSession, included); - // allow for consumer infos to perculate arround - Thread.sleep(5000); + final TopicRequestor requestor = new TopicRequestor((TopicSession) localSession, included); + // Wait for consumer demand to propagate across the network bridge + waitForConsumerRegistration(localBroker, 1, included); + for (int i = 0; i < MESSAGE_COUNT; i++) { - TextMessage msg = localSession.createTextMessage("test msg: " + i); - TextMessage result = (TextMessage)requestor.request(msg); + final TextMessage msg = localSession.createTextMessage("test msg: " + i); + final TextMessage result = (TextMessage) requestor.request(msg); assertNotNull(result); LOG.info(result.getText()); } @@ -129,13 +121,14 @@ public void onMessage(Message msg) { @Test(timeout = 60 * 1000) public void testFiltering() throws Exception { - MessageConsumer includedConsumer = remoteSession.createConsumer(included); - MessageConsumer excludedConsumer = remoteSession.createConsumer(excluded); - MessageProducer includedProducer = localSession.createProducer(included); - MessageProducer excludedProducer = localSession.createProducer(excluded); - // allow for consumer infos to perculate around - Thread.sleep(2000); - Message test = localSession.createTextMessage("test"); + final MessageConsumer includedConsumer = remoteSession.createConsumer(included); + final MessageConsumer excludedConsumer = remoteSession.createConsumer(excluded); + final MessageProducer includedProducer = localSession.createProducer(included); + final MessageProducer excludedProducer = localSession.createProducer(excluded); + // Wait for consumer demand to propagate across the network bridge + waitForConsumerRegistration(localBroker, 1, included); + + final Message test = localSession.createTextMessage("test"); includedProducer.send(test); excludedProducer.send(test); assertNull(excludedConsumer.receive(1000)); @@ -146,15 +139,15 @@ public void testFiltering() throws Exception { @Test(timeout = 60 * 1000) public void testConduitBridge() throws Exception { - MessageConsumer consumer1 = remoteSession.createConsumer(included); - MessageConsumer consumer2 = remoteSession.createConsumer(included); - MessageProducer producer = localSession.createProducer(included); + final MessageConsumer consumer1 = remoteSession.createConsumer(included); + final MessageConsumer consumer2 = remoteSession.createConsumer(included); + final MessageProducer producer = localSession.createProducer(included); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); waitForConsumerRegistration(localBroker, 2, included); for (int i = 0; i < MESSAGE_COUNT; i++) { - Message test = localSession.createTextMessage("test-" + i); + final Message test = localSession.createTextMessage("test-" + i); producer.send(test); assertNotNull(consumer1.receive(1000)); assertNotNull(consumer2.receive(1000)); @@ -170,27 +163,31 @@ public void testConduitBridge() throws Exception { } private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception { - assertTrue("Internal bridge consumers registered in time", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - Object[] bridges = brokerService.getNetworkConnectors().get(0).bridges.values().toArray(); - if (bridges.length > 0) { - LOG.info(brokerService + " bridges " + Arrays.toString(bridges)); - DemandForwardingBridgeSupport demandForwardingBridgeSupport = (DemandForwardingBridgeSupport) bridges[0]; - ConcurrentMap forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap(); - LOG.info(brokerService + " bridge " + demandForwardingBridgeSupport + ", localSubs: " + forwardingBridges); - if (!forwardingBridges.isEmpty()) { - for (DemandSubscription demandSubscription : forwardingBridges.values()) { - if (demandSubscription.getLocalInfo().getDestination().equals(destination)) { - LOG.info(brokerService + " DemandSubscription " + demandSubscription + ", size: " + demandSubscription.size()); - return demandSubscription.size() >= min; - } + // Wait for the bridge demand subscriptions to register the expected number of + // remote consumers. With conduit subscriptions, multiple remote consumers map + // to a single local subscription, so we check DemandSubscription.size(). + assertTrue("Bridge demand subscription registered for " + destination, Wait.waitFor(() -> { + if (brokerService.getNetworkConnectors().isEmpty()) { + return false; + } + for (final NetworkBridge bridge : brokerService.getNetworkConnectors().get(0).activeBridges()) { + if (bridge instanceof DemandForwardingBridgeSupport) { + final DemandForwardingBridgeSupport dfBridge = (DemandForwardingBridgeSupport) bridge; + for (final DemandSubscription ds : dfBridge.getLocalSubscriptionMap().values()) { + if (ds.getLocalInfo().getDestination().equals(destination)) { + return ds.size() >= min; } } } - return false; } - })); + return false; + }, TimeUnit.SECONDS.toMillis(30), 100)); + + // Also verify the consumer is actually dispatching on the broker's destination. + // The DemandSubscription may exist before the local consumer is fully registered. + assertTrue("Consumer dispatching on " + destination, Wait.waitFor( + () -> brokerService.getDestination(destination).getDestinationStatistics().getConsumers().getCount() >= 1, + TimeUnit.SECONDS.toMillis(10), 100)); } //Added for AMQ-6465 to make sure memory usage decreased back to 0 after messages are forwarded @@ -198,27 +195,28 @@ public boolean isSatisified() throws Exception { @Test(timeout = 60 * 1000) public void testDurableTopicSubForwardMemoryUsage() throws Exception { // create a remote durable consumer to create demand - MessageConsumer remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName); - Thread.sleep(1000); + final MessageConsumer remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName); + // Wait for consumer demand to propagate across the network bridge + waitForConsumerRegistration(localBroker, 1, included); - MessageProducer producer = localSession.createProducer(included); + final MessageProducer producer = localSession.createProducer(included); for (int i = 0; i < MESSAGE_COUNT; i++) { - Message test = localSession.createTextMessage("test-" + i); + final Message test = localSession.createTextMessage("test-" + i); producer.send(test); } - Thread.sleep(1000); - - //Make sure stats are set - assertEquals(MESSAGE_COUNT, - localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount()); - - assertTrue(Wait.waitFor(new Condition() { - @Override - public boolean isSatisified() throws Exception { - return localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0; - } - }, 10000, 500)); + //Make sure stats are set - wait for forwards to complete + assertTrue("Forwards count did not reach expected value", Wait.waitFor( + () -> { + final long count = localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount(); + LOG.info("testDurableTopicSubForwardMemoryUsage: forwards count = " + count + " (expected " + MESSAGE_COUNT + ")"); + return count == MESSAGE_COUNT; + }, + TimeUnit.SECONDS.toMillis(30), 100)); + + assertTrue("Memory usage did not return to 0", Wait.waitFor( + () -> localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0, + TimeUnit.SECONDS.toMillis(10), 500)); remoteConsumer.close(); } @@ -226,28 +224,25 @@ public boolean isSatisified() throws Exception { //to the other broker @Test(timeout = 60 * 1000) public void testTopicSubForwardMemoryUsage() throws Exception { - // create a remote durable consumer to create demand - MessageConsumer remoteConsumer = remoteSession.createConsumer(included); - Thread.sleep(1000); + // create a remote consumer to create demand + final MessageConsumer remoteConsumer = remoteSession.createConsumer(included); + // Wait for consumer demand to propagate across the network bridge + waitForConsumerRegistration(localBroker, 1, included); - MessageProducer producer = localSession.createProducer(included); + final MessageProducer producer = localSession.createProducer(included); for (int i = 0; i < MESSAGE_COUNT; i++) { - Message test = localSession.createTextMessage("test-" + i); + final Message test = localSession.createTextMessage("test-" + i); producer.send(test); } - Thread.sleep(1000); - - //Make sure stats are set - assertEquals(MESSAGE_COUNT, - localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount()); - assertTrue(Wait.waitFor(new Condition() { + //Make sure stats are set - wait for forwards to complete + assertTrue("Forwards count did not reach expected value", Wait.waitFor( + () -> localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount() == MESSAGE_COUNT, + TimeUnit.SECONDS.toMillis(30), 100)); - @Override - public boolean isSatisified() throws Exception { - return localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0; - } - }, 10000, 500)); + assertTrue("Memory usage did not return to 0", Wait.waitFor( + () -> localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0, + TimeUnit.SECONDS.toMillis(10), 500)); for (int i = 0; i < MESSAGE_COUNT; i++) { assertNotNull("message count: " + i, remoteConsumer.receive(2500)); @@ -259,28 +254,25 @@ public boolean isSatisified() throws Exception { //to the other broker @Test(timeout = 60 * 1000) public void testQueueSubForwardMemoryUsage() throws Exception { - ActiveMQQueue queue = new ActiveMQQueue("include.test.foo"); - MessageConsumer remoteConsumer = remoteSession.createConsumer(queue); - Thread.sleep(1000); + final ActiveMQQueue queue = new ActiveMQQueue("include.test.foo"); + final MessageConsumer remoteConsumer = remoteSession.createConsumer(queue); + // Wait for consumer demand to propagate across the network bridge + waitForConsumerRegistration(localBroker, 1, queue); - MessageProducer producer = localSession.createProducer(queue); + final MessageProducer producer = localSession.createProducer(queue); for (int i = 0; i < MESSAGE_COUNT; i++) { - Message test = localSession.createTextMessage("test-" + i); + final Message test = localSession.createTextMessage("test-" + i); producer.send(test); } - Thread.sleep(1000); - - //Make sure stats are set - assertEquals(MESSAGE_COUNT, - localBroker.getDestination(queue).getDestinationStatistics().getForwards().getCount()); - assertTrue(Wait.waitFor(new Condition() { + //Make sure stats are set - wait for forwards to complete + assertTrue("Forwards count did not reach expected value", Wait.waitFor( + () -> localBroker.getDestination(queue).getDestinationStatistics().getForwards().getCount() == MESSAGE_COUNT, + TimeUnit.SECONDS.toMillis(30), 100)); - @Override - public boolean isSatisified() throws Exception { - return localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0; - } - }, 10000, 500)); + assertTrue("Memory usage did not return to 0", Wait.waitFor( + () -> localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0, + TimeUnit.SECONDS.toMillis(10), 500)); for (int i = 0; i < MESSAGE_COUNT; i++) { assertNotNull("message count: " + i, remoteConsumer.receive(2500)); @@ -291,28 +283,30 @@ public boolean isSatisified() throws Exception { @Test(timeout = 60 * 1000) public void testDurableStoreAndForward() throws Exception { // create a remote durable consumer - MessageConsumer remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName); - Thread.sleep(1000); + final MessageConsumer remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName); + // Wait for consumer demand to propagate across the network bridge + waitForConsumerRegistration(localBroker, 1, included); + // now close everything down and restart doTearDown(); doSetUp(false); - MessageProducer producer = localSession.createProducer(included); + final MessageProducer producer = localSession.createProducer(included); for (int i = 0; i < MESSAGE_COUNT; i++) { - Message test = localSession.createTextMessage("test-" + i); + final Message test = localSession.createTextMessage("test-" + i); producer.send(test); } - Thread.sleep(1000); - //Make sure stats are set - assertEquals(MESSAGE_COUNT, - localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount()); + //Make sure stats are set - wait for forwards to complete + assertTrue("Forwards count did not reach expected value", Wait.waitFor( + () -> localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount() == MESSAGE_COUNT, + TimeUnit.SECONDS.toMillis(30), 100)); // close everything down and restart doTearDown(); doSetUp(false); - remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName); + final MessageConsumer remoteConsumer2 = remoteSession.createDurableSubscriber(included, consumerName); for (int i = 0; i < MESSAGE_COUNT; i++) { - assertNotNull("message count: " + i, remoteConsumer.receive(2500)); + assertNotNull("message count: " + i, remoteConsumer2.receive(2500)); } } @@ -320,35 +314,55 @@ public void testDurableStoreAndForward() throws Exception { "it requires a connection per durable to match that connectionId") public void testDurableStoreAndForwardReconnect() throws Exception { // create a local durable consumer - MessageConsumer localConsumer = localSession.createDurableSubscriber(included, consumerName); - Thread.sleep(5000); + final MessageConsumer localConsumer = localSession.createDurableSubscriber(included, consumerName); + // Wait for consumer demand to propagate across the network bridge + waitForConsumerRegistration(localBroker, 1, included); + // now close everything down and restart doTearDown(); doSetUp(false); // send messages - MessageProducer producer = localSession.createProducer(included); + final MessageProducer producer = localSession.createProducer(included); for (int i = 0; i < MESSAGE_COUNT; i++) { - Message test = localSession.createTextMessage("test-" + i); + final Message test = localSession.createTextMessage("test-" + i); producer.send(test); } - Thread.sleep(5000); + + //Make sure stats are set - wait for forwards to complete + assertTrue("Forwards count did not reach expected value", Wait.waitFor( + () -> localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount() == MESSAGE_COUNT, + TimeUnit.SECONDS.toMillis(30), 100)); + // consume some messages locally - localConsumer = localSession.createDurableSubscriber(included, consumerName); - LOG.info("Consume from local consumer: " + localConsumer); + final MessageConsumer localConsumer2 = localSession.createDurableSubscriber(included, consumerName); + LOG.info("Consume from local consumer: " + localConsumer2); for (int i = 0; i < MESSAGE_COUNT / 2; i++) { - assertNotNull("message count: " + i, localConsumer.receive(2500)); + assertNotNull("message count: " + i, localConsumer2.receive(2500)); } - Thread.sleep(5000); + + // Wait for local messages to be consumed + assertTrue("Local messages consumed", Wait.waitFor( + () -> localBroker.getDestination(included).getDestinationStatistics().getDequeues().getCount() >= MESSAGE_COUNT / 2, + TimeUnit.SECONDS.toMillis(10), 100)); + // close everything down and restart doTearDown(); doSetUp(false); - Thread.sleep(5000); + + // Wait for network bridge to re-form + assertTrue("Network bridge re-formed", Wait.waitFor( + () -> !localBroker.getNetworkConnectors().isEmpty() + && !localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty(), + TimeUnit.SECONDS.toMillis(10), 100)); LOG.info("Consume from remote"); // consume the rest remotely - MessageConsumer remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName); + final MessageConsumer remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName); LOG.info("Remote consumer: " + remoteConsumer); - Thread.sleep(5000); + + // Wait for consumer demand to propagate + waitForConsumerRegistration(localBroker, 1, included); + for (int i = 0; i < MESSAGE_COUNT / 2; i++) { assertNotNull("message count: " + i, remoteConsumer.receive(10000)); } @@ -359,14 +373,11 @@ protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); final NetworkBridge remoteBridge = remoteBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() && - 0 == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount() && - expectedRemoteSent == remoteBridge.getNetworkBridgeStatistics().getDequeues().getCount() && - 0 == remoteBridge.getNetworkBridgeStatistics().getReceivedCount().getCount(); - } - })); + assertTrue(Wait.waitFor(() -> + expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() && + 0 == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount() && + expectedRemoteSent == remoteBridge.getNetworkBridgeStatistics().getDequeues().getCount() && + 0 == remoteBridge.getNetworkBridgeStatistics().getReceivedCount().getCount() + , TimeUnit.SECONDS.toMillis(30), 100)); } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java index d5e2a1a8428..7eaefd23bde 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java @@ -124,7 +124,7 @@ public void testThreadsInvolvedInXInactivityTimeouts() throws Exception { assertTrue("Should be at most inactivity monitor pool size * 2. Diff = " + diff, diff <= 2*poolSize); - assertTrue("all work complete", doneConsumers.await(10, TimeUnit.SECONDS)); + assertTrue("all work complete", doneConsumers.await(30, TimeUnit.SECONDS)); } @Override diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java index 880b394153e..2d189af4585 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java @@ -232,11 +232,16 @@ public void testJmsBrowserGetsPagedIn() throws Exception { assertEquals("see only the paged in for pull", 1, browsed.size()); - // Wait for all messages to be available (including redelivery of unacked message) - assertTrue("All messages available for consumption", Wait.waitFor(() -> { + // Wait for the unacked message to be redelivered after connection close. + // With zero prefetch + lazy dispatch, the broker needs time to process the + // connection close, redeliver the unacked message, and make it available again. + // Check that inflight count drops to 0, meaning the message has been returned. + assertTrue("Unacked message redelivered after connection close", Wait.waitFor(() -> { final Queue queue = (Queue) broker.getDestination(destination); - return queue != null && queue.getDestinationStatistics().getMessages().getCount() == numToSend; - }, 5000, 100)); + return queue != null + && queue.getDestinationStatistics().getMessages().getCount() == numToSend + && queue.getDestinationStatistics().getInflight().getCount() == 0; + }, TimeUnit.SECONDS.toMillis(20), 100)); // consume messages ArrayList consumeList = consumeMessages("TestQ"); @@ -280,7 +285,7 @@ private ArrayList consumeMessages(String queueName) throws Exception { boolean finished = false; while (!finished) { - Message message = consumer.receive(returnedMessages.isEmpty() ? 5000 : 1000); + Message message = consumer.receive(returnedMessages.isEmpty() ? TimeUnit.SECONDS.toMillis(20) : 500); if (message == null) { finished = true; }