Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Queue;
Expand All @@ -31,6 +32,7 @@
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;

public class FailoverClusterTest extends TestCase {

Expand All @@ -53,12 +55,8 @@ public void testClusterConnectedAfterClients() throws Exception {
if (brokerB == null) {
brokerB = createBrokerB(getBindAddress());
}
Thread.sleep(3000);
Set<String> set = new HashSet<String>();
for (ActiveMQConnection c : connections) {
set.add(c.getTransportChannel().getRemoteAddress());
}
assertTrue(set.size() > 1);
assertTrue("clients should connect to multiple brokers",
waitForClientRebalance(connections, 2));
}

public void testClusterURIOptionsStrip() throws Exception {
Expand All @@ -67,30 +65,35 @@ public void testClusterURIOptionsStrip() throws Exception {
// add in server side only url param, should not be propagated
brokerB = createBrokerB(getBindAddress() + "?transport.closeAsync=false");
}
Thread.sleep(3000);
Set<String> set = new HashSet<String>();
for (ActiveMQConnection c : connections) {
set.add(c.getTransportChannel().getRemoteAddress());
}
assertTrue(set.size() > 1);
assertTrue("clients should connect to multiple brokers",
waitForClientRebalance(connections, 2));
}

public void testClusterConnectedBeforeClients() throws Exception {

if (brokerB == null) {
brokerB = createBrokerB(getBindAddress());
}
Thread.sleep(5000);

assertTrue("bridge should form between brokers",
Wait.waitFor(() -> !brokerB.getNetworkConnectors().get(0).activeBridges().isEmpty(),
TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(500)));

createClients();
Thread.sleep(2000);

final URI brokerBURI = new URI(brokerB.getTransportConnectors().get(0).getPublishableConnectString());
brokerA.stop();
Thread.sleep(2000);

URI brokerBURI = new URI( brokerB.getTransportConnectors().get(0).getPublishableConnectString());
for (ActiveMQConnection c : connections) {
String addr = c.getTransportChannel().getRemoteAddress();
assertTrue(addr.indexOf("" + brokerBURI.getPort()) > 0);
}
assertTrue("all clients should failover to broker B",
Wait.waitFor(() -> {
for (final ActiveMQConnection c : connections) {
final String addr = c.getTransportChannel().getRemoteAddress();
if (addr == null || addr.indexOf("" + brokerBURI.getPort()) <= 0) {
return false;
}
}
return true;
}, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(500)));
}

@Override
Expand Down Expand Up @@ -154,14 +157,28 @@ protected void configureNetwork(BrokerService answer, String uri) throws Excepti

@SuppressWarnings("unused")
protected void createClients() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl);
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl);
for (int i = 0; i < NUMBER; i++) {
ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
final ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
c.start();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName());
MessageConsumer consumer = s.createConsumer(queue);
final Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue queue = s.createQueue(getClass().getName());
final MessageConsumer consumer = s.createConsumer(queue);
connections.add(c);
}
}

private static boolean waitForClientRebalance(final List<ActiveMQConnection> connections,
final int minBrokerCount) throws Exception {
return Wait.waitFor(() -> {
final Set<String> set = new HashSet<>();
for (final ActiveMQConnection c : connections) {
final String addr = c.getTransportChannel().getRemoteAddress();
if (addr != null) {
set.add(addr);
}
}
return set.size() >= minBrokerCount;
}, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(500));
}
}
Loading