Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
Expand All @@ -27,9 +30,12 @@

import jakarta.annotation.Resource;
import jakarta.jms.*;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({ "virtual-topic-network-test.xml" })
Expand All @@ -53,45 +59,64 @@ public class MessageDestinationVirtualTopicTest {

private Session session1;

public void init() throws JMSException {
public void init() throws Exception {
// Get actual assigned ephemeral ports
final String broker1URL = broker1.getTransportConnectors().get(0).getConnectUri().toString();
final String broker2URL = broker2.getTransportConnectors().get(0).getConnectUri().toString();
LOG.info("Broker1 URL: {}", broker1URL);
LOG.info("Broker2 URL: {}", broker2URL);

// Add network connector from broker2 to broker1 programmatically using actual port
final DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector(
new URI("static://(" + broker1URL + ")"));
nc.setName("linkToBrokerB1");
nc.setNetworkTTL(1);
nc.setDuplex(true);
broker2.addNetworkConnector(nc);
nc.start();

// Wait for bridge to be established
assertTrue("Network bridge should be established",
Wait.waitFor(() -> nc.activeBridges().size() == 1, 10_000, 500));

// Create connection on Broker B2
ConnectionFactory broker2ConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:62616");
Connection connection2 = broker2ConnectionFactory.createConnection();
final ConnectionFactory broker2ConnectionFactory = new ActiveMQConnectionFactory(broker2URL);
final Connection connection2 = broker2ConnectionFactory.createConnection();
connection2.start();
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerDQueue = session2.createQueue("Consumer.D.VirtualTopic.T1");
final Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue consumerDQueue = session2.createQueue("Consumer.D.VirtualTopic.T1");

// Bind listener on queue for consumer D
MessageConsumer consumer = session2.createConsumer(consumerDQueue);
final MessageConsumer consumer = session2.createConsumer(consumerDQueue);
listener2 = new SimpleMessageListener();
consumer.setMessageListener(listener2);

// Create connection on Broker B1
ConnectionFactory broker1ConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection1 = broker1ConnectionFactory.createConnection();
final ConnectionFactory broker1ConnectionFactory = new ActiveMQConnectionFactory(broker1URL);
final Connection connection1 = broker1ConnectionFactory.createConnection();
connection1.start();
session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerCQueue = session1.createQueue("Consumer.C.VirtualTopic.T1");
final Queue consumerCQueue = session1.createQueue("Consumer.C.VirtualTopic.T1");

// Bind listener on queue for consumer D
MessageConsumer consumer1 = session1.createConsumer(consumerCQueue);
// Bind listener on queue for consumer C
final MessageConsumer consumer1 = session1.createConsumer(consumerCQueue);
listener1 = new SimpleMessageListener();
consumer1.setMessageListener(listener1);

// Create listener on Broker B1 for VT T2 witout setOriginalDest
Queue consumer3Queue = session1.createQueue("Consumer.A.VirtualTopic.T2");
// Create listener on Broker B1 for VT T2 without setOriginalDest
final Queue consumer3Queue = session1.createQueue("Consumer.A.VirtualTopic.T2");

// Bind listener on queue for consumer D
MessageConsumer consumerD = session1.createConsumer(consumer3Queue);
// Bind listener on queue for consumer A
final MessageConsumer consumerD = session1.createConsumer(consumer3Queue);
listener3 = new SimpleMessageListener();
consumerD.setMessageListener(listener3);

// Create producer for topic, on B1
Topic virtualTopicT1 = session1.createTopic("VirtualTopic.T1,VirtualTopic.T2");
final Topic virtualTopicT1 = session1.createTopic("VirtualTopic.T1,VirtualTopic.T2");
producer = session1.createProducer(virtualTopicT1);
}

@Test
@Test(timeout = 30_000)
public void testDestinationNames() throws Exception {

LOG.info("Started waiting for broker 1 and 2");
Expand All @@ -102,32 +127,46 @@ public void testDestinationNames() throws Exception {
init();

// Create a monitor
CountDownLatch monitor = new CountDownLatch(3);
final CountDownLatch monitor = new CountDownLatch(3);
listener1.setCountDown(monitor);
listener2.setCountDown(monitor);
listener3.setCountDown(monitor);

// Wait for the consumer on broker2 to be visible on broker1 via the network bridge.
// The virtual topic Consumer.D.VirtualTopic.T1 on broker2 must be forwarded to broker1
// before sending, otherwise the message won't reach listener2.
assertTrue("Consumer.D queue should exist on broker1 via network bridge",
Wait.waitFor(() -> {
try {
final org.apache.activemq.broker.region.Destination dest =
broker1.getDestination(new ActiveMQQueue("Consumer.D.VirtualTopic.T1"));
return dest != null && dest.getConsumers().size() >= 1;
} catch (final Exception e) {
return false;
}
}, 10_000, 200));

LOG.info("Sending message");
// Send a message on the topic
TextMessage message = session1.createTextMessage("Hello World !");
final TextMessage message = session1.createTextMessage("Hello World !");
producer.send(message);
LOG.info("Waiting for message reception");
// Wait the two messages in the related queues
monitor.await();
assertTrue("All 3 listeners should receive messages", monitor.await(15, TimeUnit.SECONDS));

// Get the message destinations
String lastJMSDestination2 = listener2.getLastJMSDestination();
System.err.println(lastJMSDestination2);
String lastJMSDestination1 = listener1.getLastJMSDestination();
System.err.println(lastJMSDestination1);
final String lastJMSDestination2 = listener2.getLastJMSDestination();
LOG.info("Listener2 destination: {}", lastJMSDestination2);
final String lastJMSDestination1 = listener1.getLastJMSDestination();
LOG.info("Listener1 destination: {}", lastJMSDestination1);

String lastJMSDestination3 = listener3.getLastJMSDestination();
System.err.println(lastJMSDestination3);
final String lastJMSDestination3 = listener3.getLastJMSDestination();
LOG.info("Listener3 destination: {}", lastJMSDestination3);

// The destination names
assertEquals("queue://Consumer.D.VirtualTopic.T1", lastJMSDestination2);
assertEquals("queue://Consumer.C.VirtualTopic.T1", lastJMSDestination1);
assertEquals("topic://VirtualTopic.T2", lastJMSDestination3);

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,22 @@
*/
package org.apache.activemq.network;

import static org.junit.Assert.assertTrue;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import jakarta.jms.Connection;
import jakarta.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -59,11 +68,15 @@ protected void doTearDown() throws Exception {
if(remoteConnection != null)
remoteConnection.close();

if(localBroker != null)
if(localBroker != null) {
localBroker.stop();
localBroker.waitUntilStopped();
}

if(remoteBroker != null)
if(remoteBroker != null) {
remoteBroker.stop();
remoteBroker.waitUntilStopped();
}
}

protected void doSetUp(boolean deleteAllMessages) throws Exception {
Expand All @@ -75,14 +88,29 @@ protected void doSetUp(boolean deleteAllMessages) throws Exception {
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
localBroker.start();
localBroker.waitUntilStarted();
URI localURI = localBroker.getVmConnectorURI();

// Programmatically add network connectors using the actual assigned ephemeral ports.
// Use startNetworkConnector() instead of connector.start() to ensure proper JMX MBean registration.
addNetworkConnectors();

// Wait for network bridges to be FULLY started (advisory consumers registered).
// activeBridges().isEmpty() is NOT sufficient because bridges are added to the map
// before start() completes asynchronously. We must wait for the startedLatch.
waitForBridgeFullyStarted(localBroker, "Local");
// Only wait for remote bridge if the remote broker has its own network connector
// (duplex bridges don't add a separate connector on the remote side)
if (!remoteBroker.getNetworkConnectors().isEmpty()) {
waitForBridgeFullyStarted(remoteBroker, "Remote");
}

final URI localURI = localBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
fac.setAlwaysSyncSend(true);
fac.setDispatchAsync(false);
localConnection = fac.createConnection();
localConnection.setClientID("clientId");
localConnection.start();
URI remoteURI = remoteBroker.getVmConnectorURI();
final URI remoteURI = remoteBroker.getVmConnectorURI();
fac = new ActiveMQConnectionFactory(remoteURI);
remoteConnection = fac.createConnection();
remoteConnection.setClientID("clientId");
Expand All @@ -91,21 +119,72 @@ protected void doSetUp(boolean deleteAllMessages) throws Exception {
remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}

/**
* Programmatically adds network connectors between the local and remote brokers
* using the actual assigned ephemeral ports. This avoids hardcoding ports in XML
* config files which causes port conflicts on CI.
*/
protected void addNetworkConnectors() throws Exception {
final URI remoteConnectURI = remoteBroker.getTransportConnectors().get(0).getConnectUri();
final URI localConnectURI = localBroker.getTransportConnectors().get(0).getConnectUri();

// Local -> Remote network connector (matches the original localBroker.xml config)
final DiscoveryNetworkConnector localToRemote = new DiscoveryNetworkConnector(
new URI("static:(" + remoteConnectURI + ")"));
localToRemote.setName("networkConnector");
localToRemote.setDynamicOnly(false);
localToRemote.setConduitSubscriptions(true);
localToRemote.setDecreaseNetworkConsumerPriority(false);

final List<ActiveMQDestination> dynamicallyIncluded = new ArrayList<>();
dynamicallyIncluded.add(new ActiveMQQueue("include.test.foo"));
dynamicallyIncluded.add(new ActiveMQTopic("include.test.bar"));
localToRemote.setDynamicallyIncludedDestinations(dynamicallyIncluded);

final List<ActiveMQDestination> excluded = new ArrayList<>();
excluded.add(new ActiveMQQueue("exclude.test.foo"));
excluded.add(new ActiveMQTopic("exclude.test.bar"));
localToRemote.setExcludedDestinations(excluded);

localBroker.addNetworkConnector(localToRemote);
// startNetworkConnector handles JMX MBean registration and connector startup
localBroker.startNetworkConnector(localToRemote, null);

// Remote -> Local network connector (matches the original remoteBroker.xml config)
final DiscoveryNetworkConnector remoteToLocal = new DiscoveryNetworkConnector(
new URI("static:(" + localConnectURI + ")"));
remoteToLocal.setName("networkConnector");
remoteBroker.addNetworkConnector(remoteToLocal);
remoteBroker.startNetworkConnector(remoteToLocal, null);
}

protected void waitForBridgeFullyStarted(final BrokerService broker, final String label) throws Exception {
assertTrue(label + " broker bridge should be fully started", Wait.waitFor(() -> {
if (broker.getNetworkConnectors().isEmpty()
|| broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
return false;
}
final NetworkBridge bridge = broker.getNetworkConnectors().get(0).activeBridges().iterator().next();
if (bridge instanceof DemandForwardingBridgeSupport) {
return ((DemandForwardingBridgeSupport) bridge).startedLatch.getCount() == 0;
}
return true;
}, TimeUnit.SECONDS.toMillis(10), 100));
}

protected String getRemoteBrokerURI() {
return "org/apache/activemq/network/remoteBroker.xml";
return "org/apache/activemq/network/remoteBroker-ephemeral.xml";
}

protected String getLocalBrokerURI() {
return "org/apache/activemq/network/localBroker.xml";
return "org/apache/activemq/network/localBroker-ephemeral.xml";
}

protected BrokerService createBroker(String uri) throws Exception {
Resource resource = new ClassPathResource(uri);
BrokerFactoryBean factory = new BrokerFactoryBean(resource);
resource = new ClassPathResource(uri);
factory = new BrokerFactoryBean(resource);
final Resource resource = new ClassPathResource(uri);
final BrokerFactoryBean factory = new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
BrokerService result = factory.getBroker();
final BrokerService result = factory.getBroker();
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,9 @@ protected void assertSubscriptionMapCounts(NetworkBridge networkBridge, final in
protected DemandForwardingBridge findDuplexBridge(final TransportConnector connector) throws Exception {
assertNotNull(connector);

for (TransportConnection tc : connector.getConnections()) {
if (tc.getConnectionId().startsWith("networkConnector_")) {
for (final TransportConnection tc : connector.getConnections()) {
final String connectionId = tc.getConnectionId();
if (connectionId != null && connectionId.startsWith("networkConnector_")) {
final Field bridgeField = TransportConnection.class.getDeclaredField("duplexBridge");
bridgeField.setAccessible(true);
return (DemandForwardingBridge) bridgeField.get(tc);
Expand Down
Loading
Loading