001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import java.util.Map;
020
021import javax.management.openmbean.CompositeData;
022import javax.management.openmbean.OpenDataException;
023import javax.jms.JMSException;
024
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.region.Queue;
027import org.apache.activemq.broker.region.QueueMessageReference;
028import org.apache.activemq.command.ActiveMQDestination;
029import org.apache.activemq.command.Message;
030import org.apache.activemq.util.BrokerSupport;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * Provides a JMX Management view of a Queue.
036 */
037public class QueueView extends DestinationView implements QueueViewMBean {
038    private static final Logger LOG = LoggerFactory.getLogger(QueueView.class);
039
040    public QueueView(ManagedRegionBroker broker, Queue destination) {
041        super(broker, destination);
042    }
043
044    public CompositeData getMessage(String messageId) throws OpenDataException {
045        CompositeData result = null;
046        QueueMessageReference ref = ((Queue)destination).getMessage(messageId);
047
048        if (ref != null) {
049                Message rc = ref.getMessage();
050                if (rc == null) {
051                    return null;
052                }
053                result = OpenTypeSupport.convert(rc);
054        }
055
056        return result;
057    }
058
059    public synchronized void purge() throws Exception {
060        final long originalMessageCount = destination.getDestinationStatistics().getMessages().getCount();
061
062        ((Queue)destination).purge();
063
064        LOG.info("{} purge of {} messages", destination.getActiveMQDestination().getQualifiedName(), originalMessageCount);
065    }
066
067    public boolean removeMessage(String messageId) throws Exception {
068        return ((Queue)destination).removeMessage(messageId);
069    }
070
071    public synchronized int removeMatchingMessages(String selector) throws Exception {
072        return ((Queue)destination).removeMatchingMessages(selector);
073    }
074
075    public synchronized int removeMatchingMessages(String selector, int maximumMessages) throws Exception {
076        return ((Queue)destination).removeMatchingMessages(selector, maximumMessages);
077    }
078
079    public boolean copyMessageTo(String messageId, String destinationName) throws Exception {
080        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
081        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
082        return ((Queue)destination).copyMessageTo(context, messageId, toDestination);
083    }
084
085    public int copyMatchingMessagesTo(String selector, String destinationName) throws Exception {
086        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
087        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
088        return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination);
089    }
090
091    public int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
092        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
093        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
094        return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination, maximumMessages);
095    }
096
097    public boolean moveMessageTo(String messageId, String destinationName) throws Exception {
098        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
099        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
100        return ((Queue)destination).moveMessageTo(context, messageId, toDestination);
101    }
102
103    public synchronized int moveMatchingMessagesTo(String selector, String destinationName) throws Exception {
104        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
105        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
106        return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination);
107    }
108
109    public synchronized int moveMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
110        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
111        ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
112        return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination, maximumMessages);
113    }
114
115    public synchronized int retryMessages() throws Exception {
116        ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
117        return ((Queue)destination).retryMessages(context, Integer.MAX_VALUE);
118    }
119
120    /**
121     * Moves a message back to its original destination
122     */
123    public boolean retryMessage(String messageId) throws Exception {
124        Queue queue = (Queue) destination;
125        QueueMessageReference ref = queue.getMessage(messageId);
126        Message rc = ref.getMessage();
127        if (rc != null) {
128            ActiveMQDestination originalDestination = rc.getOriginalDestination();
129            if (originalDestination != null) {
130                ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
131                return queue.moveMessageTo(context, ref, originalDestination);
132            }
133            else {
134                throw new JMSException("No original destination for message: "+ messageId);
135            }
136        }
137        else {
138            throw new JMSException("Could not find message: "+ messageId);
139        }
140    }
141
142    public int cursorSize() {
143        Queue queue = (Queue) destination;
144        if (queue.getMessages() != null){
145            return queue.getMessages().size();
146        }
147        return 0;
148    }
149
150
151    public boolean doesCursorHaveMessagesBuffered() {
152       Queue queue = (Queue) destination;
153       if (queue.getMessages() != null){
154           return queue.getMessages().hasMessagesBufferedToDeliver();
155       }
156       return false;
157
158    }
159
160
161    public boolean doesCursorHaveSpace() {
162        Queue queue = (Queue) destination;
163        if (queue.getMessages() != null){
164            return queue.getMessages().hasSpace();
165        }
166        return false;
167    }
168
169
170    public long getCursorMemoryUsage() {
171        Queue queue = (Queue) destination;
172        if (queue.getMessages() != null &&  queue.getMessages().getSystemUsage() != null){
173            return queue.getMessages().getSystemUsage().getMemoryUsage().getUsage();
174        }
175        return 0;
176    }
177
178    public int getCursorPercentUsage() {
179        Queue queue = (Queue) destination;
180        if (queue.getMessages() != null &&  queue.getMessages().getSystemUsage() != null){
181            return queue.getMessages().getSystemUsage().getMemoryUsage().getPercentUsage();
182        }
183        return 0;
184    }
185
186    public boolean isCursorFull() {
187        Queue queue = (Queue) destination;
188        if (queue.getMessages() != null){
189            return queue.getMessages().isFull();
190        }
191        return false;
192    }
193
194    public boolean isCacheEnabled() {
195        Queue queue = (Queue) destination;
196        if (queue.getMessages() != null){
197            return queue.getMessages().isCacheEnabled();
198        }
199        return false;
200    }
201
202    /**
203     * @return a Map of groupNames and ConsumerIds
204     */
205    @Override
206    public Map<String, String> getMessageGroups() {
207        Queue queue = (Queue) destination;
208        return queue.getMessageGroupOwners().getGroups();
209    }
210
211    /**
212     * @return the message group type implementation (simple,bucket,cached)
213     */
214    @Override
215    public String getMessageGroupType() {
216        Queue queue = (Queue) destination;
217        return queue.getMessageGroupOwners().getType();
218    }
219
220    /**
221     * remove a message group = has the effect of rebalancing group
222     */
223    @Override
224    public void removeMessageGroup(@MBeanInfo("groupName") String groupName) {
225        Queue queue = (Queue) destination;
226        queue.getMessageGroupOwners().removeGroup(groupName);
227    }
228
229    /**
230     * remove all the message groups - will rebalance all message groups across consumers
231     */
232    @Override
233    public void removeAllMessageGroups() {
234        Queue queue = (Queue) destination;
235        queue.getMessageGroupOwners().removeAll();
236    }
237
238    @Override
239    public void pause() {
240        Queue queue = (Queue) destination;
241        queue.pauseDispatch();
242    }
243
244    @Override
245    public void resume() {
246        Queue queue = (Queue) destination;
247        queue.resumeDispatch();
248    }
249
250    @Override
251    public boolean isPaused() {
252        Queue queue = (Queue) destination;
253        return queue.isDispatchPaused();
254    }
255}