001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.net.MalformedURLException; 020import java.util.Enumeration; 021 022import javax.jms.BytesMessage; 023import javax.jms.Destination; 024import javax.jms.JMSException; 025import javax.jms.MapMessage; 026import javax.jms.Message; 027import javax.jms.MessageEOFException; 028import javax.jms.ObjectMessage; 029import javax.jms.StreamMessage; 030import javax.jms.TextMessage; 031 032import org.apache.activemq.blob.BlobDownloader; 033import org.apache.activemq.command.ActiveMQBlobMessage; 034import org.apache.activemq.command.ActiveMQBytesMessage; 035import org.apache.activemq.command.ActiveMQDestination; 036import org.apache.activemq.command.ActiveMQMapMessage; 037import org.apache.activemq.command.ActiveMQMessage; 038import org.apache.activemq.command.ActiveMQObjectMessage; 039import org.apache.activemq.command.ActiveMQStreamMessage; 040import org.apache.activemq.command.ActiveMQTextMessage; 041 042/** 043 * A helper class for converting normal JMS interfaces into ActiveMQ specific 044 * ones. 045 * 046 * 047 */ 048public final class ActiveMQMessageTransformation { 049 050 private ActiveMQMessageTransformation() { 051 } 052 053 /** 054 * Creates a an available JMS message from another provider. 055 * 056 * @param destination - Destination to be converted into ActiveMQ's 057 * implementation. 058 * @return ActiveMQDestination - ActiveMQ's implementation of the 059 * destination. 060 * @throws JMSException if an error occurs 061 */ 062 public static ActiveMQDestination transformDestination(Destination destination) throws JMSException { 063 return ActiveMQDestination.transform(destination); 064 } 065 066 /** 067 * Creates a fast shallow copy of the current ActiveMQMessage or creates a 068 * whole new message instance from an available JMS message from another 069 * provider. 070 * 071 * @param message - Message to be converted into ActiveMQ's implementation. 072 * @param connection 073 * @return ActiveMQMessage - ActiveMQ's implementation object of the 074 * message. 075 * @throws JMSException if an error occurs 076 */ 077 public static ActiveMQMessage transformMessage(Message message, ActiveMQConnection connection) 078 throws JMSException { 079 if (message instanceof ActiveMQMessage) { 080 return (ActiveMQMessage)message; 081 082 } else { 083 ActiveMQMessage activeMessage = null; 084 085 if (message instanceof BytesMessage) { 086 BytesMessage bytesMsg = (BytesMessage)message; 087 bytesMsg.reset(); 088 ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); 089 msg.setConnection(connection); 090 try { 091 for (;;) { 092 // Reads a byte from the message stream until the stream 093 // is empty 094 msg.writeByte(bytesMsg.readByte()); 095 } 096 } catch (MessageEOFException e) { 097 // if an end of message stream as expected 098 } catch (JMSException e) { 099 } 100 101 activeMessage = msg; 102 } else if (message instanceof MapMessage) { 103 MapMessage mapMsg = (MapMessage)message; 104 ActiveMQMapMessage msg = new ActiveMQMapMessage(); 105 msg.setConnection(connection); 106 Enumeration iter = mapMsg.getMapNames(); 107 108 while (iter.hasMoreElements()) { 109 String name = iter.nextElement().toString(); 110 msg.setObject(name, mapMsg.getObject(name)); 111 } 112 113 activeMessage = msg; 114 } else if (message instanceof ObjectMessage) { 115 ObjectMessage objMsg = (ObjectMessage)message; 116 ActiveMQObjectMessage msg = new ActiveMQObjectMessage(); 117 msg.setConnection(connection); 118 msg.setObject(objMsg.getObject()); 119 msg.storeContent(); 120 activeMessage = msg; 121 } else if (message instanceof StreamMessage) { 122 StreamMessage streamMessage = (StreamMessage)message; 123 streamMessage.reset(); 124 ActiveMQStreamMessage msg = new ActiveMQStreamMessage(); 125 msg.setConnection(connection); 126 Object obj = null; 127 128 try { 129 while ((obj = streamMessage.readObject()) != null) { 130 msg.writeObject(obj); 131 } 132 } catch (MessageEOFException e) { 133 // if an end of message stream as expected 134 } catch (JMSException e) { 135 } 136 137 activeMessage = msg; 138 } else if (message instanceof TextMessage) { 139 TextMessage textMsg = (TextMessage)message; 140 ActiveMQTextMessage msg = new ActiveMQTextMessage(); 141 msg.setConnection(connection); 142 msg.setText(textMsg.getText()); 143 activeMessage = msg; 144 } else if (message instanceof BlobMessage) { 145 BlobMessage blobMessage = (BlobMessage)message; 146 ActiveMQBlobMessage msg = new ActiveMQBlobMessage(); 147 msg.setConnection(connection); 148 if (connection != null){ 149 msg.setBlobDownloader(new BlobDownloader(connection.getBlobTransferPolicy())); 150 } 151 try { 152 msg.setURL(blobMessage.getURL()); 153 } catch (MalformedURLException e) { 154 155 } 156 activeMessage = msg; 157 } else { 158 activeMessage = new ActiveMQMessage(); 159 activeMessage.setConnection(connection); 160 } 161 162 copyProperties(message, activeMessage); 163 164 return activeMessage; 165 } 166 } 167 168 /** 169 * Copies the standard JMS and user defined properties from the givem 170 * message to the specified message 171 * 172 * @param fromMessage the message to take the properties from 173 * @param toMessage the message to add the properties to 174 * @throws JMSException 175 */ 176 public static void copyProperties(Message fromMessage, Message toMessage) throws JMSException { 177 toMessage.setJMSMessageID(fromMessage.getJMSMessageID()); 178 toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID()); 179 toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo())); 180 toMessage.setJMSDestination(transformDestination(fromMessage.getJMSDestination())); 181 toMessage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode()); 182 toMessage.setJMSRedelivered(fromMessage.getJMSRedelivered()); 183 toMessage.setJMSType(fromMessage.getJMSType()); 184 toMessage.setJMSExpiration(fromMessage.getJMSExpiration()); 185 toMessage.setJMSPriority(fromMessage.getJMSPriority()); 186 toMessage.setJMSTimestamp(fromMessage.getJMSTimestamp()); 187 188 Enumeration propertyNames = fromMessage.getPropertyNames(); 189 190 while (propertyNames.hasMoreElements()) { 191 String name = propertyNames.nextElement().toString(); 192 Object obj = fromMessage.getObjectProperty(name); 193 toMessage.setObjectProperty(name, obj); 194 } 195 } 196}