001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.util.Collections;
020import java.util.LinkedList;
021import java.util.List;
022import java.util.Set;
023import org.apache.activemq.ActiveMQMessageAudit;
024import org.apache.activemq.broker.Broker;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.region.BaseDestination;
027import org.apache.activemq.broker.region.Destination;
028import org.apache.activemq.broker.region.MessageReference;
029import org.apache.activemq.broker.region.Subscription;
030import org.apache.activemq.command.MessageId;
031import org.apache.activemq.usage.SystemUsage;
032
033/**
034 * Abstract method holder for pending message (messages awaiting disptach to a
035 * consumer) cursor
036 *
037 *
038 */
039public abstract class AbstractPendingMessageCursor implements PendingMessageCursor {
040    protected int memoryUsageHighWaterMark = 70;
041    protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
042    protected SystemUsage systemUsage;
043    protected int maxProducersToAudit = BaseDestination.MAX_PRODUCERS_TO_AUDIT;
044    protected int maxAuditDepth = BaseDestination.MAX_AUDIT_DEPTH;
045    protected boolean enableAudit=true;
046    protected ActiveMQMessageAudit audit;
047    protected boolean useCache=true;
048    private boolean cacheEnabled=true;
049    private boolean started=false;
050    protected MessageReference last = null;
051    protected final boolean prioritizedMessages;
052
053    public AbstractPendingMessageCursor(boolean prioritizedMessages) {
054        this.prioritizedMessages=prioritizedMessages;
055    }
056
057
058    @Override
059    public synchronized void start() throws Exception  {
060        if (!started && enableAudit && audit==null) {
061            audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
062        }
063        started=true;
064    }
065
066    @Override
067    public synchronized void stop() throws Exception  {
068        started=false;
069        gc();
070    }
071
072    @Override
073    public void add(ConnectionContext context, Destination destination) throws Exception {
074    }
075
076    @Override
077    @SuppressWarnings("unchecked")
078    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
079        return Collections.EMPTY_LIST;
080    }
081
082    @Override
083    public boolean isRecoveryRequired() {
084        return true;
085    }
086
087    @Override
088    public void addMessageFirst(MessageReference node) throws Exception {
089    }
090
091    @Override
092    public boolean addMessageLast(MessageReference node) throws Exception {
093        return tryAddMessageLast(node, INFINITE_WAIT);
094    }
095
096    @Override
097    public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
098        return true;
099    }
100
101    @Override
102    public void addRecoveredMessage(MessageReference node) throws Exception {
103        addMessageLast(node);
104    }
105
106    @Override
107    public void clear() {
108    }
109
110    @Override
111    public boolean hasNext() {
112        return false;
113    }
114
115    @Override
116    public boolean isEmpty() {
117        return false;
118    }
119
120    @Override
121    public boolean isEmpty(Destination destination) {
122        return isEmpty();
123    }
124
125    @Override
126    public MessageReference next() {
127        return null;
128    }
129
130    @Override
131    public void remove() {
132    }
133
134    @Override
135    public void reset() {
136    }
137
138    @Override
139    public int size() {
140        return 0;
141    }
142
143    @Override
144    public int getMaxBatchSize() {
145        return maxBatchSize;
146    }
147
148    @Override
149    public void setMaxBatchSize(int maxBatchSize) {
150        this.maxBatchSize = maxBatchSize;
151    }
152
153    protected void fillBatch() throws Exception {
154    }
155
156    @Override
157    public void resetForGC() {
158        reset();
159    }
160
161    @Override
162    public void remove(MessageReference node) {
163    }
164
165    @Override
166    public void gc() {
167    }
168
169    @Override
170    public void setSystemUsage(SystemUsage usageManager) {
171        this.systemUsage = usageManager;
172    }
173
174    @Override
175    public boolean hasSpace() {
176        // allow isFull to verify parent usage and otherwise enforce local memoryUsageHighWaterMark
177        return systemUsage != null ? (!isParentFull() && systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
178    }
179
180    private boolean isParentFull() {
181        boolean result = false;
182        if (systemUsage != null) {
183            if (systemUsage.getMemoryUsage().getParent() != null) {
184                return systemUsage.getMemoryUsage().getParent().getPercentUsage() >= 100;
185            }
186        }
187        return result;
188    }
189
190    @Override
191    public boolean isFull() {
192        return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
193    }
194
195    @Override
196    public void release() {
197    }
198
199    @Override
200    public boolean hasMessagesBufferedToDeliver() {
201        return false;
202    }
203
204    /**
205     * @return the memoryUsageHighWaterMark
206     */
207    @Override
208    public int getMemoryUsageHighWaterMark() {
209        return memoryUsageHighWaterMark;
210    }
211
212    /**
213     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
214     */
215    @Override
216    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
217        this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
218    }
219
220    /**
221     * @return the usageManager
222     */
223    @Override
224    public SystemUsage getSystemUsage() {
225        return this.systemUsage;
226    }
227
228    /**
229     * destroy the cursor
230     *
231     * @throws Exception
232     */
233    @Override
234    public void destroy() throws Exception {
235        stop();
236    }
237
238    /**
239     * Page in a restricted number of messages
240     *
241     * @param maxItems maximum number of messages to return
242     * @return a list of paged in messages
243     */
244    @Override
245    public LinkedList<MessageReference> pageInList(int maxItems) {
246        throw new RuntimeException("Not supported");
247    }
248
249    /**
250     * @return the maxProducersToAudit
251     */
252    @Override
253    public int getMaxProducersToAudit() {
254        return maxProducersToAudit;
255    }
256
257    /**
258     * @param maxProducersToAudit the maxProducersToAudit to set
259     */
260    @Override
261    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
262        this.maxProducersToAudit = maxProducersToAudit;
263        if (audit != null) {
264            audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
265        }
266    }
267
268    /**
269     * @return the maxAuditDepth
270     */
271    @Override
272    public int getMaxAuditDepth() {
273        return maxAuditDepth;
274    }
275
276
277    /**
278     * @param maxAuditDepth the maxAuditDepth to set
279     */
280    @Override
281    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
282        this.maxAuditDepth = maxAuditDepth;
283        if (audit != null) {
284            audit.setAuditDepth(maxAuditDepth);
285        }
286    }
287
288
289    /**
290     * @return the enableAudit
291     */
292    @Override
293    public boolean isEnableAudit() {
294        return enableAudit;
295    }
296
297    /**
298     * @param enableAudit the enableAudit to set
299     */
300    @Override
301    public synchronized void setEnableAudit(boolean enableAudit) {
302        this.enableAudit = enableAudit;
303        if (enableAudit && started && audit==null) {
304            audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
305        }
306    }
307
308    @Override
309    public boolean isTransient() {
310        return false;
311    }
312
313
314    /**
315     * set the audit
316     * @param audit new audit component
317     */
318    @Override
319    public void setMessageAudit(ActiveMQMessageAudit audit) {
320        this.audit=audit;
321    }
322
323
324    /**
325     * @return the audit
326     */
327    @Override
328    public ActiveMQMessageAudit getMessageAudit() {
329        return audit;
330    }
331
332    @Override
333    public boolean isUseCache() {
334        return useCache;
335    }
336
337    @Override
338    public void setUseCache(boolean useCache) {
339        this.useCache = useCache;
340    }
341
342    public synchronized boolean isDuplicate(MessageId messageId) {
343        boolean unique = recordUniqueId(messageId);
344        rollback(messageId);
345        return !unique;
346    }
347
348    /**
349     * records a message id and checks if it is a duplicate
350     * @param messageId
351     * @return true if id is unique, false otherwise.
352     */
353    public synchronized boolean recordUniqueId(MessageId messageId) {
354        if (!enableAudit || audit==null) {
355            return true;
356        }
357        return !audit.isDuplicate(messageId);
358    }
359
360    @Override
361    public synchronized void rollback(MessageId id) {
362        if (audit != null) {
363            audit.rollback(id);
364        }
365    }
366
367    public synchronized boolean isStarted() {
368        return started;
369    }
370
371    public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) {
372        boolean result = false;
373        Set<Destination> destinations = broker.getDestinations(sub.getActiveMQDestination());
374        if (destinations != null) {
375            for (Destination dest:destinations) {
376                if (dest.isPrioritizedMessages()) {
377                    result = true;
378                    break;
379                }
380            }
381        }
382        return result;
383
384    }
385
386    @Override
387    public synchronized boolean isCacheEnabled() {
388        return cacheEnabled;
389    }
390
391    public synchronized void setCacheEnabled(boolean val) {
392        cacheEnabled = val;
393    }
394
395    @Override
396    public void rebase() {
397    }
398}