001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import java.io.DataInputStream;
020import java.io.DataOutputStream;
021import java.io.EOFException;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import java.util.Arrays;
026import java.util.zip.Deflater;
027import java.util.zip.Inflater;
028
029import javax.jms.BytesMessage;
030import javax.jms.JMSException;
031import javax.jms.MessageEOFException;
032import javax.jms.MessageFormatException;
033import javax.jms.MessageNotReadableException;
034import javax.jms.MessageNotWriteableException;
035
036import org.apache.activemq.ActiveMQConnection;
037import org.apache.activemq.util.ByteArrayInputStream;
038import org.apache.activemq.util.ByteArrayOutputStream;
039import org.apache.activemq.util.ByteSequence;
040import org.apache.activemq.util.ByteSequenceData;
041import org.apache.activemq.util.JMSExceptionSupport;
042
043/**
044 * A <CODE>BytesMessage</CODE> object is used to send a message containing a
045 * stream of uninterpreted bytes. It inherits from the <CODE>Message</CODE>
046 * interface and adds a bytes message body. The receiver of the message supplies
047 * the interpretation of the bytes.
048 * <P>
049 * The <CODE>BytesMessage</CODE> methods are based largely on those found in
050 * <CODE>java.io.DataInputStream</CODE> and
051 * <CODE>java.io.DataOutputStream</CODE>.
052 * <P>
053 * This message type is for client encoding of existing message formats. If
054 * possible, one of the other self-defining message types should be used
055 * instead.
056 * <P>
057 * Although the JMS API allows the use of message properties with byte messages,
058 * they are typically not used, since the inclusion of properties may affect the
059 * format.
060 * <P>
061 * The primitive types can be written explicitly using methods for each type.
062 * They may also be written generically as objects. For instance, a call to
063 * <CODE>BytesMessage.writeInt(6)</CODE> is equivalent to
064 * <CODE> BytesMessage.writeObject(new Integer(6))</CODE>. Both forms are
065 * provided, because the explicit form is convenient for static programming, and
066 * the object form is needed when types are not known at compile time.
067 * <P>
068 * When the message is first created, and when <CODE>clearBody</CODE> is
069 * called, the body of the message is in write-only mode. After the first call
070 * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
071 * After a message has been sent, the client that sent it can retain and modify
072 * it without affecting the message that has been sent. The same message object
073 * can be sent multiple times. When a message has been received, the provider
074 * has called <CODE>reset</CODE> so that the message body is in read-only mode
075 * for the client.
076 * <P>
077 * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
078 * message body is cleared and the message is in write-only mode.
079 * <P>
080 * If a client attempts to read a message in write-only mode, a
081 * <CODE>MessageNotReadableException</CODE> is thrown.
082 * <P>
083 * If a client attempts to write a message in read-only mode, a
084 * <CODE>MessageNotWriteableException</CODE> is thrown.
085 *
086 * @openwire:marshaller code=24
087 * @see javax.jms.Session#createBytesMessage()
088 * @see javax.jms.MapMessage
089 * @see javax.jms.Message
090 * @see javax.jms.ObjectMessage
091 * @see javax.jms.StreamMessage
092 * @see javax.jms.TextMessage
093 */
094public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessage {
095
096    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_BYTES_MESSAGE;
097
098    protected transient DataOutputStream dataOut;
099    protected transient ByteArrayOutputStream bytesOut;
100    protected transient DataInputStream dataIn;
101    protected transient int length;
102
103    @Override
104    public Message copy() {
105        ActiveMQBytesMessage copy = new ActiveMQBytesMessage();
106        copy(copy);
107        return copy;
108    }
109
110    private void copy(ActiveMQBytesMessage copy) {
111        storeContent();
112        super.copy(copy);
113        copy.dataOut = null;
114        copy.bytesOut = null;
115        copy.dataIn = null;
116    }
117
118    @Override
119    public void onSend() throws JMSException {
120        super.onSend();
121        storeContent();
122    }
123
124    @Override
125    public void storeContent() {
126        if (dataOut != null) {
127            try {
128                dataOut.close();
129                ByteSequence bs = bytesOut.toByteSequence();
130                setContent(bs);
131
132                ActiveMQConnection connection = getConnection();
133                if (connection != null && connection.isUseCompression()) {
134                    doCompress();
135                }
136            } catch (IOException ioe) {
137                throw new RuntimeException(ioe.getMessage(), ioe);
138            } finally {
139                try {
140                    if (bytesOut != null) {
141                        bytesOut.close();
142                        bytesOut = null;
143                    }
144                    if (dataOut != null) {
145                        dataOut.close();
146                        dataOut = null;
147                    }
148                } catch (IOException ioe) {
149                }
150            }
151        }
152    }
153
154    @Override
155    public boolean isContentMarshalled() {
156        return content != null || dataOut == null;
157    }
158
159    @Override
160    public byte getDataStructureType() {
161        return DATA_STRUCTURE_TYPE;
162    }
163
164    @Override
165    public String getJMSXMimeType() {
166        return "jms/bytes-message";
167    }
168
169    /**
170     * Clears out the message body. Clearing a message's body does not clear its
171     * header values or property entries.
172     * <P>
173     * If this message body was read-only, calling this method leaves the
174     * message body in the same state as an empty body in a newly created
175     * message.
176     *
177     * @throws JMSException if the JMS provider fails to clear the message body
178     *                 due to some internal error.
179     */
180    @Override
181    public void clearBody() throws JMSException {
182        super.clearBody();
183        this.dataOut = null;
184        this.dataIn = null;
185        this.bytesOut = null;
186    }
187
188    /**
189     * Gets the number of bytes of the message body when the message is in
190     * read-only mode. The value returned can be used to allocate a byte array.
191     * The value returned is the entire length of the message body, regardless
192     * of where the pointer for reading the message is currently located.
193     *
194     * @return number of bytes in the message
195     * @throws JMSException if the JMS provider fails to read the message due to
196     *                 some internal error.
197     * @throws MessageNotReadableException if the message is in write-only mode.
198     * @since 1.1
199     */
200
201    @Override
202    public long getBodyLength() throws JMSException {
203        initializeReading();
204        return length;
205    }
206
207    /**
208     * Reads a <code>boolean</code> from the bytes message stream.
209     *
210     * @return the <code>boolean</code> value read
211     * @throws JMSException if the JMS provider fails to read the message due to
212     *                 some internal error.
213     * @throws MessageEOFException if unexpected end of bytes stream has been
214     *                 reached.
215     * @throws MessageNotReadableException if the message is in write-only mode.
216     */
217    @Override
218    public boolean readBoolean() throws JMSException {
219        initializeReading();
220        try {
221            return this.dataIn.readBoolean();
222        } catch (EOFException e) {
223            throw JMSExceptionSupport.createMessageEOFException(e);
224        } catch (IOException e) {
225            throw JMSExceptionSupport.createMessageFormatException(e);
226        }
227    }
228
229    /**
230     * Reads a signed 8-bit value from the bytes message stream.
231     *
232     * @return the next byte from the bytes message stream as a signed 8-bit
233     *         <code>byte</code>
234     * @throws JMSException if the JMS provider fails to read the message due to
235     *                 some internal error.
236     * @throws MessageEOFException if unexpected end of bytes stream has been
237     *                 reached.
238     * @throws MessageNotReadableException if the message is in write-only mode.
239     */
240    @Override
241    public byte readByte() throws JMSException {
242        initializeReading();
243        try {
244            return this.dataIn.readByte();
245        } catch (EOFException e) {
246            throw JMSExceptionSupport.createMessageEOFException(e);
247        } catch (IOException e) {
248            throw JMSExceptionSupport.createMessageFormatException(e);
249        }
250    }
251
252    /**
253     * Reads an unsigned 8-bit number from the bytes message stream.
254     *
255     * @return the next byte from the bytes message stream, interpreted as an
256     *         unsigned 8-bit number
257     * @throws JMSException if the JMS provider fails to read the message due to
258     *                 some internal error.
259     * @throws MessageEOFException if unexpected end of bytes stream has been
260     *                 reached.
261     * @throws MessageNotReadableException if the message is in write-only mode.
262     */
263    @Override
264    public int readUnsignedByte() throws JMSException {
265        initializeReading();
266        try {
267            return this.dataIn.readUnsignedByte();
268        } catch (EOFException e) {
269            throw JMSExceptionSupport.createMessageEOFException(e);
270        } catch (IOException e) {
271            throw JMSExceptionSupport.createMessageFormatException(e);
272        }
273    }
274
275    /**
276     * Reads a signed 16-bit number from the bytes message stream.
277     *
278     * @return the next two bytes from the bytes message stream, interpreted as
279     *         a signed 16-bit number
280     * @throws JMSException if the JMS provider fails to read the message due to
281     *                 some internal error.
282     * @throws MessageEOFException if unexpected end of bytes stream has been
283     *                 reached.
284     * @throws MessageNotReadableException if the message is in write-only mode.
285     */
286    @Override
287    public short readShort() throws JMSException {
288        initializeReading();
289        try {
290            return this.dataIn.readShort();
291        } catch (EOFException e) {
292            throw JMSExceptionSupport.createMessageEOFException(e);
293        } catch (IOException e) {
294            throw JMSExceptionSupport.createMessageFormatException(e);
295        }
296    }
297
298    /**
299     * Reads an unsigned 16-bit number from the bytes message stream.
300     *
301     * @return the next two bytes from the bytes message stream, interpreted as
302     *         an unsigned 16-bit integer
303     * @throws JMSException if the JMS provider fails to read the message due to
304     *                 some internal error.
305     * @throws MessageEOFException if unexpected end of bytes stream has been
306     *                 reached.
307     * @throws MessageNotReadableException if the message is in write-only mode.
308     */
309    @Override
310    public int readUnsignedShort() throws JMSException {
311        initializeReading();
312        try {
313            return this.dataIn.readUnsignedShort();
314        } catch (EOFException e) {
315            throw JMSExceptionSupport.createMessageEOFException(e);
316        } catch (IOException e) {
317            throw JMSExceptionSupport.createMessageFormatException(e);
318        }
319    }
320
321    /**
322     * Reads a Unicode character value from the bytes message stream.
323     *
324     * @return the next two bytes from the bytes message stream as a Unicode
325     *         character
326     * @throws JMSException if the JMS provider fails to read the message due to
327     *                 some internal error.
328     * @throws MessageEOFException if unexpected end of bytes stream has been
329     *                 reached.
330     * @throws MessageNotReadableException if the message is in write-only mode.
331     */
332    @Override
333    public char readChar() throws JMSException {
334        initializeReading();
335        try {
336            return this.dataIn.readChar();
337        } catch (EOFException e) {
338            throw JMSExceptionSupport.createMessageEOFException(e);
339        } catch (IOException e) {
340            throw JMSExceptionSupport.createMessageFormatException(e);
341        }
342    }
343
344    /**
345     * Reads a signed 32-bit integer from the bytes message stream.
346     *
347     * @return the next four bytes from the bytes message stream, interpreted as
348     *         an <code>int</code>
349     * @throws JMSException if the JMS provider fails to read the message due to
350     *                 some internal error.
351     * @throws MessageEOFException if unexpected end of bytes stream has been
352     *                 reached.
353     * @throws MessageNotReadableException if the message is in write-only mode.
354     */
355    @Override
356    public int readInt() throws JMSException {
357        initializeReading();
358        try {
359            return this.dataIn.readInt();
360        } catch (EOFException e) {
361            throw JMSExceptionSupport.createMessageEOFException(e);
362        } catch (IOException e) {
363            throw JMSExceptionSupport.createMessageFormatException(e);
364        }
365    }
366
367    /**
368     * Reads a signed 64-bit integer from the bytes message stream.
369     *
370     * @return the next eight bytes from the bytes message stream, interpreted
371     *         as a <code>long</code>
372     * @throws JMSException if the JMS provider fails to read the message due to
373     *                 some internal error.
374     * @throws MessageEOFException if unexpected end of bytes stream has been
375     *                 reached.
376     * @throws MessageNotReadableException if the message is in write-only mode.
377     */
378    @Override
379    public long readLong() throws JMSException {
380        initializeReading();
381        try {
382            return this.dataIn.readLong();
383        } catch (EOFException e) {
384            throw JMSExceptionSupport.createMessageEOFException(e);
385        } catch (IOException e) {
386            throw JMSExceptionSupport.createMessageFormatException(e);
387        }
388    }
389
390    /**
391     * Reads a <code>float</code> from the bytes message stream.
392     *
393     * @return the next four bytes from the bytes message stream, interpreted as
394     *         a <code>float</code>
395     * @throws JMSException if the JMS provider fails to read the message due to
396     *                 some internal error.
397     * @throws MessageEOFException if unexpected end of bytes stream has been
398     *                 reached.
399     * @throws MessageNotReadableException if the message is in write-only mode.
400     */
401    @Override
402    public float readFloat() throws JMSException {
403        initializeReading();
404        try {
405            return this.dataIn.readFloat();
406        } catch (EOFException e) {
407            throw JMSExceptionSupport.createMessageEOFException(e);
408        } catch (IOException e) {
409            throw JMSExceptionSupport.createMessageFormatException(e);
410        }
411    }
412
413    /**
414     * Reads a <code>double</code> from the bytes message stream.
415     *
416     * @return the next eight bytes from the bytes message stream, interpreted
417     *         as a <code>double</code>
418     * @throws JMSException if the JMS provider fails to read the message due to
419     *                 some internal error.
420     * @throws MessageEOFException if unexpected end of bytes stream has been
421     *                 reached.
422     * @throws MessageNotReadableException if the message is in write-only mode.
423     */
424    @Override
425    public double readDouble() throws JMSException {
426        initializeReading();
427        try {
428            return this.dataIn.readDouble();
429        } catch (EOFException e) {
430            throw JMSExceptionSupport.createMessageEOFException(e);
431        } catch (IOException e) {
432            throw JMSExceptionSupport.createMessageFormatException(e);
433        }
434    }
435
436    /**
437     * Reads a string that has been encoded using a modified UTF-8 format from
438     * the bytes message stream.
439     * <P>
440     * For more information on the UTF-8 format, see "File System Safe UCS
441     * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
442     * X/Open Company Ltd., Document Number: P316. This information also appears
443     * in ISO/IEC 10646, Annex P.
444     *
445     * @return a Unicode string from the bytes message stream
446     * @throws JMSException if the JMS provider fails to read the message due to
447     *                 some internal error.
448     * @throws MessageEOFException if unexpected end of bytes stream has been
449     *                 reached.
450     * @throws MessageNotReadableException if the message is in write-only mode.
451     */
452    @Override
453    public String readUTF() throws JMSException {
454        initializeReading();
455        try {
456            return this.dataIn.readUTF();
457        } catch (EOFException e) {
458            throw JMSExceptionSupport.createMessageEOFException(e);
459        } catch (IOException e) {
460            throw JMSExceptionSupport.createMessageFormatException(e);
461        }
462    }
463
464    /**
465     * Reads a byte array from the bytes message stream.
466     * <P>
467     * If the length of array <code>value</code> is less than the number of
468     * bytes remaining to be read from the stream, the array should be filled. A
469     * subsequent call reads the next increment, and so on.
470     * <P>
471     * If the number of bytes remaining in the stream is less than the length of
472     * array <code>value</code>, the bytes should be read into the array. The
473     * return value of the total number of bytes read will be less than the
474     * length of the array, indicating that there are no more bytes left to be
475     * read from the stream. The next read of the stream returns -1.
476     *
477     * @param value the buffer into which the data is read
478     * @return the total number of bytes read into the buffer, or -1 if there is
479     *         no more data because the end of the stream has been reached
480     * @throws JMSException if the JMS provider fails to read the message due to
481     *                 some internal error.
482     * @throws MessageNotReadableException if the message is in write-only mode.
483     */
484    @Override
485    public int readBytes(byte[] value) throws JMSException {
486        return readBytes(value, value.length);
487    }
488
489    /**
490     * Reads a portion of the bytes message stream.
491     * <P>
492     * If the length of array <code>value</code> is less than the number of
493     * bytes remaining to be read from the stream, the array should be filled. A
494     * subsequent call reads the next increment, and so on.
495     * <P>
496     * If the number of bytes remaining in the stream is less than the length of
497     * array <code>value</code>, the bytes should be read into the array. The
498     * return value of the total number of bytes read will be less than the
499     * length of the array, indicating that there are no more bytes left to be
500     * read from the stream. The next read of the stream returns -1. <p/> If
501     * <code>length</code> is negative, or <code>length</code> is greater
502     * than the length of the array <code>value</code>, then an
503     * <code>IndexOutOfBoundsException</code> is thrown. No bytes will be read
504     * from the stream for this exception case.
505     *
506     * @param value the buffer into which the data is read
507     * @param length the number of bytes to read; must be less than or equal to
508     *                <code>value.length</code>
509     * @return the total number of bytes read into the buffer, or -1 if there is
510     *         no more data because the end of the stream has been reached
511     * @throws JMSException if the JMS provider fails to read the message due to
512     *                 some internal error.
513     * @throws MessageNotReadableException if the message is in write-only mode.
514     */
515    @Override
516    public int readBytes(byte[] value, int length) throws JMSException {
517        initializeReading();
518        try {
519            int n = 0;
520            while (n < length) {
521                int count = this.dataIn.read(value, n, length - n);
522                if (count < 0) {
523                    break;
524                }
525                n += count;
526            }
527            if (n == 0 && length > 0) {
528                n = -1;
529            }
530            return n;
531        } catch (EOFException e) {
532            throw JMSExceptionSupport.createMessageEOFException(e);
533        } catch (IOException e) {
534            throw JMSExceptionSupport.createMessageFormatException(e);
535        }
536    }
537
538    /**
539     * Writes a <code>boolean</code> to the bytes message stream as a 1-byte
540     * value. The value <code>true</code> is written as the value
541     * <code>(byte)1</code>; the value <code>false</code> is written as the
542     * value <code>(byte)0</code>.
543     *
544     * @param value the <code>boolean</code> value to be written
545     * @throws JMSException if the JMS provider fails to write the message due
546     *                 to some internal error.
547     * @throws MessageNotWriteableException if the message is in read-only mode.
548     */
549    @Override
550    public void writeBoolean(boolean value) throws JMSException {
551        initializeWriting();
552        try {
553            this.dataOut.writeBoolean(value);
554        } catch (IOException ioe) {
555            throw JMSExceptionSupport.create(ioe);
556        }
557    }
558
559    /**
560     * Writes a <code>byte</code> to the bytes message stream as a 1-byte
561     * value.
562     *
563     * @param value the <code>byte</code> value to be written
564     * @throws JMSException if the JMS provider fails to write the message due
565     *                 to some internal error.
566     * @throws MessageNotWriteableException if the message is in read-only mode.
567     */
568    @Override
569    public void writeByte(byte value) throws JMSException {
570        initializeWriting();
571        try {
572            this.dataOut.writeByte(value);
573        } catch (IOException ioe) {
574            throw JMSExceptionSupport.create(ioe);
575        }
576    }
577
578    /**
579     * Writes a <code>short</code> to the bytes message stream as two bytes,
580     * high byte first.
581     *
582     * @param value the <code>short</code> to be written
583     * @throws JMSException if the JMS provider fails to write the message due
584     *                 to some internal error.
585     * @throws MessageNotWriteableException if the message is in read-only mode.
586     */
587    @Override
588    public void writeShort(short value) throws JMSException {
589        initializeWriting();
590        try {
591            this.dataOut.writeShort(value);
592        } catch (IOException ioe) {
593            throw JMSExceptionSupport.create(ioe);
594        }
595    }
596
597    /**
598     * Writes a <code>char</code> to the bytes message stream as a 2-byte
599     * value, high byte first.
600     *
601     * @param value the <code>char</code> value to be written
602     * @throws JMSException if the JMS provider fails to write the message due
603     *                 to some internal error.
604     * @throws MessageNotWriteableException if the message is in read-only mode.
605     */
606    @Override
607    public void writeChar(char value) throws JMSException {
608        initializeWriting();
609        try {
610            this.dataOut.writeChar(value);
611        } catch (IOException ioe) {
612            throw JMSExceptionSupport.create(ioe);
613        }
614    }
615
616    /**
617     * Writes an <code>int</code> to the bytes message stream as four bytes,
618     * high byte first.
619     *
620     * @param value the <code>int</code> to be written
621     * @throws JMSException if the JMS provider fails to write the message due
622     *                 to some internal error.
623     * @throws MessageNotWriteableException if the message is in read-only mode.
624     */
625    @Override
626    public void writeInt(int value) throws JMSException {
627        initializeWriting();
628        try {
629            this.dataOut.writeInt(value);
630        } catch (IOException ioe) {
631            throw JMSExceptionSupport.create(ioe);
632        }
633    }
634
635    /**
636     * Writes a <code>long</code> to the bytes message stream as eight bytes,
637     * high byte first.
638     *
639     * @param value the <code>long</code> to be written
640     * @throws JMSException if the JMS provider fails to write the message due
641     *                 to some internal error.
642     * @throws MessageNotWriteableException if the message is in read-only mode.
643     */
644    @Override
645    public void writeLong(long value) throws JMSException {
646        initializeWriting();
647        try {
648            this.dataOut.writeLong(value);
649        } catch (IOException ioe) {
650            throw JMSExceptionSupport.create(ioe);
651        }
652    }
653
654    /**
655     * Converts the <code>float</code> argument to an <code>int</code> using
656     * the <code>floatToIntBits</code> method in class <code>Float</code>,
657     * and then writes that <code>int</code> value to the bytes message stream
658     * as a 4-byte quantity, high byte first.
659     *
660     * @param value the <code>float</code> value to be written
661     * @throws JMSException if the JMS provider fails to write the message due
662     *                 to some internal error.
663     * @throws MessageNotWriteableException if the message is in read-only mode.
664     */
665    @Override
666    public void writeFloat(float value) throws JMSException {
667        initializeWriting();
668        try {
669            this.dataOut.writeFloat(value);
670        } catch (IOException ioe) {
671            throw JMSExceptionSupport.create(ioe);
672        }
673    }
674
675    /**
676     * Converts the <code>double</code> argument to a <code>long</code>
677     * using the <code>doubleToLongBits</code> method in class
678     * <code>Double</code>, and then writes that <code>long</code> value to
679     * the bytes message stream as an 8-byte quantity, high byte first.
680     *
681     * @param value the <code>double</code> value to be written
682     * @throws JMSException if the JMS provider fails to write the message due
683     *                 to some internal error.
684     * @throws MessageNotWriteableException if the message is in read-only mode.
685     */
686    @Override
687    public void writeDouble(double value) throws JMSException {
688        initializeWriting();
689        try {
690            this.dataOut.writeDouble(value);
691        } catch (IOException ioe) {
692            throw JMSExceptionSupport.create(ioe);
693        }
694    }
695
696    /**
697     * Writes a string to the bytes message stream using UTF-8 encoding in a
698     * machine-independent manner.
699     * <P>
700     * For more information on the UTF-8 format, see "File System Safe UCS
701     * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
702     * X/Open Company Ltd., Document Number: P316. This information also appears
703     * in ISO/IEC 10646, Annex P.
704     *
705     * @param value the <code>String</code> value to be written
706     * @throws JMSException if the JMS provider fails to write the message due
707     *                 to some internal error.
708     * @throws MessageNotWriteableException if the message is in read-only mode.
709     */
710    @Override
711    public void writeUTF(String value) throws JMSException {
712        initializeWriting();
713        try {
714            this.dataOut.writeUTF(value);
715        } catch (IOException ioe) {
716            throw JMSExceptionSupport.create(ioe);
717        }
718    }
719
720    /**
721     * Writes a byte array to the bytes message stream.
722     *
723     * @param value the byte array to be written
724     * @throws JMSException if the JMS provider fails to write the message due
725     *                 to some internal error.
726     * @throws MessageNotWriteableException if the message is in read-only mode.
727     */
728    @Override
729    public void writeBytes(byte[] value) throws JMSException {
730        initializeWriting();
731        try {
732            this.dataOut.write(value);
733        } catch (IOException ioe) {
734            throw JMSExceptionSupport.create(ioe);
735        }
736    }
737
738    /**
739     * Writes a portion of a byte array to the bytes message stream.
740     *
741     * @param value the byte array value to be written
742     * @param offset the initial offset within the byte array
743     * @param length the number of bytes to use
744     * @throws JMSException if the JMS provider fails to write the message due
745     *                 to some internal error.
746     * @throws MessageNotWriteableException if the message is in read-only mode.
747     */
748    @Override
749    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
750        initializeWriting();
751        try {
752            this.dataOut.write(value, offset, length);
753        } catch (IOException ioe) {
754            throw JMSExceptionSupport.create(ioe);
755        }
756    }
757
758    /**
759     * Writes an object to the bytes message stream.
760     * <P>
761     * This method works only for the objectified primitive object types (<code>Integer</code>,<code>Double</code>,
762     * <code>Long</code> &nbsp;...), <code>String</code> objects, and byte
763     * arrays.
764     *
765     * @param value the object in the Java programming language ("Java object")
766     *                to be written; it must not be null
767     * @throws JMSException if the JMS provider fails to write the message due
768     *                 to some internal error.
769     * @throws MessageFormatException if the object is of an invalid type.
770     * @throws MessageNotWriteableException if the message is in read-only mode.
771     * @throws java.lang.NullPointerException if the parameter
772     *                 <code>value</code> is null.
773     */
774    @Override
775    public void writeObject(Object value) throws JMSException {
776        if (value == null) {
777            throw new NullPointerException();
778        }
779        initializeWriting();
780        if (value instanceof Boolean) {
781            writeBoolean(((Boolean)value).booleanValue());
782        } else if (value instanceof Character) {
783            writeChar(((Character)value).charValue());
784        } else if (value instanceof Byte) {
785            writeByte(((Byte)value).byteValue());
786        } else if (value instanceof Short) {
787            writeShort(((Short)value).shortValue());
788        } else if (value instanceof Integer) {
789            writeInt(((Integer)value).intValue());
790        } else if (value instanceof Long) {
791            writeLong(((Long)value).longValue());
792        } else if (value instanceof Float) {
793            writeFloat(((Float)value).floatValue());
794        } else if (value instanceof Double) {
795            writeDouble(((Double)value).doubleValue());
796        } else if (value instanceof String) {
797            writeUTF(value.toString());
798        } else if (value instanceof byte[]) {
799            writeBytes((byte[])value);
800        } else {
801            throw new MessageFormatException("Cannot write non-primitive type:" + value.getClass());
802        }
803    }
804
805    /**
806     * Puts the message body in read-only mode and repositions the stream of
807     * bytes to the beginning.
808     *
809     * @throws JMSException if an internal error occurs
810     */
811    @Override
812    public void reset() throws JMSException {
813        storeContent();
814        setReadOnlyBody(true);
815        try {
816            if (bytesOut != null) {
817                bytesOut.close();
818                bytesOut = null;
819            }
820            if (dataIn != null) {
821                dataIn.close();
822                dataIn = null;
823            }
824            if (dataOut != null) {
825                dataOut.close();
826                dataOut = null;
827            }
828        } catch (IOException ioe) {
829            throw JMSExceptionSupport.create(ioe);
830        }
831    }
832
833    private void initializeWriting() throws JMSException {
834        checkReadOnlyBody();
835        if (this.dataOut == null) {
836            this.bytesOut = new ByteArrayOutputStream();
837            OutputStream os = bytesOut;
838            this.dataOut = new DataOutputStream(os);
839        }
840
841        restoreOldContent();
842    }
843
844    private void restoreOldContent() throws JMSException {
845        // For a message that already had a body and was sent we need to restore the content
846        // if the message is used again without having its clearBody method called.
847        if (this.content != null && this.content.length > 0) {
848            try {
849                ByteSequence toRestore = this.content;
850                if (isCompressed()) {
851                    toRestore = new ByteSequence(decompress(this.content));
852                    compressed = false;
853                }
854
855                this.dataOut.write(toRestore.getData(), toRestore.getOffset(), toRestore.getLength());
856                // Free up the buffer from the old content, will be re-written when
857                // the message is sent again and storeContent() is called.
858                this.content = null;
859            } catch (IOException ioe) {
860                throw JMSExceptionSupport.create(ioe);
861            }
862        }
863    }
864
865    protected void checkWriteOnlyBody() throws MessageNotReadableException {
866        if (!readOnlyBody) {
867            throw new MessageNotReadableException("Message body is write-only");
868        }
869    }
870
871    private void initializeReading() throws JMSException {
872        checkWriteOnlyBody();
873        if (dataIn == null) {
874            try {
875                ByteSequence data = getContent();
876                if (data == null) {
877                    data = new ByteSequence(new byte[] {}, 0, 0);
878                }
879                InputStream is = new ByteArrayInputStream(data);
880                if (isCompressed()) {
881                    if (data.length != 0) {
882                        is = new ByteArrayInputStream(decompress(data));
883                    }
884                } else {
885                    length = data.getLength();
886                }
887
888                dataIn = new DataInputStream(is);
889            } catch (IOException ioe) {
890                throw JMSExceptionSupport.create(ioe);
891            }
892        }
893    }
894
895    protected byte[] decompress(ByteSequence dataSequence) throws IOException {
896        Inflater inflater = new Inflater();
897        ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
898        try {
899            length = ByteSequenceData.readIntBig(dataSequence);
900            dataSequence.offset = 0;
901            byte[] data = Arrays.copyOfRange(dataSequence.getData(), 4, dataSequence.getLength());
902            inflater.setInput(data);
903            byte[] buffer = new byte[length];
904            int count = inflater.inflate(buffer);
905            decompressed.write(buffer, 0, count);
906            return decompressed.toByteArray();
907        } catch (Exception e) {
908            throw new IOException(e);
909        } finally {
910            inflater.end();
911            decompressed.close();
912        }
913    }
914
915    @Override
916    public void setObjectProperty(String name, Object value) throws JMSException {
917        initializeWriting();
918        super.setObjectProperty(name, value);
919    }
920
921    @Override
922    public String toString() {
923        return super.toString() + " ActiveMQBytesMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
924    }
925
926    @Override
927    protected void doCompress() throws IOException {
928        compressed = true;
929        ByteSequence bytes = getContent();
930        if (bytes != null) {
931            int length = bytes.getLength();
932            ByteArrayOutputStream compressed = new ByteArrayOutputStream();
933            compressed.write(new byte[4]);
934            Deflater deflater = new Deflater();
935            try {
936                deflater.setInput(bytes.data);
937                deflater.finish();
938                byte[] buffer = new byte[1024];
939                while (!deflater.finished()) {
940                    int count = deflater.deflate(buffer);
941                    compressed.write(buffer, 0, count);
942                }
943
944                bytes = compressed.toByteSequence();
945                ByteSequenceData.writeIntBig(bytes, length);
946                bytes.offset = 0;
947                setContent(bytes);
948            } finally {
949                deflater.end();
950                compressed.close();
951            }
952        }
953    }
954}