001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.jmx; 018 019import java.util.Map; 020 021import javax.management.openmbean.CompositeData; 022import javax.management.openmbean.OpenDataException; 023import javax.jms.JMSException; 024 025import org.apache.activemq.broker.ConnectionContext; 026import org.apache.activemq.broker.region.Queue; 027import org.apache.activemq.broker.region.QueueMessageReference; 028import org.apache.activemq.command.ActiveMQDestination; 029import org.apache.activemq.command.Message; 030import org.apache.activemq.util.BrokerSupport; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * Provides a JMX Management view of a Queue. 036 */ 037public class QueueView extends DestinationView implements QueueViewMBean { 038 private static final Logger LOG = LoggerFactory.getLogger(QueueView.class); 039 040 public QueueView(ManagedRegionBroker broker, Queue destination) { 041 super(broker, destination); 042 } 043 044 public CompositeData getMessage(String messageId) throws OpenDataException { 045 CompositeData result = null; 046 QueueMessageReference ref = ((Queue)destination).getMessage(messageId); 047 048 if (ref != null) { 049 Message rc = ref.getMessage(); 050 if (rc == null) { 051 return null; 052 } 053 result = OpenTypeSupport.convert(rc); 054 } 055 056 return result; 057 } 058 059 public synchronized void purge() throws Exception { 060 final long originalMessageCount = destination.getDestinationStatistics().getMessages().getCount(); 061 062 ((Queue)destination).purge(); 063 064 LOG.info("{} purge of {} messages", destination.getActiveMQDestination().getQualifiedName(), originalMessageCount); 065 } 066 067 public boolean removeMessage(String messageId) throws Exception { 068 return ((Queue)destination).removeMessage(messageId); 069 } 070 071 public synchronized int removeMatchingMessages(String selector) throws Exception { 072 return ((Queue)destination).removeMatchingMessages(selector); 073 } 074 075 public synchronized int removeMatchingMessages(String selector, int maximumMessages) throws Exception { 076 return ((Queue)destination).removeMatchingMessages(selector, maximumMessages); 077 } 078 079 public boolean copyMessageTo(String messageId, String destinationName) throws Exception { 080 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); 081 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); 082 return ((Queue)destination).copyMessageTo(context, messageId, toDestination); 083 } 084 085 public int copyMatchingMessagesTo(String selector, String destinationName) throws Exception { 086 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); 087 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); 088 return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination); 089 } 090 091 public int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception { 092 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); 093 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); 094 return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination, maximumMessages); 095 } 096 097 public boolean moveMessageTo(String messageId, String destinationName) throws Exception { 098 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); 099 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); 100 return ((Queue)destination).moveMessageTo(context, messageId, toDestination); 101 } 102 103 public synchronized int moveMatchingMessagesTo(String selector, String destinationName) throws Exception { 104 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); 105 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); 106 return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination); 107 } 108 109 public synchronized int moveMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception { 110 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); 111 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE); 112 return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination, maximumMessages); 113 } 114 115 public synchronized int retryMessages() throws Exception { 116 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); 117 return ((Queue)destination).retryMessages(context, Integer.MAX_VALUE); 118 } 119 120 /** 121 * Moves a message back to its original destination 122 */ 123 public boolean retryMessage(String messageId) throws Exception { 124 Queue queue = (Queue) destination; 125 QueueMessageReference ref = queue.getMessage(messageId); 126 Message rc = ref.getMessage(); 127 if (rc != null) { 128 ActiveMQDestination originalDestination = rc.getOriginalDestination(); 129 if (originalDestination != null) { 130 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker()); 131 return queue.moveMessageTo(context, ref, originalDestination); 132 } 133 else { 134 throw new JMSException("No original destination for message: "+ messageId); 135 } 136 } 137 else { 138 throw new JMSException("Could not find message: "+ messageId); 139 } 140 } 141 142 public int cursorSize() { 143 Queue queue = (Queue) destination; 144 if (queue.getMessages() != null){ 145 return queue.getMessages().size(); 146 } 147 return 0; 148 } 149 150 151 public boolean doesCursorHaveMessagesBuffered() { 152 Queue queue = (Queue) destination; 153 if (queue.getMessages() != null){ 154 return queue.getMessages().hasMessagesBufferedToDeliver(); 155 } 156 return false; 157 158 } 159 160 161 public boolean doesCursorHaveSpace() { 162 Queue queue = (Queue) destination; 163 if (queue.getMessages() != null){ 164 return queue.getMessages().hasSpace(); 165 } 166 return false; 167 } 168 169 170 public long getCursorMemoryUsage() { 171 Queue queue = (Queue) destination; 172 if (queue.getMessages() != null && queue.getMessages().getSystemUsage() != null){ 173 return queue.getMessages().getSystemUsage().getMemoryUsage().getUsage(); 174 } 175 return 0; 176 } 177 178 public int getCursorPercentUsage() { 179 Queue queue = (Queue) destination; 180 if (queue.getMessages() != null && queue.getMessages().getSystemUsage() != null){ 181 return queue.getMessages().getSystemUsage().getMemoryUsage().getPercentUsage(); 182 } 183 return 0; 184 } 185 186 public boolean isCursorFull() { 187 Queue queue = (Queue) destination; 188 if (queue.getMessages() != null){ 189 return queue.getMessages().isFull(); 190 } 191 return false; 192 } 193 194 public boolean isCacheEnabled() { 195 Queue queue = (Queue) destination; 196 if (queue.getMessages() != null){ 197 return queue.getMessages().isCacheEnabled(); 198 } 199 return false; 200 } 201 202 /** 203 * @return a Map of groupNames and ConsumerIds 204 */ 205 @Override 206 public Map<String, String> getMessageGroups() { 207 Queue queue = (Queue) destination; 208 return queue.getMessageGroupOwners().getGroups(); 209 } 210 211 /** 212 * @return the message group type implementation (simple,bucket,cached) 213 */ 214 @Override 215 public String getMessageGroupType() { 216 Queue queue = (Queue) destination; 217 return queue.getMessageGroupOwners().getType(); 218 } 219 220 /** 221 * remove a message group = has the effect of rebalancing group 222 */ 223 @Override 224 public void removeMessageGroup(@MBeanInfo("groupName") String groupName) { 225 Queue queue = (Queue) destination; 226 queue.getMessageGroupOwners().removeGroup(groupName); 227 } 228 229 /** 230 * remove all the message groups - will rebalance all message groups across consumers 231 */ 232 @Override 233 public void removeAllMessageGroups() { 234 Queue queue = (Queue) destination; 235 queue.getMessageGroupOwners().removeAll(); 236 } 237 238 @Override 239 public void pause() { 240 Queue queue = (Queue) destination; 241 queue.pauseDispatch(); 242 } 243 244 @Override 245 public void resume() { 246 Queue queue = (Queue) destination; 247 queue.resumeDispatch(); 248 } 249 250 @Override 251 public boolean isPaused() { 252 Queue queue = (Queue) destination; 253 return queue.isDispatchPaused(); 254 } 255}