diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 743e8260bf..b067f4a382 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -55,6 +55,7 @@ import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.JMXSupport; import org.apache.activemq.util.Wait; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; @@ -1672,10 +1673,9 @@ public void testReceiveMessageSentWhileOffline() throws Exception { connectionSub.disconnect(); // Wait for broker to process disconnect before publishing messages for offline delivery. - // Check for no active durable subscribers (works for both regular and virtual topic strategies) assertTrue("Subscription should become inactive", - Wait.waitFor(() -> brokerService.getAdminView().getDurableTopicSubscribers().length == 0, - 5000, 100)); + Wait.waitFor(() -> isSubscriptionInactive(topics[0], mqttSub.getClientId().toString()), + TimeUnit.SECONDS.toMillis(5), 100)); try { for (int j = 0; j < numberOfRuns; j++) { @@ -1699,8 +1699,8 @@ public void testReceiveMessageSentWhileOffline() throws Exception { // Wait for broker to process disconnect before next iteration publishes assertTrue("Subscription should become inactive", - Wait.waitFor(() -> brokerService.getAdminView().getDurableTopicSubscribers().length == 0, - 5000, 100)); + Wait.waitFor(() -> isSubscriptionInactive(topics[0], mqttSub.getClientId().toString()), + TimeUnit.SECONDS.toMillis(5), 100)); } } catch (Exception exception) { LOG.error("unexpected exception", exception); @@ -1709,6 +1709,32 @@ public void testReceiveMessageSentWhileOffline() throws Exception { assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received); } + private boolean isSubscriptionInactive(Topic topic, String clientId) throws Exception { + if (isVirtualTopicSubscriptionStrategy()) { + String queueName = buildVirtualTopicQueueName(topic, clientId); + try { + return getProxyToQueue(queueName).getConsumerCount() == 0; + } catch (Exception ignore) { + return false; + } + } else { + return brokerService.getAdminView().getDurableTopicSubscribers().length == 0 && + brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1; + } + } + + private boolean isVirtualTopicSubscriptionStrategy() { + String config = getProtocolConfig(); + return config != null && config.contains("mqtt-virtual-topic-subscriptions"); + } + + private String buildVirtualTopicQueueName(Topic topic, String clientId) { + String activeMqClientId = MQTTProtocolSupport.convertMQTTToActiveMQ(clientId); + String activeMqTopic = MQTTProtocolSupport.convertMQTTToActiveMQ(topic.name().toString()); + String queueName = "Consumer." + activeMqClientId + ":" + topic.qos() + ".VirtualTopic." + activeMqTopic; + return JMXSupport.encodeObjectNamePart(queueName); + } + @Test(timeout = 60 * 1000) public void testReceiveMessageSentWhileOfflineAndBrokerRestart() throws Exception { stopBroker(); @@ -1734,9 +1760,8 @@ public void testReceiveMessageSentWhileOfflineAndBrokerRestart() throws Exceptio } // Wait for broker to process disconnect before publishing messages for offline delivery. - // Check for no active durable subscribers (works for both regular and virtual topic strategies) assertTrue("Subscription should become inactive", - Wait.waitFor(() -> brokerService.getAdminView().getDurableTopicSubscribers().length == 0, + Wait.waitFor(() -> isSubscriptionInactive(topics[0], "MQTT-Sub-Client"), 5000, 100)); MQTT mqttPubLoop = createMQTTConnection("MQTT-Pub-Client", true);