diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java index e031308b892..48e3901bfac 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import jakarta.jms.Connection; import jakarta.jms.MessageConsumer; import jakarta.jms.Queue; @@ -31,6 +32,7 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.Wait; public class FailoverClusterTest extends TestCase { @@ -53,12 +55,8 @@ public void testClusterConnectedAfterClients() throws Exception { if (brokerB == null) { brokerB = createBrokerB(getBindAddress()); } - Thread.sleep(3000); - Set set = new HashSet(); - for (ActiveMQConnection c : connections) { - set.add(c.getTransportChannel().getRemoteAddress()); - } - assertTrue(set.size() > 1); + assertTrue("clients should connect to multiple brokers", + waitForClientRebalance(connections, 2)); } public void testClusterURIOptionsStrip() throws Exception { @@ -67,12 +65,8 @@ public void testClusterURIOptionsStrip() throws Exception { // add in server side only url param, should not be propagated brokerB = createBrokerB(getBindAddress() + "?transport.closeAsync=false"); } - Thread.sleep(3000); - Set set = new HashSet(); - for (ActiveMQConnection c : connections) { - set.add(c.getTransportChannel().getRemoteAddress()); - } - assertTrue(set.size() > 1); + assertTrue("clients should connect to multiple brokers", + waitForClientRebalance(connections, 2)); } public void testClusterConnectedBeforeClients() throws Exception { @@ -80,17 +74,26 @@ public void testClusterConnectedBeforeClients() throws Exception { if (brokerB == null) { brokerB = createBrokerB(getBindAddress()); } - Thread.sleep(5000); + + assertTrue("bridge should form between brokers", + Wait.waitFor(() -> !brokerB.getNetworkConnectors().get(0).activeBridges().isEmpty(), + TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(500))); + createClients(); - Thread.sleep(2000); + + final URI brokerBURI = new URI(brokerB.getTransportConnectors().get(0).getPublishableConnectString()); brokerA.stop(); - Thread.sleep(2000); - URI brokerBURI = new URI( brokerB.getTransportConnectors().get(0).getPublishableConnectString()); - for (ActiveMQConnection c : connections) { - String addr = c.getTransportChannel().getRemoteAddress(); - assertTrue(addr.indexOf("" + brokerBURI.getPort()) > 0); - } + assertTrue("all clients should failover to broker B", + Wait.waitFor(() -> { + for (final ActiveMQConnection c : connections) { + final String addr = c.getTransportChannel().getRemoteAddress(); + if (addr == null || addr.indexOf("" + brokerBURI.getPort()) <= 0) { + return false; + } + } + return true; + }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(500))); } @Override @@ -154,14 +157,28 @@ protected void configureNetwork(BrokerService answer, String uri) throws Excepti @SuppressWarnings("unused") protected void createClients() throws Exception { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl); + final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl); for (int i = 0; i < NUMBER; i++) { - ActiveMQConnection c = (ActiveMQConnection) factory.createConnection(); + final ActiveMQConnection c = (ActiveMQConnection) factory.createConnection(); c.start(); - Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = s.createQueue(getClass().getName()); - MessageConsumer consumer = s.createConsumer(queue); + final Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue queue = s.createQueue(getClass().getName()); + final MessageConsumer consumer = s.createConsumer(queue); connections.add(c); } } + + private static boolean waitForClientRebalance(final List connections, + final int minBrokerCount) throws Exception { + return Wait.waitFor(() -> { + final Set set = new HashSet<>(); + for (final ActiveMQConnection c : connections) { + final String addr = c.getTransportChannel().getRemoteAddress(); + if (addr != null) { + set.add(addr); + } + } + return set.size() >= minBrokerCount; + }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(500)); + } }