001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.mqtt; 018 019import java.io.UnsupportedEncodingException; 020 021import org.fusesource.mqtt.codec.CONNECT; 022import org.fusesource.mqtt.codec.DISCONNECT; 023import org.fusesource.mqtt.codec.PINGREQ; 024import org.fusesource.mqtt.codec.PUBACK; 025import org.fusesource.mqtt.codec.PUBCOMP; 026import org.fusesource.mqtt.codec.PUBLISH; 027import org.fusesource.mqtt.codec.PUBREC; 028import org.fusesource.mqtt.codec.PUBREL; 029import org.fusesource.mqtt.codec.SUBSCRIBE; 030import org.fusesource.mqtt.codec.UNSUBSCRIBE; 031 032/** 033 * A set of static methods useful for handling MQTT based client connections. 034 */ 035public class MQTTProtocolSupport { 036 037 private static final int TOPIC_NAME_MIN_LENGTH = 1; 038 private static final int TOPIC_NAME_MAX_LENGTH = 65535; 039 040 private static final String MULTI_LEVEL_WILDCARD = "#"; 041 private static final String SINGLE_LEVEL_WILDCARD = "+"; 042 043 private static final char MULTI_LEVEL_WILDCARD_CHAR = '#'; 044 private static final char SINGLE_LEVEL_WILDCARD_CHAR = '+'; 045 private static final char TOPIC_LEVEL_SEPERATOR_CHAR = '/'; 046 047 /** 048 * Converts an MQTT formatted Topic name into a suitable ActiveMQ Destination 049 * name string. 050 * 051 * @param name 052 * the MQTT formatted topic name. 053 * 054 * @return an destination name that fits the ActiveMQ conventions. 055 */ 056 public static String convertMQTTToActiveMQ(String name) { 057 char[] chars = name.toCharArray(); 058 for (int i = 0; i < chars.length; i++) { 059 switch(chars[i]) { 060 case '#': 061 chars[i] = '>'; 062 break; 063 case '>': 064 chars[i] = '#'; 065 break; 066 case '+': 067 chars[i] = '*'; 068 break; 069 case '*': 070 chars[i] = '+'; 071 break; 072 case '/': 073 chars[i] = '.'; 074 break; 075 case '.': 076 chars[i] = '/'; 077 break; 078 } 079 } 080 String rc = new String(chars); 081 return rc; 082 } 083 084 /** 085 * Converts an ActiveMQ destination name into a correctly formatted 086 * MQTT destination name. 087 * 088 * @param destinationName 089 * the ActiveMQ destination name to process. 090 * 091 * @return a destination name formatted for MQTT. 092 */ 093 public static String convertActiveMQToMQTT(String destinationName) { 094 char[] chars = destinationName.toCharArray(); 095 for (int i = 0; i < chars.length; i++) { 096 switch(chars[i]) { 097 case '>': 098 chars[i] = '#'; 099 break; 100 case '#': 101 chars[i] = '>'; 102 break; 103 case '*': 104 chars[i] = '+'; 105 break; 106 case '+': 107 chars[i] = '*'; 108 break; 109 case '.': 110 chars[i] = '/'; 111 break; 112 case '/': 113 chars[i] = '.'; 114 break; 115 } 116 } 117 String rc = new String(chars); 118 return rc; 119 } 120 121 /** 122 * Given an MQTT header byte, determine the command type that the header 123 * represents. 124 * 125 * @param header 126 * the byte value for the MQTT frame header. 127 * 128 * @return a string value for the given command type. 129 */ 130 public static String commandType(byte header) { 131 byte messageType = (byte) ((header & 0xF0) >>> 4); 132 switch (messageType) { 133 case PINGREQ.TYPE: 134 return "PINGREQ"; 135 case CONNECT.TYPE: 136 return "CONNECT"; 137 case DISCONNECT.TYPE: 138 return "DISCONNECT"; 139 case SUBSCRIBE.TYPE: 140 return "SUBSCRIBE"; 141 case UNSUBSCRIBE.TYPE: 142 return "UNSUBSCRIBE"; 143 case PUBLISH.TYPE: 144 return "PUBLISH"; 145 case PUBACK.TYPE: 146 return "PUBACK"; 147 case PUBREC.TYPE: 148 return "PUBREC"; 149 case PUBREL.TYPE: 150 return "PUBREL"; 151 case PUBCOMP.TYPE: 152 return "PUBCOMP"; 153 default: 154 return "UNKNOWN"; 155 } 156 } 157 158 /** 159 * Validate that the Topic names given by client commands are valid 160 * based on the MQTT protocol specification. 161 * 162 * @param topicName 163 * the given Topic name provided by the client. 164 * 165 * @throws MQTTProtocolException if the value given is invalid. 166 */ 167 public static void validate(String topicName) throws MQTTProtocolException { 168 int topicLen = 0; 169 try { 170 topicLen = topicName.getBytes("UTF-8").length; 171 } catch (UnsupportedEncodingException e) { 172 throw new MQTTProtocolException("Topic name contained invalid UTF-8 encoding."); 173 } 174 175 // Spec: Unless stated otherwise all UTF-8 encoded strings can have any length in 176 // the range 0 to 65535 bytes. 177 if (topicLen < TOPIC_NAME_MIN_LENGTH || topicLen > TOPIC_NAME_MAX_LENGTH) { 178 throw new MQTTProtocolException("Topic name given had invliad length."); 179 } 180 181 // 4.7.1.2 and 4.7.1.3 these can stand alone 182 if (MULTI_LEVEL_WILDCARD.equals(topicName) || SINGLE_LEVEL_WILDCARD.equals(topicName)) { 183 return; 184 } 185 186 // Spec: 4.7.1.2 187 // The multi-level wildcard character MUST be specified either on its own or following a 188 // topic level separator. In either case it MUST be the last character specified in the 189 // Topic Filter [MQTT-4.7.1-2]. 190 int numWildCards = 0; 191 for (int i = 0; i < topicName.length(); ++i) { 192 if (topicName.charAt(i) == MULTI_LEVEL_WILDCARD_CHAR) { 193 numWildCards++; 194 195 // If prev exists it must be a separator 196 if (i > 0 && topicName.charAt(i - 1) != TOPIC_LEVEL_SEPERATOR_CHAR) { 197 throw new MQTTProtocolException("The multi level wildcard must stand alone: " + topicName); 198 } 199 } 200 201 if (numWildCards > 1) { 202 throw new MQTTProtocolException("Topic Filter can only have one multi-level filter: " + topicName); 203 } 204 } 205 206 if (topicName.contains(MULTI_LEVEL_WILDCARD) && !topicName.endsWith(MULTI_LEVEL_WILDCARD)) { 207 throw new MQTTProtocolException("The multi-level filter must be at the end of the Topic name: " + topicName); 208 } 209 210 // Spec: 4.7.1.3 211 // The single-level wildcard can be used at any level in the Topic Filter, including 212 // first and last levels. Where it is used it MUST occupy an entire level of the filter 213 // 214 // [MQTT-4.7.1-3]. It can be used at more than one level in the Topic Filter and can be 215 // used in conjunction with the multilevel wildcard. 216 for (int i = 0; i < topicName.length(); ++i) { 217 if (topicName.charAt(i) != SINGLE_LEVEL_WILDCARD_CHAR) { 218 continue; 219 } 220 221 // If prev exists it must be a separator 222 if (i > 0 && topicName.charAt(i - 1) != TOPIC_LEVEL_SEPERATOR_CHAR) { 223 throw new MQTTProtocolException("The single level wildcard must stand alone: " + topicName); 224 } 225 226 // If next exists it must be a separator 227 if (i < topicName.length() - 1 && topicName.charAt(i + 1) != TOPIC_LEVEL_SEPERATOR_CHAR) { 228 throw new MQTTProtocolException("The single level wildcard must stand alone: " + topicName); 229 } 230 } 231 } 232}