Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void transportResumed() {
} catch (JMSException e) {
}
}
TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(15, TimeUnit.SECONDS));
TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(30, TimeUnit.SECONDS));
// If we get another connection now it should be a new connection that
// works.
LOG.info("expect new connection after failure");
Expand Down
24 changes: 23 additions & 1 deletion activemq-unit-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,6 @@
<exclude>org/apache/activemq/transport/SoWriteTimeoutTest.*</exclude>
<exclude>org/apache/activemq/transport/TopicClusterTest.*</exclude>
<exclude>org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.*</exclude>
<exclude>org/apache/activemq/transport/discovery/DiscoveryTransportBrokerTest.*</exclude>
<exclude>org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.*</exclude>
<exclude>org/apache/activemq/transport/discovery/DiscoveryUriTest.*</exclude>
<exclude>org/apache/activemq/transport/discovery/MasterSlaveDiscoveryTest.*</exclude>
Expand Down Expand Up @@ -1170,6 +1169,29 @@
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests.mac.excludes</id>
<activation>
<os>
<family>mac</family>
</os>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<!-- Multicast and UDP based tests are unreliable on macOS CI (no proper multicast interfaces) -->
<exclude>org/apache/activemq/transport/peer/PeerTransportTest.*</exclude>
<exclude>org/apache/activemq/transport/multicast/MulticastTransportTest.*</exclude>
<exclude>org/apache/activemq/network/MulticastNetworkTest.*</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests.aix.excludes</id>
<activation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void stopBroker() throws Exception {
}
}

@Test
@Test(timeout = 120_000)
public void testDuplicateMessage() throws Exception {
LOG.info("Testing for duplicate messages.");

Expand All @@ -134,10 +134,10 @@ public void testDuplicateMessage() throws Exception {
createOpenwireClients(producers, consumers);

LOG.info("All producers and consumers got started. Awaiting their termination");
producersFinished.await(100, TimeUnit.MINUTES);
producersFinished.await(2, TimeUnit.MINUTES);
LOG.info("All producers have terminated. remaining to send: " + totalMessagesToSend.get() + ", sent:" + totalMessagesSent.get());

consumersFinished.await(100, TimeUnit.MINUTES);
consumersFinished.await(2, TimeUnit.MINUTES);
LOG.info("All consumers have terminated.");

producers.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
Expand Down Expand Up @@ -83,7 +82,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
private final FLOW flow;

@Rule
public Timeout globalTimeout = new Timeout(30, TimeUnit.SECONDS);
public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS);

@Parameters
public static Collection<Object[]> data() {
Expand Down Expand Up @@ -531,7 +530,6 @@ public void testAddOnlineSubscriptionsWithBridgeOffline() throws Exception {
session1.createDurableSubscriber(topic, "sub3");
session1.createDurableSubscriber(excludeTopic, "sub-exclude");

Thread.sleep(1000);
assertNCDurableSubsCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, excludeTopic, 0);

Expand Down Expand Up @@ -570,13 +568,10 @@ public void testAddOnlineSubscriptionsTwoBridges() throws Exception {
secondConnector.start();

//Make sure both bridges are connected
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 &&
localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1;
}
}, 10000, 500));
assertTrue(Wait.waitFor(() ->
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 &&
localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1,
TimeUnit.SECONDS.toMillis(10), 500));

//Make sure NC durables exist for both bridges
assertNCDurableSubsCount(broker2, topic2, 1);
Expand Down Expand Up @@ -637,13 +632,7 @@ public void testVirtualDestSubForceDurableSync() throws Exception {
final DestinationStatistics remoteDestStatistics2 = remoteBroker.getDestination(
new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();

assertTrue(Wait.waitFor(new Condition() {

@Override
public boolean isSatisified() throws Exception {
return remoteDestStatistics2.getMessages().getCount() == 501;
}
}));
assertTrue(Wait.waitFor(() -> remoteDestStatistics2.getMessages().getCount() == 501));

}

Expand Down Expand Up @@ -723,8 +712,36 @@ protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector,
included = new ActiveMQTopic(testTopicName);
doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0);
doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, localDataDir);
//Give time for advisories to propagate
Thread.sleep(1000);
//Wait for the bridge to be fully started (advisory consumers registered).
//Note: activeBridges().size() == 1 is NOT sufficient because bridges are added
//to the map before start() completes asynchronously. We must wait for the
//startedLatch which counts down after advisory consumers are registered.
if (startNetworkConnector) {
waitForBridgeFullyStarted();
}
}

