001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.command;
019
020import java.io.DataInputStream;
021import java.io.DataOutputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.ObjectOutputStream;
025import java.io.OutputStream;
026import java.io.Serializable;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.List;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.zip.DeflaterOutputStream;
032import java.util.zip.InflaterInputStream;
033
034import javax.jms.JMSException;
035import javax.jms.ObjectMessage;
036
037import org.apache.activemq.ActiveMQConnection;
038import org.apache.activemq.util.ByteArrayInputStream;
039import org.apache.activemq.util.ByteArrayOutputStream;
040import org.apache.activemq.util.ByteSequence;
041import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
042import org.apache.activemq.util.JMSExceptionSupport;
043import org.apache.activemq.wireformat.WireFormat;
044
045/**
046 * An <CODE>ObjectMessage</CODE> object is used to send a message that
047 * contains a serializable object in the Java programming language ("Java
048 * object"). It inherits from the <CODE>Message</CODE> interface and adds a
049 * body containing a single reference to an object. Only
050 * <CODE>Serializable</CODE> Java objects can be used. <p/>
051 * <P>
052 * If a collection of Java objects must be sent, one of the
053 * <CODE>Collection</CODE> classes provided since JDK 1.2 can be used. <p/>
054 * <P>
055 * When a client receives an <CODE>ObjectMessage</CODE>, it is in read-only
056 * mode. If a client attempts to write to the message at this point, a
057 * <CODE>MessageNotWriteableException</CODE> is thrown. If
058 * <CODE>clearBody</CODE> is called, the message can now be both read from and
059 * written to.
060 *
061 * @openwire:marshaller code="26"
062 * @see javax.jms.Session#createObjectMessage()
063 * @see javax.jms.Session#createObjectMessage(Serializable)
064 * @see javax.jms.BytesMessage
065 * @see javax.jms.MapMessage
066 * @see javax.jms.Message
067 * @see javax.jms.StreamMessage
068 * @see javax.jms.TextMessage
069 */
070public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage, TransientInitializer {
071
072    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_OBJECT_MESSAGE;
073
074    private transient List<String> trustedPackages = Arrays.asList(ClassLoadingAwareObjectInputStream.serializablePackages);
075    private transient boolean trustAllPackages = false;
076
077    protected transient Serializable object;
078
079    @Override
080    public Message copy() {
081        ActiveMQObjectMessage copy = new ActiveMQObjectMessage();
082        copy(copy);
083        copy.setTrustAllPackages(trustAllPackages);
084        copy.setTrustedPackages(trustedPackages);
085        return copy;
086    }
087
088    private void copy(ActiveMQObjectMessage copy) {
089        ActiveMQConnection connection = getConnection();
090        if (connection == null || !connection.isObjectMessageSerializationDefered()) {
091            storeContent();
092            copy.object = null;
093        } else {
094            copy.object = object;
095        }
096        super.copy(copy);
097
098    }
099
100    @Override
101    public void storeContentAndClear() {
102        storeContent();
103        object = null;
104    }
105
106    @Override
107    public void storeContent() {
108        ByteSequence bodyAsBytes = getContent();
109        if (bodyAsBytes == null && object != null) {
110            try {
111                ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
112                OutputStream os = bytesOut;
113                ActiveMQConnection connection = getConnection();
114                if (connection != null && connection.isUseCompression()) {
115                    compressed = true;
116                    os = new DeflaterOutputStream(os);
117                }
118                DataOutputStream dataOut = new DataOutputStream(os);
119                ObjectOutputStream objOut = new ObjectOutputStream(dataOut);
120                objOut.writeObject(object);
121                objOut.flush();
122                objOut.reset();
123                objOut.close();
124                setContent(bytesOut.toByteSequence());
125            } catch (IOException ioe) {
126                throw new RuntimeException(ioe.getMessage(), ioe);
127            }
128        }
129    }
130
131    @Override
132    public boolean isContentMarshalled() {
133        return content != null || object == null;
134    }
135
136    @Override
137    public byte getDataStructureType() {
138        return DATA_STRUCTURE_TYPE;
139    }
140
141    @Override
142    public String getJMSXMimeType() {
143        return "jms/object-message";
144    }
145
146    /**
147     * Clears out the message body. Clearing a message's body does not clear its
148     * header values or property entries. <p/>
149     * <P>
150     * If this message body was read-only, calling this method leaves the
151     * message body in the same state as an empty body in a newly created
152     * message.
153     *
154     * @throws JMSException if the JMS provider fails to clear the message body
155     *                 due to some internal error.
156     */
157
158    @Override
159    public void clearBody() throws JMSException {
160        super.clearBody();
161        this.object = null;
162    }
163
164    /**
165     * Sets the serializable object containing this message's data. It is
166     * important to note that an <CODE>ObjectMessage</CODE> contains a
167     * snapshot of the object at the time <CODE>setObject()</CODE> is called;
168     * subsequent modifications of the object will have no effect on the
169     * <CODE>ObjectMessage</CODE> body.
170     *
171     * @param newObject the message's data
172     * @throws JMSException if the JMS provider fails to set the object due to
173     *                 some internal error.
174     * @throws javax.jms.MessageFormatException if object serialization fails.
175     * @throws javax.jms.MessageNotWriteableException if the message is in
176     *                 read-only mode.
177     */
178
179    @Override
180    public void setObject(Serializable newObject) throws JMSException {
181        checkReadOnlyBody();
182        this.object = newObject;
183        setContent(null);
184        ActiveMQConnection connection = getConnection();
185        if (connection == null || !connection.isObjectMessageSerializationDefered()) {
186            storeContent();
187        }
188    }
189
190    /**
191     * Gets the serializable object containing this message's data. The default
192     * value is null.
193     *
194     * @return the serializable object containing this message's data
195     * @throws JMSException
196     */
197    @Override
198    public Serializable getObject() throws JMSException {
199        if (object == null && getContent() != null) {
200            try {
201                ByteSequence content = getContent();
202                InputStream is = new ByteArrayInputStream(content);
203                if (isCompressed()) {
204                    is = new InflaterInputStream(is);
205                }
206                DataInputStream dataIn = new DataInputStream(is);
207                ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
208                objIn.setTrustedPackages(trustedPackages);
209                objIn.setTrustAllPackages(trustAllPackages);
210                try {
211                    object = (Serializable)objIn.readObject();
212                } catch (ClassNotFoundException ce) {
213                    throw JMSExceptionSupport.create("Failed to build body from content. Serializable class not available to broker. Reason: " + ce, ce);
214                } finally {
215                    dataIn.close();
216                }
217            } catch (IOException e) {
218                throw JMSExceptionSupport.create("Failed to build body from bytes. Reason: " + e, e);
219            }
220        }
221        return this.object;
222    }
223
224    @Override
225    public void beforeMarshall(WireFormat wireFormat) throws IOException {
226        super.beforeMarshall(wireFormat);
227        // may have initiated on vm transport with deferred marshalling
228        storeContent();
229    }
230
231    @Override
232    public void clearUnMarshalledState() throws JMSException {
233        super.clearUnMarshalledState();
234        this.object = null;
235    }
236
237    @Override
238    public void onMessageRolledBack() {
239        super.onMessageRolledBack();
240
241        // lets force the object to be deserialized again - as we could have
242        // changed the object
243        object = null;
244    }
245
246    @Override
247    public void compress() throws IOException {
248        storeContent();
249        super.compress();
250    }
251
252    @Override
253    public String toString() {
254        try {
255            getObject();
256        } catch (JMSException e) {
257        }
258        return super.toString();
259    }
260
261    public List<String> getTrustedPackages() {
262        return trustedPackages;
263    }
264
265    public void setTrustedPackages(List<String> trustedPackages) {
266        this.trustedPackages = trustedPackages;
267    }
268
269    public boolean isTrustAllPackages() {
270        return trustAllPackages;
271    }
272
273    public void setTrustAllPackages(boolean trustAllPackages) {
274        this.trustAllPackages = trustAllPackages;
275    }
276
277    @Override
278    public void initTransients() {
279        trustedPackages = Arrays.asList(ClassLoadingAwareObjectInputStream.serializablePackages);
280        trustAllPackages = false;
281    }
282}