001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import java.io.IOException;
020import java.net.URISyntaxException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.Map.Entry;
028
029import javax.jms.Connection;
030import javax.jms.InvalidSelectorException;
031import javax.jms.MessageProducer;
032import javax.jms.Session;
033import javax.management.MalformedObjectNameException;
034import javax.management.ObjectName;
035import javax.management.openmbean.CompositeData;
036import javax.management.openmbean.CompositeDataSupport;
037import javax.management.openmbean.CompositeType;
038import javax.management.openmbean.OpenDataException;
039import javax.management.openmbean.TabularData;
040import javax.management.openmbean.TabularDataSupport;
041import javax.management.openmbean.TabularType;
042
043import org.apache.activemq.ActiveMQConnectionFactory;
044import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
045import org.apache.activemq.broker.region.Destination;
046import org.apache.activemq.broker.region.Subscription;
047import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
048import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
049import org.apache.activemq.command.ActiveMQDestination;
050import org.apache.activemq.command.ActiveMQMessage;
051import org.apache.activemq.command.ActiveMQTextMessage;
052import org.apache.activemq.command.Message;
053import org.apache.activemq.filter.BooleanExpression;
054import org.apache.activemq.filter.MessageEvaluationContext;
055import org.apache.activemq.selector.SelectorParser;
056import org.apache.activemq.store.MessageStore;
057import org.apache.activemq.util.URISupport;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061public class DestinationView implements DestinationViewMBean {
062    private static final Logger LOG = LoggerFactory.getLogger(DestinationViewMBean.class);
063    protected final Destination destination;
064    protected final ManagedRegionBroker broker;
065
066    public DestinationView(ManagedRegionBroker broker, Destination destination) {
067        this.broker = broker;
068        this.destination = destination;
069    }
070
071    public void gc() {
072        destination.gc();
073    }
074
075    @Override
076    public String getName() {
077        return destination.getName();
078    }
079
080    @Override
081    public void resetStatistics() {
082        destination.getDestinationStatistics().reset();
083    }
084
085    @Override
086    public long getEnqueueCount() {
087        return destination.getDestinationStatistics().getEnqueues().getCount();
088    }
089
090    @Override
091    public long getDequeueCount() {
092        return destination.getDestinationStatistics().getDequeues().getCount();
093    }
094
095    @Override
096    public long getForwardCount() {
097        return destination.getDestinationStatistics().getForwards().getCount();
098    }
099
100    @Override
101    public long getDispatchCount() {
102        return destination.getDestinationStatistics().getDispatched().getCount();
103    }
104
105    @Override
106    public long getInFlightCount() {
107        return destination.getDestinationStatistics().getInflight().getCount();
108    }
109
110    @Override
111    public long getExpiredCount() {
112        return destination.getDestinationStatistics().getExpired().getCount();
113    }
114
115    @Override
116    public long getConsumerCount() {
117        return destination.getDestinationStatistics().getConsumers().getCount();
118    }
119
120    @Override
121    public long getQueueSize() {
122        return destination.getDestinationStatistics().getMessages().getCount();
123    }
124
125    @Override
126    public long getStoreMessageSize() {
127        MessageStore messageStore = destination.getMessageStore();
128        return messageStore != null ? messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize() : 0;
129    }
130
131    public long getMessagesCached() {
132        return destination.getDestinationStatistics().getMessagesCached().getCount();
133    }
134
135    @Override
136    public int getMemoryPercentUsage() {
137        return destination.getMemoryUsage().getPercentUsage();
138    }
139
140    @Override
141    public long getMemoryUsageByteCount() {
142        return destination.getMemoryUsage().getUsage();
143    }
144
145    @Override
146    public long getMemoryLimit() {
147        return destination.getMemoryUsage().getLimit();
148    }
149
150    @Override
151    public void setMemoryLimit(long limit) {
152        destination.getMemoryUsage().setLimit(limit);
153    }
154
155    @Override
156    public double getAverageEnqueueTime() {
157        return destination.getDestinationStatistics().getProcessTime().getAverageTime();
158    }
159
160    @Override
161    public long getMaxEnqueueTime() {
162        return destination.getDestinationStatistics().getProcessTime().getMaxTime();
163    }
164
165    @Override
166    public long getMinEnqueueTime() {
167        return destination.getDestinationStatistics().getProcessTime().getMinTime();
168    }
169
170    /**
171     * @return the average size of a message (bytes)
172     */
173    @Override
174    public long getAverageMessageSize() {
175        // we are okay with the size without decimals so cast to long
176        return (long) destination.getDestinationStatistics().getMessageSize().getAverageSize();
177    }
178
179    /**
180     * @return the max size of a message (bytes)
181     */
182    @Override
183    public long getMaxMessageSize() {
184        return destination.getDestinationStatistics().getMessageSize().getMaxSize();
185    }
186
187    /**
188     * @return the min size of a message (bytes)
189     */
190    @Override
191    public long getMinMessageSize() {
192        return destination.getDestinationStatistics().getMessageSize().getMinSize();
193    }
194
195
196    @Override
197    public boolean isPrioritizedMessages() {
198        return destination.isPrioritizedMessages();
199    }
200
201    @Override
202    public CompositeData[] browse() throws OpenDataException {
203        try {
204            return browse(null);
205        } catch (InvalidSelectorException e) {
206            // should not happen.
207            throw new RuntimeException(e);
208        }
209    }
210
211    @Override
212    public CompositeData[] browse(String selector) throws OpenDataException, InvalidSelectorException {
213        Message[] messages = destination.browse();
214        ArrayList<CompositeData> c = new ArrayList<CompositeData>();
215
216        MessageEvaluationContext ctx = new MessageEvaluationContext();
217        ctx.setDestination(destination.getActiveMQDestination());
218        BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
219
220        for (int i = 0; i < messages.length; i++) {
221            try {
222
223                if (selectorExpression == null) {
224                    c.add(OpenTypeSupport.convert(messages[i]));
225                } else {
226                    ctx.setMessageReference(messages[i]);
227                    if (selectorExpression.matches(ctx)) {
228                        c.add(OpenTypeSupport.convert(messages[i]));
229                    }
230                }
231
232            } catch (Throwable e) {
233                LOG.warn("exception browsing destination", e);
234            }
235        }
236
237        CompositeData rc[] = new CompositeData[c.size()];
238        c.toArray(rc);
239        return rc;
240    }
241
242    /**
243     * Browses the current destination returning a list of messages
244     */
245    @Override
246    public List<Object> browseMessages() throws InvalidSelectorException {
247        return browseMessages(null);
248    }
249
250    /**
251     * Browses the current destination with the given selector returning a list
252     * of messages
253     */
254    @Override
255    public List<Object> browseMessages(String selector) throws InvalidSelectorException {
256        Message[] messages = destination.browse();
257        ArrayList<Object> answer = new ArrayList<Object>();
258
259        MessageEvaluationContext ctx = new MessageEvaluationContext();
260        ctx.setDestination(destination.getActiveMQDestination());
261        BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
262
263        for (int i = 0; i < messages.length; i++) {
264            try {
265                Message message = messages[i];
266                message.setReadOnlyBody(true);
267                if (selectorExpression == null) {
268                    answer.add(message);
269                } else {
270                    ctx.setMessageReference(message);
271                    if (selectorExpression.matches(ctx)) {
272                        answer.add(message);
273                    }
274                }
275
276            } catch (Throwable e) {
277                LOG.warn("exception browsing destination", e);
278            }
279        }
280        return answer;
281    }
282
283    @Override
284    public TabularData browseAsTable() throws OpenDataException {
285        try {
286            return browseAsTable(null);
287        } catch (InvalidSelectorException e) {
288            throw new RuntimeException(e);
289        }
290    }
291
292    @Override
293    public TabularData browseAsTable(String selector) throws OpenDataException, InvalidSelectorException {
294        OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
295        Message[] messages = destination.browse();
296        CompositeType ct = factory.getCompositeType();
297        TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" });
298        TabularDataSupport rc = new TabularDataSupport(tt);
299
300        MessageEvaluationContext ctx = new MessageEvaluationContext();
301        ctx.setDestination(destination.getActiveMQDestination());
302        BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
303
304        for (int i = 0; i < messages.length; i++) {
305            try {
306                if (selectorExpression == null) {
307                    rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
308                } else {
309                    ctx.setMessageReference(messages[i]);
310                    if (selectorExpression.matches(ctx)) {
311                        rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
312                    }
313                }
314            } catch (Throwable e) {
315                LOG.warn("exception browsing destination", e);
316            }
317        }
318
319        return rc;
320    }
321
322    @Override
323    public String sendTextMessageWithProperties(String properties) throws Exception {
324        String[] kvs = properties.split(",");
325        Map<String, String> props = new HashMap<String, String>();
326        for (String kv : kvs) {
327            String[] it = kv.split("=");
328            if (it.length == 2) {
329                props.put(it[0],it[1]);
330            }
331        }
332        return sendTextMessage(props, props.remove("body"), props.remove("username"), props.remove("password"));
333    }
334
335    @Override
336    public String sendTextMessage(String body) throws Exception {
337        return sendTextMessage(Collections.EMPTY_MAP, body);
338    }
339
340    @Override
341    public String sendTextMessage(Map headers, String body) throws Exception {
342        return sendTextMessage(headers, body, null, null);
343    }
344
345    @Override
346    public String sendTextMessage(String body, String user, @Sensitive String password) throws Exception {
347        return sendTextMessage(Collections.EMPTY_MAP, body, user, password);
348    }
349
350    @Override
351    public String sendTextMessage(Map<String, String> headers, String body, String userName, @Sensitive String password) throws Exception {
352
353        String brokerUrl = "vm://" + broker.getBrokerName();
354        ActiveMQDestination dest = destination.getActiveMQDestination();
355
356        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl);
357        Connection connection = null;
358        try {
359            connection = cf.createConnection(userName, password);
360            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
361            MessageProducer producer = session.createProducer(dest);
362            ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body);
363
364            for (Iterator<Entry<String, String>> iter = headers.entrySet().iterator(); iter.hasNext();) {
365                Entry<String, String> entry = iter.next();
366                msg.setObjectProperty(entry.getKey(), entry.getValue());
367            }
368
369            producer.setDeliveryMode(msg.getJMSDeliveryMode());
370            producer.setPriority(msg.getPriority());
371            long ttl = 0;
372            if (msg.getExpiration() != 0) {
373                ttl = msg.getExpiration() - System.currentTimeMillis();
374            } else {
375                String timeToLive = headers.get("timeToLive");
376                if (timeToLive != null) {
377                    ttl = Integer.valueOf(timeToLive);
378                }
379            }
380            producer.setTimeToLive(ttl > 0 ? ttl : 0);
381            producer.send(msg);
382
383            return msg.getJMSMessageID();
384
385        } finally {
386            if (connection != null) {
387                connection.close();
388            }
389        }
390    }
391
392    @Override
393    public int getMaxAuditDepth() {
394        return destination.getMaxAuditDepth();
395    }
396
397    @Override
398    public int getMaxProducersToAudit() {
399        return destination.getMaxProducersToAudit();
400    }
401
402    public boolean isEnableAudit() {
403        return destination.isEnableAudit();
404    }
405
406    public void setEnableAudit(boolean enableAudit) {
407        destination.setEnableAudit(enableAudit);
408    }
409
410    @Override
411    public void setMaxAuditDepth(int maxAuditDepth) {
412        destination.setMaxAuditDepth(maxAuditDepth);
413    }
414
415    @Override
416    public void setMaxProducersToAudit(int maxProducersToAudit) {
417        destination.setMaxProducersToAudit(maxProducersToAudit);
418    }
419
420    @Override
421    public float getMemoryUsagePortion() {
422        return destination.getMemoryUsage().getUsagePortion();
423    }
424
425    @Override
426    public long getProducerCount() {
427        return destination.getDestinationStatistics().getProducers().getCount();
428    }
429
430    @Override
431    public boolean isProducerFlowControl() {
432        return destination.isProducerFlowControl();
433    }
434
435    @Override
436    public void setMemoryUsagePortion(float value) {
437        destination.getMemoryUsage().setUsagePortion(value);
438    }
439
440    @Override
441    public void setProducerFlowControl(boolean producerFlowControl) {
442        destination.setProducerFlowControl(producerFlowControl);
443    }
444
445    @Override
446    public boolean isAlwaysRetroactive() {
447        return destination.isAlwaysRetroactive();
448    }
449
450    @Override
451    public void setAlwaysRetroactive(boolean alwaysRetroactive) {
452        destination.setAlwaysRetroactive(alwaysRetroactive);
453    }
454
455    /**
456     * Set's the interval at which warnings about producers being blocked by
457     * resource usage will be triggered. Values of 0 or less will disable
458     * warnings
459     *
460     * @param blockedProducerWarningInterval the interval at which warning about
461     *            blocked producers will be triggered.
462     */
463    @Override
464    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
465        destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
466    }
467
468    /**
469     *
470     * @return the interval at which warning about blocked producers will be
471     *         triggered.
472     */
473    @Override
474    public long getBlockedProducerWarningInterval() {
475        return destination.getBlockedProducerWarningInterval();
476    }
477
478    @Override
479    public int getMaxPageSize() {
480        return destination.getMaxPageSize();
481    }
482
483    @Override
484    public void setMaxPageSize(int pageSize) {
485        destination.setMaxPageSize(pageSize);
486    }
487
488    @Override
489    public boolean isUseCache() {
490        return destination.isUseCache();
491    }
492
493    @Override
494    public void setUseCache(boolean value) {
495        destination.setUseCache(value);
496    }
497
498    @Override
499    public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException {
500        List<Subscription> subscriptions = destination.getConsumers();
501        ObjectName[] answer = new ObjectName[subscriptions.size()];
502        ObjectName brokerObjectName = broker.getBrokerService().getBrokerObjectName();
503        int index = 0;
504        for (Subscription subscription : subscriptions) {
505            String connectionClientId = subscription.getContext().getClientId();
506            answer[index++] = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, connectionClientId, subscription.getConsumerInfo());
507        }
508        return answer;
509    }
510
511    @Override
512    public ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException {
513        ObjectName result = null;
514        SlowConsumerStrategy strategy = destination.getSlowConsumerStrategy();
515        if (strategy != null && strategy instanceof AbortSlowConsumerStrategy) {
516            result = broker.registerSlowConsumerStrategy((AbortSlowConsumerStrategy)strategy);
517        }
518        return result;
519    }
520
521    @Override
522    public String getOptions() {
523        Map<String, String> options = destination.getActiveMQDestination().getOptions();
524        String optionsString = "";
525        try {
526            if (options != null) {
527                optionsString = URISupport.createQueryString(options);
528            }
529        } catch (URISyntaxException ignored) {}
530        return optionsString;
531    }
532
533    @Override
534    public boolean isDLQ() {
535        return destination.getActiveMQDestination().isDLQ();
536    }
537
538    @Override
539    public void setDLQ(boolean val) {
540         destination.getActiveMQDestination().setDLQ(val);
541    }
542
543    @Override
544    public long getBlockedSends() {
545        return destination.getDestinationStatistics().getBlockedSends().getCount();
546    }
547
548    @Override
549    public double getAverageBlockedTime() {
550        return destination.getDestinationStatistics().getBlockedTime().getAverageTime();
551    }
552
553    @Override
554    public long getTotalBlockedTime() {
555        return destination.getDestinationStatistics().getBlockedTime().getTotalTime();
556    }
557
558}