001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.mqtt.strategy; 018 019import static org.apache.activemq.transport.mqtt.MQTTProtocolSupport.convertActiveMQToMQTT; 020import static org.apache.activemq.transport.mqtt.MQTTProtocolSupport.convertMQTTToActiveMQ; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Set; 028import java.util.StringTokenizer; 029 030import org.apache.activemq.ActiveMQPrefetchPolicy; 031import org.apache.activemq.broker.region.QueueRegion; 032import org.apache.activemq.broker.region.RegionBroker; 033import org.apache.activemq.command.ActiveMQDestination; 034import org.apache.activemq.command.ActiveMQQueue; 035import org.apache.activemq.command.ActiveMQTopic; 036import org.apache.activemq.command.ConsumerInfo; 037import org.apache.activemq.command.DestinationInfo; 038import org.apache.activemq.command.RemoveSubscriptionInfo; 039import org.apache.activemq.command.Response; 040import org.apache.activemq.command.SubscriptionInfo; 041import org.apache.activemq.transport.mqtt.MQTTProtocolConverter; 042import org.apache.activemq.transport.mqtt.MQTTProtocolException; 043import org.apache.activemq.transport.mqtt.MQTTProtocolSupport; 044import org.apache.activemq.transport.mqtt.MQTTSubscription; 045import org.apache.activemq.transport.mqtt.ResponseHandler; 046import org.fusesource.mqtt.client.QoS; 047import org.fusesource.mqtt.codec.CONNECT; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051/** 052 * Subscription strategy that converts all MQTT subscribes that would be durable to 053 * Virtual Topic Queue subscriptions. Also maps all publish requests to be prefixed 054 * with the VirtualTopic. prefix unless already present. 055 */ 056public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy { 057 058 private static final String VIRTUALTOPIC_PREFIX = "VirtualTopic."; 059 private static final String VIRTUALTOPIC_CONSUMER_PREFIX = "Consumer."; 060 061 private static final Logger LOG = LoggerFactory.getLogger(MQTTVirtualTopicSubscriptionStrategy.class); 062 063 private final Set<ActiveMQQueue> restoredQueues = Collections.synchronizedSet(new HashSet<ActiveMQQueue>()); 064 065 @Override 066 public void onConnect(CONNECT connect) throws MQTTProtocolException { 067 List<ActiveMQQueue> queues = lookupQueues(protocol.getClientId()); 068 List<SubscriptionInfo> subs = lookupSubscription(protocol.getClientId()); 069 070 // When clean session is true we must purge all of the client's old Queue subscriptions 071 // and any durable subscriptions created on the VirtualTopic instance as well. 072 073 if (connect.cleanSession()) { 074 deleteDurableQueues(queues); 075 deleteDurableSubs(subs); 076 } else { 077 restoreDurableQueue(queues); 078 restoreDurableSubs(subs); 079 } 080 } 081 082 @Override 083 public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException { 084 ActiveMQDestination destination = null; 085 int prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH; 086 ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId()); 087 String converted = convertMQTTToActiveMQ(topicName); 088 if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) { 089 090 if (converted.startsWith(VIRTUALTOPIC_PREFIX)) { 091 destination = new ActiveMQTopic(converted); 092 prefetch = ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH; 093 consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName); 094 } else { 095 converted = VIRTUALTOPIC_CONSUMER_PREFIX + 096 convertMQTTToActiveMQ(protocol.getClientId()) + ":" + requestedQoS + "." + 097 VIRTUALTOPIC_PREFIX + converted; 098 destination = new ActiveMQQueue(converted); 099 prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH; 100 } 101 } else { 102 if (!converted.startsWith(VIRTUALTOPIC_PREFIX)) { 103 converted = VIRTUALTOPIC_PREFIX + converted; 104 } 105 destination = new ActiveMQTopic(converted); 106 prefetch = ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH; 107 } 108 109 consumerInfo.setDestination(destination); 110 if (protocol.getActiveMQSubscriptionPrefetch() > 0) { 111 consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch()); 112 } else { 113 consumerInfo.setPrefetchSize(prefetch); 114 } 115 consumerInfo.setRetroactive(true); 116 consumerInfo.setDispatchAsync(true); 117 118 return doSubscribe(consumerInfo, topicName, requestedQoS); 119 } 120 121 @Override 122 public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException { 123 124 ActiveMQDestination destination = mqttSubscription.getDestination(); 125 126 // check whether the Queue has been recovered in restoreDurableQueue 127 // mark subscription available for recovery for duplicate subscription 128 if (destination.isQueue() && restoredQueues.remove(destination)) { 129 return; 130 } 131 132 // check whether the Topic has been recovered in restoreDurableSubs 133 // mark subscription available for recovery for duplicate subscription 134 if (destination.isTopic() && restoredDurableSubs.remove(destination.getPhysicalName())) { 135 return; 136 } 137 138 if (mqttSubscription.getDestination().isTopic()) { 139 super.onReSubscribe(mqttSubscription); 140 } else { 141 doUnSubscribe(mqttSubscription); 142 ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo(); 143 consumerInfo.setConsumerId(getNextConsumerId()); 144 doSubscribe(consumerInfo, mqttSubscription.getTopicName(), mqttSubscription.getQoS()); 145 } 146 } 147 148 @Override 149 public void onUnSubscribe(String topicName) throws MQTTProtocolException { 150 MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName); 151 if (subscription != null) { 152 doUnSubscribe(subscription); 153 if (subscription.getDestination().isQueue()) { 154 DestinationInfo remove = new DestinationInfo(); 155 remove.setConnectionId(protocol.getConnectionId()); 156 remove.setDestination(subscription.getDestination()); 157 remove.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 158 159 protocol.sendToActiveMQ(remove, new ResponseHandler() { 160 @Override 161 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 162 // ignore failures.. 163 } 164 }); 165 } else if (subscription.getConsumerInfo().getSubscriptionName() != null) { 166 // also remove it from restored durable subscriptions set 167 restoredDurableSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName())); 168 169 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); 170 rsi.setConnectionId(protocol.getConnectionId()); 171 rsi.setSubscriptionName(subscription.getConsumerInfo().getSubscriptionName()); 172 rsi.setClientId(protocol.getClientId()); 173 protocol.sendToActiveMQ(rsi, new ResponseHandler() { 174 @Override 175 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 176 // ignore failures.. 177 } 178 }); 179 } 180 } 181 } 182 183 @Override 184 public ActiveMQDestination onSend(String topicName) { 185 ActiveMQTopic topic = new ActiveMQTopic(topicName); 186 if (topic.isComposite()) { 187 ActiveMQDestination[] composites = topic.getCompositeDestinations(); 188 for (ActiveMQDestination composite : composites) { 189 composite.setPhysicalName(prefix(composite.getPhysicalName())); 190 } 191 ActiveMQTopic result = new ActiveMQTopic(); 192 result.setCompositeDestinations(composites); 193 return result; 194 } else { 195 return new ActiveMQTopic(prefix(topicName)); 196 } 197 } 198 199 private String prefix(String topicName) { 200 if (!topicName.startsWith(VIRTUALTOPIC_PREFIX)) { 201 return VIRTUALTOPIC_PREFIX + topicName; 202 } else { 203 return topicName; 204 } 205 } 206 207 @Override 208 public String onSend(ActiveMQDestination destination) { 209 String destinationName = destination.getPhysicalName(); 210 int position = destinationName.indexOf(VIRTUALTOPIC_PREFIX); 211 if (position >= 0) { 212 destinationName = destinationName.substring(position + VIRTUALTOPIC_PREFIX.length()).substring(0); 213 } 214 return destinationName; 215 } 216 217 @Override 218 public boolean isControlTopic(ActiveMQDestination destination) { 219 String destinationName = destination.getPhysicalName(); 220 if (destinationName.startsWith("$") || destinationName.startsWith(VIRTUALTOPIC_PREFIX + "$")) { 221 return true; 222 } 223 return false; 224 } 225 226 private void deleteDurableQueues(List<ActiveMQQueue> queues) { 227 try { 228 for (ActiveMQQueue queue : queues) { 229 LOG.debug("Removing queue subscription for {} ",queue.getPhysicalName()); 230 DestinationInfo removeAction = new DestinationInfo(); 231 removeAction.setConnectionId(protocol.getConnectionId()); 232 removeAction.setDestination(queue); 233 removeAction.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 234 235 protocol.sendToActiveMQ(removeAction, new ResponseHandler() { 236 @Override 237 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 238 // ignore failures.. 239 } 240 }); 241 } 242 } catch (Throwable e) { 243 LOG.warn("Could not delete the MQTT queue subscriptions.", e); 244 } 245 } 246 247 private void restoreDurableQueue(List<ActiveMQQueue> queues) { 248 try { 249 for (ActiveMQQueue queue : queues) { 250 String name = queue.getPhysicalName().substring(VIRTUALTOPIC_CONSUMER_PREFIX.length()); 251 StringTokenizer tokenizer = new StringTokenizer(name); 252 tokenizer.nextToken(":."); 253 String qosString = tokenizer.nextToken(); 254 tokenizer.nextToken(); 255 String topicName = convertActiveMQToMQTT(tokenizer.nextToken("").substring(1)); 256 QoS qoS = QoS.valueOf(qosString); 257 LOG.trace("Restoring queue subscription: {}:{}", topicName, qoS); 258 259 ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId()); 260 consumerInfo.setDestination(queue); 261 consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH); 262 if (protocol.getActiveMQSubscriptionPrefetch() > 0) { 263 consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch()); 264 } 265 consumerInfo.setRetroactive(true); 266 consumerInfo.setDispatchAsync(true); 267 268 doSubscribe(consumerInfo, topicName, qoS); 269 270 // mark this durable subscription as restored by Broker 271 restoredQueues.add(queue); 272 } 273 } catch (IOException e) { 274 LOG.warn("Could not restore the MQTT queue subscriptions.", e); 275 } 276 } 277 278 List<ActiveMQQueue> lookupQueues(String clientId) throws MQTTProtocolException { 279 List<ActiveMQQueue> result = new ArrayList<ActiveMQQueue>(); 280 RegionBroker regionBroker; 281 282 try { 283 regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class); 284 } catch (Exception e) { 285 throw new MQTTProtocolException("Error recovering queues: " + e.getMessage(), false, e); 286 } 287 288 final QueueRegion queueRegion = (QueueRegion) regionBroker.getQueueRegion(); 289 for (ActiveMQDestination destination : queueRegion.getDestinationMap().keySet()) { 290 if (destination.isQueue() && !destination.isTemporary()) { 291 if (destination.getPhysicalName().startsWith("Consumer." + clientId + ":")) { 292 LOG.debug("Recovered client sub: {} on connect", destination.getPhysicalName()); 293 result.add((ActiveMQQueue) destination); 294 } 295 } 296 } 297 298 return result; 299 } 300}