001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.Collections;
021import java.util.List;
022import java.util.concurrent.CopyOnWriteArrayList;
023import java.util.concurrent.atomic.AtomicInteger;
024
025import javax.jms.InvalidSelectorException;
026import javax.jms.JMSException;
027import javax.management.ObjectName;
028
029import org.apache.activemq.broker.Broker;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.command.ActiveMQDestination;
032import org.apache.activemq.command.ConsumerId;
033import org.apache.activemq.command.ConsumerInfo;
034import org.apache.activemq.command.MessageAck;
035import org.apache.activemq.filter.BooleanExpression;
036import org.apache.activemq.filter.DestinationFilter;
037import org.apache.activemq.filter.LogicExpression;
038import org.apache.activemq.filter.MessageEvaluationContext;
039import org.apache.activemq.filter.NoLocalExpression;
040import org.apache.activemq.selector.SelectorParser;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044public abstract class AbstractSubscription implements Subscription {
045
046    private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscription.class);
047
048    protected Broker broker;
049    protected ConnectionContext context;
050    protected ConsumerInfo info;
051    protected final DestinationFilter destinationFilter;
052    protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
053    protected final AtomicInteger prefetchExtension = new AtomicInteger(0);
054
055    private boolean usePrefetchExtension = true;
056    private BooleanExpression selectorExpression;
057    private ObjectName objectName;
058    private int cursorMemoryHighWaterMark = 70;
059    private boolean slowConsumer;
060    private long lastAckTime;
061    private final SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
062
063    public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
064        this.broker = broker;
065        this.context = context;
066        this.info = info;
067        this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());
068        this.selectorExpression = parseSelector(info);
069        this.lastAckTime = System.currentTimeMillis();
070    }
071
072    private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
073        BooleanExpression rc = null;
074        if (info.getSelector() != null) {
075            rc = SelectorParser.parse(info.getSelector());
076        }
077        if (info.isNoLocal()) {
078            if (rc == null) {
079                rc = new NoLocalExpression(info.getConsumerId().getConnectionId());
080            } else {
081                rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc);
082            }
083        }
084        if (info.getAdditionalPredicate() != null) {
085            if (rc == null) {
086                rc = info.getAdditionalPredicate();
087            } else {
088                rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc);
089            }
090        }
091        return rc;
092    }
093
094    @Override
095    public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
096        this.lastAckTime = System.currentTimeMillis();
097        subscriptionStatistics.getConsumedCount().increment();
098    }
099
100    @Override
101    public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
102        ConsumerId targetConsumerId = node.getTargetConsumerId();
103        if (targetConsumerId != null) {
104            if (!targetConsumerId.equals(info.getConsumerId())) {
105                return false;
106            }
107        }
108        try {
109            return (selectorExpression == null || selectorExpression.matches(context)) && this.context.isAllowedToConsume(node);
110        } catch (JMSException e) {
111            LOG.info("Selector failed to evaluate: {}", e.getMessage(), e);
112            return false;
113        }
114    }
115
116    @Override
117    public boolean isWildcard() {
118        return destinationFilter.isWildcard();
119    }
120
121    @Override
122    public boolean matches(ActiveMQDestination destination) {
123        return destinationFilter.matches(destination);
124    }
125
126    @Override
127    public void add(ConnectionContext context, Destination destination) throws Exception {
128        destinations.add(destination);
129    }
130
131    @Override
132    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
133        destinations.remove(destination);
134        return Collections.EMPTY_LIST;
135    }
136
137    @Override
138    public ConsumerInfo getConsumerInfo() {
139        return info;
140    }
141
142    @Override
143    public void gc() {
144    }
145
146    @Override
147    public ConnectionContext getContext() {
148        return context;
149    }
150
151    public ConsumerInfo getInfo() {
152        return info;
153    }
154
155    public BooleanExpression getSelectorExpression() {
156        return selectorExpression;
157    }
158
159    @Override
160    public String getSelector() {
161        return info.getSelector();
162    }
163
164    @Override
165    public void setSelector(String selector) throws InvalidSelectorException {
166        ConsumerInfo copy = info.copy();
167        copy.setSelector(selector);
168        BooleanExpression newSelector = parseSelector(copy);
169        // its valid so lets actually update it now
170        info.setSelector(selector);
171        this.selectorExpression = newSelector;
172    }
173
174    @Override
175    public ObjectName getObjectName() {
176        return objectName;
177    }
178
179    @Override
180    public void setObjectName(ObjectName objectName) {
181        this.objectName = objectName;
182    }
183
184    @Override
185    public int getPrefetchSize() {
186        return info.getPrefetchSize();
187    }
188
189    public boolean isUsePrefetchExtension() {
190        return usePrefetchExtension;
191    }
192
193    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
194        this.usePrefetchExtension = usePrefetchExtension;
195    }
196
197    public void setPrefetchSize(int newSize) {
198        info.setPrefetchSize(newSize);
199    }
200
201    @Override
202    public boolean isRecoveryRequired() {
203        return true;
204    }
205
206    @Override
207    public boolean isSlowConsumer() {
208        return slowConsumer;
209    }
210
211    public void setSlowConsumer(boolean val) {
212        slowConsumer = val;
213    }
214
215    @Override
216    public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
217        boolean result = false;
218        MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
219        try {
220            Destination regionDestination = (Destination) message.getRegionDestination();
221            msgContext.setDestination(regionDestination.getActiveMQDestination());
222            msgContext.setMessageReference(message);
223            result = matches(message, msgContext);
224            if (result) {
225                doAddRecoveredMessage(message);
226            }
227        } finally {
228            msgContext.clear();
229        }
230        return result;
231    }
232
233    @Override
234    public ActiveMQDestination getActiveMQDestination() {
235        return info != null ? info.getDestination() : null;
236    }
237
238    @Override
239    public boolean isBrowser() {
240        return info != null && info.isBrowser();
241    }
242
243    @Override
244    public long getInFlightMessageSize() {
245        return subscriptionStatistics.getInflightMessageSize().getTotalSize();
246    }
247
248    @Override
249    public int getInFlightUsage() {
250        int prefetchSize = info.getPrefetchSize();
251        if (prefetchSize > 0) {
252            return (getInFlightSize() * 100) / prefetchSize;
253        }
254        return Integer.MAX_VALUE;
255    }
256
257    /**
258     * Add a destination
259     * @param destination
260     */
261    public void addDestination(Destination destination) {
262    }
263
264    /**
265     * Remove a destination
266     * @param destination
267     */
268    public void removeDestination(Destination destination) {
269    }
270
271    @Override
272    public int getCursorMemoryHighWaterMark(){
273        return this.cursorMemoryHighWaterMark;
274    }
275
276    @Override
277    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){
278        this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark;
279    }
280
281    @Override
282    public int countBeforeFull() {
283        return info.getPrefetchSize() - getDispatchedQueueSize();
284    }
285
286    @Override
287    public void unmatched(MessageReference node) throws IOException {
288        // only durable topic subs have something to do here
289    }
290
291    protected void doAddRecoveredMessage(MessageReference message) throws Exception {
292        add(message);
293    }
294
295    @Override
296    public long getTimeOfLastMessageAck() {
297        return lastAckTime;
298    }
299
300    public void setTimeOfLastMessageAck(long value) {
301        this.lastAckTime = value;
302    }
303
304    @Override
305    public long getConsumedCount(){
306        return subscriptionStatistics.getConsumedCount().getCount();
307    }
308
309    @Override
310    public void incrementConsumedCount(){
311        subscriptionStatistics.getConsumedCount().increment();
312    }
313
314    @Override
315    public void resetConsumedCount(){
316        subscriptionStatistics.getConsumedCount().reset();
317    }
318
319    @Override
320    public SubscriptionStatistics getSubscriptionStatistics() {
321        return subscriptionStatistics;
322    }
323
324    public void wakeupDestinationsForDispatch() {
325        for (Destination dest : destinations) {
326            dest.wakeup();
327        }
328    }
329
330    public AtomicInteger getPrefetchExtension() {
331        return this.prefetchExtension;
332    }
333
334    protected void contractPrefetchExtension(int amount) {
335        if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
336            decrementPrefetchExtension(amount);
337        }
338    }
339
340    protected void expandPrefetchExtension(int amount) {
341        if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
342            incrementPrefetchExtension(amount);
343        }
344    }
345
346    protected void decrementPrefetchExtension(int amount) {
347        while (true) {
348            int currentExtension = prefetchExtension.get();
349            int newExtension = Math.max(0, currentExtension - amount);
350            if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
351                break;
352            }
353        }
354    }
355
356    private void incrementPrefetchExtension(int amount) {
357        while (true) {
358            int currentExtension = prefetchExtension.get();
359            int newExtension = Math.max(currentExtension, currentExtension + amount);
360            if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
361                break;
362            }
363        }
364    }
365
366}