Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
Expand Down Expand Up @@ -1672,10 +1673,9 @@ public void testReceiveMessageSentWhileOffline() throws Exception {
connectionSub.disconnect();

// Wait for broker to process disconnect before publishing messages for offline delivery.
// Check for no active durable subscribers (works for both regular and virtual topic strategies)
assertTrue("Subscription should become inactive",
Wait.waitFor(() -> brokerService.getAdminView().getDurableTopicSubscribers().length == 0,
5000, 100));
Wait.waitFor(() -> isSubscriptionInactive(topics[0], mqttSub.getClientId().toString()),
TimeUnit.SECONDS.toMillis(5), 100));

try {
for (int j = 0; j < numberOfRuns; j++) {
Expand All @@ -1699,8 +1699,8 @@ public void testReceiveMessageSentWhileOffline() throws Exception {

// Wait for broker to process disconnect before next iteration publishes
assertTrue("Subscription should become inactive",
Wait.waitFor(() -> brokerService.getAdminView().getDurableTopicSubscribers().length == 0,
5000, 100));
Wait.waitFor(() -> isSubscriptionInactive(topics[0], mqttSub.getClientId().toString()),
TimeUnit.SECONDS.toMillis(5), 100));
}
} catch (Exception exception) {
LOG.error("unexpected exception", exception);
Expand All @@ -1709,6 +1709,32 @@ public void testReceiveMessageSentWhileOffline() throws Exception {
assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received);
}

private boolean isSubscriptionInactive(Topic topic, String clientId) throws Exception {
if (isVirtualTopicSubscriptionStrategy()) {
String queueName = buildVirtualTopicQueueName(topic, clientId);
try {
return getProxyToQueue(queueName).getConsumerCount() == 0;
} catch (Exception ignore) {
return false;
}
} else {
return brokerService.getAdminView().getDurableTopicSubscribers().length == 0 &&
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
}
}

private boolean isVirtualTopicSubscriptionStrategy() {
String config = getProtocolConfig();
return config != null && config.contains("mqtt-virtual-topic-subscriptions");
}

private String buildVirtualTopicQueueName(Topic topic, String clientId) {
String activeMqClientId = MQTTProtocolSupport.convertMQTTToActiveMQ(clientId);
String activeMqTopic = MQTTProtocolSupport.convertMQTTToActiveMQ(topic.name().toString());
String queueName = "Consumer." + activeMqClientId + ":" + topic.qos() + ".VirtualTopic." + activeMqTopic;
return JMXSupport.encodeObjectNamePart(queueName);
}

@Test(timeout = 60 * 1000)
public void testReceiveMessageSentWhileOfflineAndBrokerRestart() throws Exception {
stopBroker();
Expand All @@ -1734,9 +1760,8 @@ public void testReceiveMessageSentWhileOfflineAndBrokerRestart() throws Exceptio
}

// Wait for broker to process disconnect before publishing messages for offline delivery.
// Check for no active durable subscribers (works for both regular and virtual topic strategies)
assertTrue("Subscription should become inactive",
Wait.waitFor(() -> brokerService.getAdminView().getDurableTopicSubscribers().length == 0,
Wait.waitFor(() -> isSubscriptionInactive(topics[0], "MQTT-Sub-Client"),
5000, 100));

MQTT mqttPubLoop = createMQTTConnection("MQTT-Pub-Client", true);
Expand Down