private void waitForBridgeFullyStarted() throws Exception {
// Wait for the local bridge to be fully started (advisory consumers registered)
assertTrue("Local bridge should be fully started", Wait.waitFor(() -> {
if (localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
return false;
}
final NetworkBridge bridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
if (bridge instanceof DemandForwardingBridgeSupport) {
return ((DemandForwardingBridgeSupport) bridge).startedLatch.getCount() == 0;
}
return true;
}, TimeUnit.SECONDS.toMillis(10), 100));

// Also wait for the duplex bridge on the remote broker to be fully started.
// The duplex connector creates a separate DemandForwardingBridge on the remote side
// that also needs its advisory consumers registered before it can process events.
assertTrue("Duplex bridge should be fully started", Wait.waitFor(() -> {
final DemandForwardingBridge duplexBridge = findDuplexBridge(
remoteBroker.getTransportConnectors().get(0));
return duplexBridge != null && duplexBridge.startedLatch.getCount() == 0;
}, TimeUnit.SECONDS.toMillis(10), 100));
}

protected void restartLocalBroker(boolean startNetworkConnector) throws Exception {
Expand Down Expand Up @@ -757,12 +774,12 @@ protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetwor
localConnection.start();

if (startNetworkConnector) {
Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1;
}
}, 5000, 500);
// Best-effort wait for the bridge to appear. Do NOT use assertTrue here
// because some tests restart localBroker before remoteBroker is running,
// relying on the bridge connecting later when remoteBroker restarts.
// Tests that need the bridge to be fully started call assertBridgeStarted() explicitly.
Wait.waitFor(() -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
TimeUnit.SECONDS.toMillis(10), 500);
}
localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Expand Down Expand Up @@ -881,19 +898,16 @@ protected BrokerService createRemoteBroker(File dataDir, int port) throws Except
protected void waitForSubscriptionInactive(final BrokerService brokerService,
final ActiveMQTopic topic,
final String subName) throws Exception {
assertTrue("Subscription should become inactive", Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
List<org.apache.activemq.broker.region.DurableTopicSubscription> subs = getSubscriptions(brokerService, topic);
for (org.apache.activemq.broker.region.DurableTopicSubscription sub : subs) {
if (sub.getSubscriptionKey().getSubscriptionName().equals(subName)) {
return !sub.isActive();
}
assertTrue("Subscription should become inactive", Wait.waitFor(() -> {
final List<org.apache.activemq.broker.region.DurableTopicSubscription> subs = getSubscriptions(brokerService, topic);
for (final org.apache.activemq.broker.region.DurableTopicSubscription sub : subs) {
if (sub.getSubscriptionKey().getSubscriptionName().equals(subName)) {
return !sub.isActive();
}
// If subscription doesn't exist, it's considered inactive
return true;
}
}, 5000, 100));
// If subscription doesn't exist, it's considered inactive
return true;
}, TimeUnit.SECONDS.toMillis(10), 100));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
Expand All @@ -45,7 +46,6 @@
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;

Expand Down Expand Up @@ -95,9 +95,16 @@ protected void stopRemoteBroker() throws Exception {
}

protected void assertBridgeStarted() throws Exception {
assertTrue(Wait.waitFor(
() -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
10000, 500));
assertTrue("Bridge should be fully started", Wait.waitFor(() -> {
if (localBroker.getNetworkConnectors().get(0).activeBridges().size() != 1) {
return false;
}
final NetworkBridge bridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
if (bridge instanceof DemandForwardingBridgeSupport) {
return ((DemandForwardingBridgeSupport) bridge).startedLatch.getCount() == 0;
}
return true;
}, TimeUnit.SECONDS.toMillis(10), 500));
}

protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final ConnectionContext context,
Expand Down Expand Up @@ -135,17 +142,19 @@ protected interface ConsumerCreator {

protected void assertNCDurableSubsCount(final BrokerService brokerService,
final ActiveMQTopic dest, final int count) throws Exception {
assertTrue(Wait.waitFor(() -> count == getNCDurableSubs(brokerService, dest).size(),
10000, 500));
assertTrue("Expected " + count + " NC durable subs on " + dest,
Wait.waitFor(() -> count == getNCDurableSubs(brokerService, dest).size(),
TimeUnit.SECONDS.toMillis(30), 500));
}

protected void assertConsumersCount(final BrokerService brokerService,
final ActiveMQDestination dest, final int count) throws Exception {
assertTrue(Wait.waitFor(() -> count == getConsumers(brokerService, dest).size(),
10000, 500));
Thread.sleep(1000);
// Check one more time after a short pause to make sure the count didn't increase past what we wanted
assertEquals(count, getConsumers(brokerService, dest).size());
// Wait a bit longer and verify the count is stable (didn't increase past what we wanted)
assertTrue("Consumer count should remain stable at " + count,
Wait.waitFor(() -> count == getConsumers(brokerService, dest).size(),
TimeUnit.SECONDS.toMillis(5), 500));
}

protected List<Subscription> getConsumers(final BrokerService brokerService,
Expand Down Expand Up @@ -208,12 +217,9 @@ protected void removeSubscription(final BrokerService brokerService,

protected void assertSubscriptionsCount(final BrokerService brokerService,
final ActiveMQTopic dest, final int count) throws Exception {
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return count == getSubscriptions(brokerService, dest).size();
}
}, 10000, 500));
assertTrue("Expected " + count + " subscriptions on " + dest,
Wait.waitFor(() -> count == getSubscriptions(brokerService, dest).size(),
TimeUnit.SECONDS.toMillis(30), 500));
}

protected void assertSubscriptionMapCounts(NetworkBridge networkBridge, final int count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,19 @@ public void testTempQueues() throws Exception {
assertEquals("Destination not deleted", 0, remoteBroker.getAdminView().getTemporaryQueues().length);
}

@Test
@Test(timeout = 60 * 1000)
public void testDynamicallyIncludedDestinationsForDuplex() throws Exception{
// Once the bridge is set up, we should see the filter used for the duplex end of the bridge
// only subscribe to the specific destinations included in the <dynamicallyIncludedDestinations> list
// so let's test that the filter is correct, let's also test the subscription on the localbroker
// is correct

// the bridge on the remote broker has the correct filter
TransportConnection bridgeConnection = getDuplexBridgeConnectionFromRemote();
final TransportConnection bridgeConnection = getDuplexBridgeConnectionFromRemote();
assertNotNull(bridgeConnection);
DemandForwardingBridge duplexBridge = getDuplexBridgeFromConnection(bridgeConnection);
final DemandForwardingBridge duplexBridge = waitForDuplexBridge(bridgeConnection);
assertNotNull(duplexBridge);
NetworkBridgeConfiguration configuration = getConfigurationFromNetworkBridge(duplexBridge);
final NetworkBridgeConfiguration configuration = getConfigurationFromNetworkBridge(duplexBridge);
assertNotNull(configuration);
assertFalse("This destinationFilter does not include ONLY the destinations specified in dynamicallyIncludedDestinations",
configuration.getDestinationFilter().equals(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">"));
Expand All @@ -110,17 +110,20 @@ private DemandForwardingBridge getDuplexBridgeFromConnection(TransportConnection
return bridge;
}

private DemandForwardingBridge waitForDuplexBridge(final TransportConnection bridgeConnection) throws Exception {
assertTrue("Timed out waiting for duplex bridge to be fully started",
Wait.waitFor(() -> {
final DemandForwardingBridge bridge = getDuplexBridgeFromConnection(bridgeConnection);
return bridge != null && bridge.getRemoteBrokerName() != null;
}));
return getDuplexBridgeFromConnection(bridgeConnection);
}

public TransportConnection getDuplexBridgeConnectionFromRemote() throws Exception {
final TransportConnector transportConnector = remoteBroker.getTransportConnectorByScheme("tcp");
assertTrue("Timed out waiting for duplex bridge connection",
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() {
return !transportConnector.getConnections().isEmpty();
}
}));
CopyOnWriteArrayList<TransportConnection> transportConnections = transportConnector.getConnections();
return transportConnections.get(0);
Wait.waitFor(() -> !transportConnector.getConnections().isEmpty()));
return transportConnector.getConnections().get(0);
}

@Override
Expand Down
Loading
Loading