Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,22 @@
*/
package org.apache.activemq.network;

import static org.junit.Assert.assertTrue;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import jakarta.jms.Connection;
import jakarta.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -59,11 +68,15 @@ protected void doTearDown() throws Exception {
if(remoteConnection != null)
remoteConnection.close();

if(localBroker != null)
if(localBroker != null) {
localBroker.stop();
localBroker.waitUntilStopped();
}

if(remoteBroker != null)
if(remoteBroker != null) {
remoteBroker.stop();
remoteBroker.waitUntilStopped();
}
}

protected void doSetUp(boolean deleteAllMessages) throws Exception {
Expand All @@ -75,14 +88,25 @@ protected void doSetUp(boolean deleteAllMessages) throws Exception {
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
localBroker.start();
localBroker.waitUntilStarted();
URI localURI = localBroker.getVmConnectorURI();

// Programmatically add network connectors using the actual assigned ephemeral ports.
// Use startNetworkConnector() instead of connector.start() to ensure proper JMX MBean registration.
addNetworkConnectors();

// Wait for both network bridges to be FULLY started (advisory consumers registered).
// activeBridges().isEmpty() is NOT sufficient because bridges are added to the map
// before start() completes asynchronously. We must wait for the startedLatch.
waitForBridgeFullyStarted(localBroker, "Local");
waitForBridgeFullyStarted(remoteBroker, "Remote");

final URI localURI = localBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
fac.setAlwaysSyncSend(true);
fac.setDispatchAsync(false);
localConnection = fac.createConnection();
localConnection.setClientID("clientId");
localConnection.start();
URI remoteURI = remoteBroker.getVmConnectorURI();
final URI remoteURI = remoteBroker.getVmConnectorURI();
fac = new ActiveMQConnectionFactory(remoteURI);
remoteConnection = fac.createConnection();
remoteConnection.setClientID("clientId");
Expand All @@ -91,21 +115,72 @@ protected void doSetUp(boolean deleteAllMessages) throws Exception {
remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}

/**
* Programmatically adds network connectors between the local and remote brokers
* using the actual assigned ephemeral ports. This avoids hardcoding ports in XML
* config files which causes port conflicts on CI.
*/
protected void addNetworkConnectors() throws Exception {
final URI remoteConnectURI = remoteBroker.getTransportConnectors().get(0).getConnectUri();
final URI localConnectURI = localBroker.getTransportConnectors().get(0).getConnectUri();

// Local -> Remote network connector (matches the original localBroker.xml config)
final DiscoveryNetworkConnector localToRemote = new DiscoveryNetworkConnector(
new URI("static:(" + remoteConnectURI + ")"));
localToRemote.setName("networkConnector");
localToRemote.setDynamicOnly(false);
localToRemote.setConduitSubscriptions(true);
localToRemote.setDecreaseNetworkConsumerPriority(false);

final List<ActiveMQDestination> dynamicallyIncluded = new ArrayList<>();
dynamicallyIncluded.add(new ActiveMQQueue("include.test.foo"));
dynamicallyIncluded.add(new ActiveMQTopic("include.test.bar"));
localToRemote.setDynamicallyIncludedDestinations(dynamicallyIncluded);

final List<ActiveMQDestination> excluded = new ArrayList<>();
excluded.add(new ActiveMQQueue("exclude.test.foo"));
excluded.add(new ActiveMQTopic("exclude.test.bar"));
localToRemote.setExcludedDestinations(excluded);

localBroker.addNetworkConnector(localToRemote);
// startNetworkConnector handles JMX MBean registration and connector startup
localBroker.startNetworkConnector(localToRemote, null);

// Remote -> Local network connector (matches the original remoteBroker.xml config)
final DiscoveryNetworkConnector remoteToLocal = new DiscoveryNetworkConnector(
new URI("static:(" + localConnectURI + ")"));
remoteToLocal.setName("networkConnector");
remoteBroker.addNetworkConnector(remoteToLocal);
remoteBroker.startNetworkConnector(remoteToLocal, null);
}

protected void waitForBridgeFullyStarted(final BrokerService broker, final String label) throws Exception {
assertTrue(label + " broker bridge should be fully started", Wait.waitFor(() -> {
if (broker.getNetworkConnectors().isEmpty()
|| broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
return false;
}
final NetworkBridge bridge = broker.getNetworkConnectors().get(0).activeBridges().iterator().next();
if (bridge instanceof DemandForwardingBridgeSupport) {
return ((DemandForwardingBridgeSupport) bridge).startedLatch.getCount() == 0;
}
return true;
}, TimeUnit.SECONDS.toMillis(10), 100));
}

protected String getRemoteBrokerURI() {
return "org/apache/activemq/network/remoteBroker.xml";
return "org/apache/activemq/network/remoteBroker-ephemeral.xml";
}

protected String getLocalBrokerURI() {
return "org/apache/activemq/network/localBroker.xml";
return "org/apache/activemq/network/localBroker-ephemeral.xml";
}

protected BrokerService createBroker(String uri) throws Exception {
Resource resource = new ClassPathResource(uri);
BrokerFactoryBean factory = new BrokerFactoryBean(resource);
resource = new ClassPathResource(uri);
factory = new BrokerFactoryBean(resource);
final Resource resource = new ClassPathResource(uri);
final BrokerFactoryBean factory = new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
BrokerService result = factory.getBroker();
final BrokerService result = factory.getBroker();
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.network;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.jms.Connection;
Expand All @@ -25,7 +26,6 @@
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
Expand All @@ -39,6 +39,7 @@
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.xbean.BrokerFactoryBean;
Expand Down Expand Up @@ -68,32 +69,28 @@ public class NetworkFailoverTest extends TestCase {
public void testRequestReply() throws Exception {
final MessageProducer remoteProducer = remoteSession.createProducer(null);
MessageConsumer remoteConsumer = remoteSession.createConsumer(included);
remoteConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
final TextMessage textMsg = (TextMessage)msg;
remoteConsumer.setMessageListener(msg -> {
final TextMessage textMsg = (TextMessage) msg;
try {
final String payload = "REPLY: " + textMsg.getText() + ", " + textMsg.getJMSMessageID();
final Destination replyTo = msg.getJMSReplyTo();
textMsg.clearBody();
textMsg.setText(payload);
LOG.info("*** Sending response: {}", textMsg.getText());
remoteProducer.send(replyTo, textMsg);
LOG.info("replied with: " + textMsg.getJMSMessageID());

} catch (DestinationDoesNotExistException expected) {
// been removed but not yet recreated
replyToNonExistDest.incrementAndGet();
try {
String payload = "REPLY: " + textMsg.getText() + ", " + textMsg.getJMSMessageID();
Destination replyTo;
replyTo = msg.getJMSReplyTo();
textMsg.clearBody();
textMsg.setText(payload);
LOG.info("*** Sending response: {}", textMsg.getText());
remoteProducer.send(replyTo, textMsg);
LOG.info("replied with: " + textMsg.getJMSMessageID());

} catch (DestinationDoesNotExistException expected) {
// been removed but not yet recreated
replyToNonExistDest.incrementAndGet();
try {
LOG.info("NED: " + textMsg.getJMSMessageID());
} catch (JMSException e) {
e.printStackTrace();
};
} catch (Exception e) {
LOG.warn("*** Responder listener caught exception: ", e);
LOG.info("NED: " + textMsg.getJMSMessageID());
} catch (JMSException e) {
e.printStackTrace();
}
} catch (Exception e) {
LOG.warn("*** Responder listener caught exception: ", e);
e.printStackTrace();
}
});

Expand All @@ -104,16 +101,13 @@ public void onMessage(Message msg) {

// track remote dlq for forward failures
MessageConsumer dlqconsumer = remoteSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
dlqconsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
LOG.info("dlq " + message.getJMSMessageID());
} catch (JMSException e) {
e.printStackTrace();
}
remoteDLQCount.incrementAndGet();
dlqconsumer.setMessageListener(message -> {
try {
LOG.info("dlq " + message.getJMSMessageID());
} catch (JMSException e) {
e.printStackTrace();
}
remoteDLQCount.incrementAndGet();
});

// allow for consumer infos to perculate arround
Expand Down Expand Up @@ -176,25 +170,51 @@ protected void doTearDown() throws Exception {
} catch(Exception ex) {}
}

protected void doSetUp(boolean deleteAllMessages) throws Exception {
protected void doSetUp(final boolean deleteAllMessages) throws Exception {

remoteBroker = createRemoteBroker();
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
remoteBroker.setCacheTempDestinations(true);
remoteBroker.start();
remoteBroker.waitUntilStarted();

localBroker = createLocalBroker();
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
localBroker.setCacheTempDestinations(true);
localBroker.start();

String localURI = "tcp://localhost:61616";
String remoteURI = "tcp://localhost:61617";
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("failover:("+localURI+","+remoteURI+")?randomize=false&backup=false&trackMessages=true");
localBroker.waitUntilStarted();

// Get actual assigned ephemeral ports
final URI localConnectURI = localBroker.getTransportConnectors().get(0).getConnectUri();
final URI remoteConnectURI = remoteBroker.getTransportConnectors().get(0).getConnectUri();
final String localURI = localConnectURI.toString();
final String remoteURI = remoteConnectURI.toString();

// Add network connectors programmatically using actual ports
final DiscoveryNetworkConnector localToRemote = new DiscoveryNetworkConnector(
new URI("static://(" + remoteURI + ")"));
localToRemote.setName("networkConnector");
localToRemote.setDynamicOnly(false);
localToRemote.setConduitSubscriptions(true);
localToRemote.setDecreaseNetworkConsumerPriority(false);
localToRemote.setDynamicallyIncludedDestinations(
java.util.List.of(new ActiveMQQueue("include.test.foo"), new ActiveMQTopic("include.test.bar")));
localToRemote.setExcludedDestinations(
java.util.List.of(new ActiveMQQueue("exclude.test.foo"), new ActiveMQTopic("exclude.test.bar")));
localBroker.addNetworkConnector(localToRemote);
localBroker.startNetworkConnector(localToRemote, null);

final DiscoveryNetworkConnector remoteToLocal = new DiscoveryNetworkConnector(
new URI("static://(" + localURI + ")"));
remoteToLocal.setName("networkConnector");
remoteBroker.addNetworkConnector(remoteToLocal);
remoteBroker.startNetworkConnector(remoteToLocal, null);

ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("failover:(" + localURI + "," + remoteURI + ")?randomize=false&backup=false&trackMessages=true");
localConnection = fac.createConnection();
localConnection.setClientID("local");
localConnection.start();
fac = new ActiveMQConnectionFactory("failover:("+remoteURI + ","+localURI+")?randomize=false&backup=false&trackMessages=true");
fac = new ActiveMQConnectionFactory("failover:(" + remoteURI + "," + localURI + ")?randomize=false&backup=false&trackMessages=true");
fac.setWatchTopicAdvisories(false);
remoteConnection = fac.createConnection();
remoteConnection.setClientID("remote");
Expand All @@ -205,11 +225,11 @@ protected void doSetUp(boolean deleteAllMessages) throws Exception {
}

protected String getRemoteBrokerURI() {
return "org/apache/activemq/network/remoteBroker.xml";
return "org/apache/activemq/network/remoteBroker-ephemeral.xml";
}

protected String getLocalBrokerURI() {
return "org/apache/activemq/network/localBroker.xml";
return "org/apache/activemq/network/localBroker-ephemeral.xml";
}

protected BrokerService createBroker(String uri) throws Exception {
Expand Down
Loading
Loading