001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import java.io.IOException;
020import java.util.Iterator;
021import java.util.LinkedHashMap;
022import java.util.LinkedList;
023import java.util.Map;
024import java.util.Map.Entry;
025
026import javax.jms.JMSException;
027
028import org.apache.activemq.command.ActiveMQBytesMessage;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.ActiveMQMessage;
031import org.apache.activemq.command.ConsumerInfo;
032import org.apache.activemq.command.MessageAck;
033import org.apache.activemq.command.MessageDispatch;
034import org.apache.activemq.command.MessageId;
035import org.apache.activemq.command.TransactionId;
036
037/**
038 * Keeps track of the STOMP subscription so that acking is correctly done.
039 *
040 * @author <a href="http://hiramchirino.com">chirino</a>
041 */
042public class StompSubscription {
043
044    public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
045    public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
046    public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
047
048    protected final ProtocolConverter protocolConverter;
049    protected final String subscriptionId;
050    protected final ConsumerInfo consumerInfo;
051
052    protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<>();
053    protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<>();
054
055    protected String ackMode = AUTO_ACK;
056    protected ActiveMQDestination destination;
057    protected String transformation;
058
059    public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
060        this.protocolConverter = stompTransport;
061        this.subscriptionId = subscriptionId;
062        this.consumerInfo = consumerInfo;
063        this.transformation = transformation;
064    }
065
066    void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
067        ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
068        if (ackMode.equals(CLIENT_ACK) || ackMode.equals(INDIVIDUAL_ACK)) {
069            synchronized (this) {
070                dispatchedMessage.put(message.getMessageId(), md);
071            }
072        } else if (ackMode.equals(AUTO_ACK)) {
073            MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
074            protocolConverter.getStompTransport().sendToActiveMQ(ack);
075        }
076
077        boolean ignoreTransformation = false;
078
079        if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) {
080            message.setReadOnlyProperties(false);
081            message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
082        } else {
083            if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
084                ignoreTransformation = true;
085            }
086        }
087
088        StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
089
090        command.setAction(Stomp.Responses.MESSAGE);
091        if (subscriptionId != null) {
092            command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
093        }
094
095        if (ackId != null) {
096            command.getHeaders().put(Stomp.Headers.Message.ACK_ID, ackId);
097        }
098
099        protocolConverter.getStompTransport().sendToStomp(command);
100    }
101
102    synchronized void onStompAbort(TransactionId transactionId) {
103        unconsumedMessage.clear();
104    }
105
106    void onStompCommit(TransactionId transactionId) {
107        MessageAck ack = null;
108        synchronized (this) {
109            for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
110                @SuppressWarnings("rawtypes")
111                Map.Entry entry = (Entry)iter.next();
112                MessageDispatch msg = (MessageDispatch)entry.getValue();
113                if (unconsumedMessage.contains(msg)) {
114                    iter.remove();
115                }
116            }
117
118            // For individual Ack we already sent an Ack that will be applied on commit
119            // we don't send a second standard Ack as that would produce an error.
120            if (!unconsumedMessage.isEmpty() && ackMode == CLIENT_ACK) {
121                ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
122                ack.setTransactionId(transactionId);
123                unconsumedMessage.clear();
124            }
125        }
126        // avoid contention with onMessageDispatch
127        if (ack != null) {
128            protocolConverter.getStompTransport().sendToActiveMQ(ack);
129        }
130    }
131
132    synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
133
134        MessageId msgId = new MessageId(messageId);
135
136        if (!dispatchedMessage.containsKey(msgId)) {
137            return null;
138        }
139
140        MessageAck ack = new MessageAck();
141        ack.setDestination(consumerInfo.getDestination());
142        ack.setConsumerId(consumerInfo.getConsumerId());
143
144        if (ackMode == CLIENT_ACK) {
145            if (transactionId == null) {
146                ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
147            } else {
148                ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
149            }
150            int count = 0;
151            for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
152
153                @SuppressWarnings("rawtypes")
154                Map.Entry entry = (Entry)iter.next();
155                MessageId id = (MessageId)entry.getKey();
156                MessageDispatch msg = (MessageDispatch)entry.getValue();
157
158                if (transactionId != null) {
159                    if (!unconsumedMessage.contains(msg)) {
160                        unconsumedMessage.add(msg);
161                        count++;
162                    }
163                } else {
164                    iter.remove();
165                    count++;
166                }
167
168                if (id.equals(msgId)) {
169                    ack.setLastMessageId(id);
170                    break;
171                }
172            }
173            ack.setMessageCount(count);
174            if (transactionId != null) {
175                ack.setTransactionId(transactionId);
176            }
177
178        } else if (ackMode == INDIVIDUAL_ACK) {
179            ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
180            ack.setMessageID(msgId);
181            ack.setMessageCount(1);
182            if (transactionId != null) {
183                unconsumedMessage.add(dispatchedMessage.get(msgId));
184                ack.setTransactionId(transactionId);
185            } else {
186                dispatchedMessage.remove(msgId);
187            }
188        }
189        return ack;
190    }
191
192    public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
193
194        MessageId msgId = new MessageId(messageId);
195
196        if (!dispatchedMessage.containsKey(msgId)) {
197            return null;
198        }
199
200        MessageAck ack = new MessageAck();
201        ack.setDestination(consumerInfo.getDestination());
202        ack.setConsumerId(consumerInfo.getConsumerId());
203        ack.setAckType(MessageAck.POSION_ACK_TYPE);
204        ack.setMessageID(msgId);
205        if (transactionId != null) {
206            unconsumedMessage.add(dispatchedMessage.get(msgId));
207            ack.setTransactionId(transactionId);
208        }
209        dispatchedMessage.remove(msgId);
210
211        return ack;
212    }
213
214    public String getAckMode() {
215        return ackMode;
216    }
217
218    public void setAckMode(String ackMode) {
219        this.ackMode = ackMode;
220    }
221
222    public String getSubscriptionId() {
223        return subscriptionId;
224    }
225
226    public void setDestination(ActiveMQDestination destination) {
227        this.destination = destination;
228    }
229
230    public ActiveMQDestination getDestination() {
231        return destination;
232    }
233
234    public ConsumerInfo getConsumerInfo() {
235        return consumerInfo;
236    }
237}