001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.store;
019
020import java.util.concurrent.ConcurrentHashMap;
021import java.util.concurrent.ConcurrentMap;
022
023import org.apache.activemq.management.CountStatisticImpl;
024import org.apache.activemq.management.SizeStatisticImpl;
025
026public class MessageStoreSubscriptionStatistics extends AbstractMessageStoreStatistics {
027
028    private final ConcurrentMap<String, SubscriptionStatistics> subStatistics
029        = new ConcurrentHashMap<>();
030
031    /**
032     * @param enabled
033     * @param countDescription
034     * @param sizeDescription
035     */
036    public MessageStoreSubscriptionStatistics(boolean enabled) {
037        super(enabled, "The number of messages or this subscription in the message store",
038                "Size of messages contained by this subscription in the message store");
039    }
040
041    /**
042     * Total count for all subscriptions
043     */
044    @Override
045    public CountStatisticImpl getMessageCount() {
046        return this.messageCount;
047    }
048
049    /**
050     * Total size for all subscriptions
051     */
052    @Override
053    public SizeStatisticImpl getMessageSize() {
054        return this.messageSize;
055    }
056
057    public CountStatisticImpl getMessageCount(String subKey) {
058        return getOrInitStatistics(subKey).getMessageCount();
059    }
060
061    public SizeStatisticImpl getMessageSize(String subKey) {
062        return getOrInitStatistics(subKey).getMessageSize();
063    }
064
065    public void removeSubscription(String subKey) {
066        SubscriptionStatistics subStats = subStatistics.remove(subKey);
067        //Subtract from the parent
068        if (subStats != null) {
069           getMessageCount().subtract(subStats.getMessageCount().getCount());
070           getMessageSize().addSize(-subStats.getMessageSize().getTotalSize());
071        }
072    }
073
074    @Override
075    public void reset() {
076        super.reset();
077        subStatistics.clear();
078    }
079
080    private SubscriptionStatistics getOrInitStatistics(String subKey) {
081        SubscriptionStatistics subStats = subStatistics.get(subKey);
082
083        if (subStats == null) {
084            final SubscriptionStatistics stats = new SubscriptionStatistics();
085            subStats = subStatistics.putIfAbsent(subKey, stats);
086            if (subStats == null) {
087                subStats = stats;
088            }
089        }
090
091        return subStats;
092    }
093
094    private class SubscriptionStatistics extends AbstractMessageStoreStatistics {
095
096        public SubscriptionStatistics() {
097            this(MessageStoreSubscriptionStatistics.this.enabled);
098        }
099
100        /**
101         * @param enabled
102         * @param countDescription
103         * @param sizeDescription
104         */
105        public SubscriptionStatistics(boolean enabled) {
106            super(enabled, "The number of messages or this subscription in the message store",
107                    "Size of messages contained by this subscription in the message store");
108            this.setParent(MessageStoreSubscriptionStatistics.this);
109        }
110    }
111
112}