001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import java.io.IOException;
020import java.util.Set;
021
022import javax.jms.InvalidSelectorException;
023import javax.management.ObjectName;
024
025import org.apache.activemq.broker.BrokerService;
026import org.apache.activemq.broker.ConnectionContext;
027import org.apache.activemq.broker.region.Subscription;
028import org.apache.activemq.command.ActiveMQDestination;
029import org.apache.activemq.command.ActiveMQQueue;
030import org.apache.activemq.command.ActiveMQTopic;
031import org.apache.activemq.command.ConsumerInfo;
032import org.apache.activemq.filter.DestinationFilter;
033import org.apache.activemq.util.IOExceptionSupport;
034
035/**
036 *
037 */
038public class SubscriptionView implements SubscriptionViewMBean {
039
040    protected final Subscription subscription;
041    protected final String clientId;
042    protected final String userName;
043
044    /**
045     * Constructor
046     *
047     * @param subs
048     */
049    public SubscriptionView(String clientId, String userName, Subscription subs) {
050        this.clientId = clientId;
051        this.subscription = subs;
052        this.userName = userName;
053    }
054
055    /**
056     * @return the clientId
057     */
058    @Override
059    public String getClientId() {
060        return clientId;
061    }
062
063    /**
064     * @returns the ObjectName of the Connection that created this subscription
065     */
066    @Override
067    public ObjectName getConnection() {
068        ObjectName result = null;
069
070        if (clientId != null && subscription != null) {
071            ConnectionContext ctx = subscription.getContext();
072            if (ctx != null && ctx.getBroker() != null && ctx.getBroker().getBrokerService() != null) {
073                BrokerService service = ctx.getBroker().getBrokerService();
074                ManagementContext managementCtx = service.getManagementContext();
075                if (managementCtx != null) {
076
077                    try {
078                        ObjectName query = createConnectionQuery(managementCtx, service.getBrokerName());
079                        Set<ObjectName> names = managementCtx.queryNames(query, null);
080                        if (names.size() == 1) {
081                            result = names.iterator().next();
082                        }
083                    } catch (Exception e) {
084                    }
085                }
086            }
087        }
088        return result;
089    }
090
091
092
093    private ObjectName createConnectionQuery(ManagementContext ctx, String brokerName) throws IOException {
094        try {
095            return BrokerMBeanSupport.createConnectionQuery(ctx.getJmxDomainName(), brokerName, clientId);
096        } catch (Throwable e) {
097            throw IOExceptionSupport.create(e);
098        }
099    }
100
101    /**
102     * @return the id of the Connection the Subscription is on
103     */
104    @Override
105    public String getConnectionId() {
106        ConsumerInfo info = getConsumerInfo();
107        if (info != null) {
108            return info.getConsumerId().getConnectionId();
109        }
110        return "NOTSET";
111    }
112
113    /**
114     * @return the id of the Session the subscription is on
115     */
116    @Override
117    public long getSessionId() {
118        ConsumerInfo info = getConsumerInfo();
119        if (info != null) {
120            return info.getConsumerId().getSessionId();
121        }
122        return 0;
123    }
124
125    /**
126     * @return the id of the Subscription
127     */
128    @Override
129    public long getSubscriptionId() {
130        ConsumerInfo info = getConsumerInfo();
131        if (info != null) {
132            return info.getConsumerId().getValue();
133        }
134        return 0;
135    }
136
137    /**
138     * @return the destination name
139     */
140    @Override
141    public String getDestinationName() {
142        ConsumerInfo info = getConsumerInfo();
143        if (info != null) {
144            ActiveMQDestination dest = info.getDestination();
145            return dest.getPhysicalName();
146        }
147        return "NOTSET";
148    }
149
150    @Override
151    public String getSelector() {
152        if (subscription != null) {
153            return subscription.getSelector();
154        }
155        return null;
156    }
157
158    @Override
159    public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
160        if (subscription != null) {
161            subscription.setSelector(selector);
162        } else {
163            throw new UnsupportedOperationException("No subscription object");
164        }
165    }
166
167    /**
168     * @return true if the destination is a Queue
169     */
170    @Override
171    public boolean isDestinationQueue() {
172        ConsumerInfo info = getConsumerInfo();
173        if (info != null) {
174            ActiveMQDestination dest = info.getDestination();
175            return dest.isQueue();
176        }
177        return false;
178    }
179
180    /**
181     * @return true of the destination is a Topic
182     */
183    @Override
184    public boolean isDestinationTopic() {
185        ConsumerInfo info = getConsumerInfo();
186        if (info != null) {
187            ActiveMQDestination dest = info.getDestination();
188            return dest.isTopic();
189        }
190        return false;
191    }
192
193    /**
194     * @return true if the destination is temporary
195     */
196    @Override
197    public boolean isDestinationTemporary() {
198        ConsumerInfo info = getConsumerInfo();
199        if (info != null) {
200            ActiveMQDestination dest = info.getDestination();
201            return dest.isTemporary();
202        }
203        return false;
204    }
205
206    /**
207     * @return true if the subscriber is active
208     */
209    @Override
210    public boolean isActive() {
211        return true;
212    }
213
214    @Override
215    public boolean isNetwork() {
216        ConsumerInfo info = getConsumerInfo();
217        if (info != null) {
218            return info.isNetworkSubscription();
219        }
220        return false;
221    }
222
223    /**
224     * The subscription should release as may references as it can to help the
225     * garbage collector reclaim memory.
226     */
227    public void gc() {
228        if (subscription != null) {
229            subscription.gc();
230        }
231    }
232
233    /**
234     * @return whether or not the subscriber is retroactive or not
235     */
236    @Override
237    public boolean isRetroactive() {
238        ConsumerInfo info = getConsumerInfo();
239        return info != null ? info.isRetroactive() : false;
240    }
241
242    /**
243     * @return whether or not the subscriber is an exclusive consumer
244     */
245    @Override
246    public boolean isExclusive() {
247        ConsumerInfo info = getConsumerInfo();
248        return info != null ? info.isExclusive() : false;
249    }
250
251    /**
252     * @return whether or not the subscriber is durable (persistent)
253     */
254    @Override
255    public boolean isDurable() {
256        ConsumerInfo info = getConsumerInfo();
257        return info != null ? info.isDurable() : false;
258    }
259
260    /**
261     * @return whether or not the subscriber ignores local messages
262     */
263    @Override
264    public boolean isNoLocal() {
265        ConsumerInfo info = getConsumerInfo();
266        return info != null ? info.isNoLocal() : false;
267    }
268
269    /**
270     * @return whether or not the subscriber is configured for async dispatch
271     */
272    @Override
273    public boolean isDispatchAsync() {
274        ConsumerInfo info = getConsumerInfo();
275        return info != null ? info.isDispatchAsync() : false;
276    }
277
278    /**
279     * @return the maximum number of pending messages allowed in addition to the
280     *         prefetch size. If enabled to a non-zero value then this will
281     *         perform eviction of messages for slow consumers on non-durable
282     *         topics.
283     */
284    @Override
285    public int getMaximumPendingMessageLimit() {
286        ConsumerInfo info = getConsumerInfo();
287        return info != null ? info.getMaximumPendingMessageLimit() : 0;
288    }
289
290    /**
291     * @return the consumer priority
292     */
293    @Override
294    public byte getPriority() {
295        ConsumerInfo info = getConsumerInfo();
296        return info != null ? info.getPriority() : 0;
297    }
298
299    /**
300     * @return the name of the consumer which is only used for durable
301     *         consumers.
302     */
303    @Override
304    public String getSubscriptionName() {
305        ConsumerInfo info = getConsumerInfo();
306        return info != null ? info.getSubscriptionName() : null;
307    }
308
309    /**
310     * @return number of messages pending delivery
311     */
312    @Override
313    public int getPendingQueueSize() {
314        return subscription != null ? subscription.getPendingQueueSize() : 0;
315    }
316
317    /**
318     * @return number of messages dispatched
319     */
320    @Override
321    public int getDispatchedQueueSize() {
322        return subscription != null ? subscription.getDispatchedQueueSize() : 0;
323    }
324
325    @Override
326    public int getMessageCountAwaitingAcknowledge() {
327        return getDispatchedQueueSize();
328    }
329
330    /**
331     * @return number of messages that matched the subscription
332     */
333    @Override
334    public long getDispatchedCounter() {
335        return subscription != null ? subscription.getDispatchedCounter() : 0;
336    }
337
338    /**
339     * @return number of messages that matched the subscription
340     */
341    @Override
342    public long getEnqueueCounter() {
343        return subscription != null ? subscription.getEnqueueCounter() : 0;
344    }
345
346    /**
347     * @return number of messages queued by the client
348     */
349    @Override
350    public long getDequeueCounter() {
351        return subscription != null ? subscription.getDequeueCounter() : 0;
352    }
353
354    protected ConsumerInfo getConsumerInfo() {
355        return subscription != null ? subscription.getConsumerInfo() : null;
356    }
357
358    /**
359     * @return pretty print
360     */
361    @Override
362    public String toString() {
363        return "SubscriptionView: " + getClientId() + ":" + getConnectionId();
364    }
365
366    /**
367     */
368    @Override
369    public int getPrefetchSize() {
370        return subscription != null ? subscription.getPrefetchSize() : 0;
371    }
372
373    @Override
374    public boolean isMatchingQueue(String queueName) {
375        if (isDestinationQueue()) {
376            return matchesDestination(new ActiveMQQueue(queueName));
377        }
378        return false;
379    }
380
381    @Override
382    public boolean isMatchingTopic(String topicName) {
383        if (isDestinationTopic()) {
384            return matchesDestination(new ActiveMQTopic(topicName));
385        }
386        return false;
387    }
388
389    /**
390     * Return true if this subscription matches the given destination
391     *
392     * @param destination the destination to compare against
393     * @return true if this subscription matches the given destination
394     */
395    public boolean matchesDestination(ActiveMQDestination destination) {
396        ActiveMQDestination subscriptionDestination = subscription.getActiveMQDestination();
397        DestinationFilter filter = DestinationFilter.parseFilter(subscriptionDestination);
398        return filter.matches(destination);
399    }
400
401    @Override
402    public boolean isSlowConsumer() {
403        return subscription.isSlowConsumer();
404    }
405
406    @Override
407    public String getUserName() {
408        return userName;
409    }
410
411    @Override
412    public void resetStatistics() {
413        if (subscription != null && subscription.getSubscriptionStatistics() != null){
414            subscription.getSubscriptionStatistics().reset();
415        }
416    }
417
418    @Override
419    public long getConsumedCount() {
420        return subscription != null ? subscription.getConsumedCount() : 0;
421    }
422}