001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.Map;
024import java.util.concurrent.CancellationException;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.CopyOnWriteArrayList;
028import java.util.concurrent.Future;
029import java.util.concurrent.locks.ReentrantReadWriteLock;
030
031import org.apache.activemq.advisory.AdvisorySupport;
032import org.apache.activemq.broker.BrokerService;
033import org.apache.activemq.broker.ConnectionContext;
034import org.apache.activemq.broker.ProducerBrokerExchange;
035import org.apache.activemq.broker.region.policy.DispatchPolicy;
036import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
037import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
038import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
039import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
040import org.apache.activemq.broker.util.InsertionCountList;
041import org.apache.activemq.command.ActiveMQDestination;
042import org.apache.activemq.command.ConsumerInfo;
043import org.apache.activemq.command.ExceptionResponse;
044import org.apache.activemq.command.Message;
045import org.apache.activemq.command.MessageAck;
046import org.apache.activemq.command.MessageId;
047import org.apache.activemq.command.ProducerAck;
048import org.apache.activemq.command.ProducerInfo;
049import org.apache.activemq.command.Response;
050import org.apache.activemq.command.SubscriptionInfo;
051import org.apache.activemq.filter.MessageEvaluationContext;
052import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
053import org.apache.activemq.store.MessageRecoveryListener;
054import org.apache.activemq.store.NoLocalSubscriptionAware;
055import org.apache.activemq.store.PersistenceAdapter;
056import org.apache.activemq.store.TopicMessageStore;
057import org.apache.activemq.thread.Task;
058import org.apache.activemq.thread.TaskRunner;
059import org.apache.activemq.thread.TaskRunnerFactory;
060import org.apache.activemq.transaction.Synchronization;
061import org.apache.activemq.util.SubscriptionKey;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * The Topic is a destination that sends a copy of a message to every active
067 * Subscription registered.
068 */
069public class Topic extends BaseDestination implements Task {
070    protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
071    private final TopicMessageStore topicStore;
072    protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
073    private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
074    private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
075    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
076    private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
077    private final TaskRunner taskRunner;
078    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
079    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
080        @Override
081        public void run() {
082            try {
083                Topic.this.taskRunner.wakeup();
084            } catch (InterruptedException e) {
085            }
086        }
087    };
088
089    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
090            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
091        super(brokerService, store, destination, parentStats);
092        this.topicStore = store;
093        subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
094        this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
095    }
096
097    @Override
098    public void initialize() throws Exception {
099        super.initialize();
100        // set non default subscription recovery policy (override policyEntries)
101        if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
102            subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
103            setAlwaysRetroactive(true);
104        }
105        if (store != null) {
106            // AMQ-2586: Better to leave this stat at zero than to give the user
107            // misleading metrics.
108            // int messageCount = store.getMessageCount();
109            // destinationStatistics.getMessages().setCount(messageCount);
110            store.start();
111        }
112    }
113
114    @Override
115    public List<Subscription> getConsumers() {
116        synchronized (consumers) {
117            return new ArrayList<Subscription>(consumers);
118        }
119    }
120
121    public boolean lock(MessageReference node, LockOwner sub) {
122        return true;
123    }
124
125    @Override
126    public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
127        if (!sub.getConsumerInfo().isDurable()) {
128
129            // Do a retroactive recovery if needed.
130            if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
131
132                // synchronize with dispatch method so that no new messages are sent
133                // while we are recovering a subscription to avoid out of order messages.
134                dispatchLock.writeLock().lock();
135                try {
136                    boolean applyRecovery = false;
137                    synchronized (consumers) {
138                        if (!consumers.contains(sub)){
139                            sub.add(context, this);
140                            consumers.add(sub);
141                            applyRecovery=true;
142                            super.addSubscription(context, sub);
143                        }
144                    }
145                    if (applyRecovery){
146                        subscriptionRecoveryPolicy.recover(context, this, sub);
147                    }
148                } finally {
149                    dispatchLock.writeLock().unlock();
150                }
151
152            } else {
153                synchronized (consumers) {
154                    if (!consumers.contains(sub)){
155                        sub.add(context, this);
156                        consumers.add(sub);
157                        super.addSubscription(context, sub);
158                    }
159                }
160            }
161        } else {
162            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
163            super.addSubscription(context, sub);
164            sub.add(context, this);
165            if(dsub.isActive()) {
166                synchronized (consumers) {
167                    boolean hasSubscription = false;
168
169                    if (consumers.size() == 0) {
170                        hasSubscription = false;
171                    } else {
172                        for (Subscription currentSub : consumers) {
173                            if (currentSub.getConsumerInfo().isDurable()) {
174                                DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
175                                if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
176                                    hasSubscription = true;
177                                    break;
178                                }
179                            }
180                        }
181                    }
182
183                    if (!hasSubscription) {
184                        consumers.add(sub);
185                    }
186                }
187            }
188            durableSubscribers.put(dsub.getSubscriptionKey(), dsub);
189        }
190    }
191
192    @Override
193    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
194        if (!sub.getConsumerInfo().isDurable()) {
195            boolean removed = false;
196            synchronized (consumers) {
197                removed = consumers.remove(sub);
198            }
199            if (removed) {
200                super.removeSubscription(context, sub, lastDeliveredSequenceId);
201            }
202        }
203        sub.remove(context, this);
204    }
205
206    public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
207        if (topicStore != null) {
208            topicStore.deleteSubscription(key.clientId, key.subscriptionName);
209            DurableTopicSubscription removed = durableSubscribers.remove(key);
210            if (removed != null) {
211                destinationStatistics.getConsumers().decrement();
212                // deactivate and remove
213                removed.deactivate(false, 0l);
214                consumers.remove(removed);
215            }
216        }
217    }
218
219    private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException {
220        if (hasSelectorChanged(info1, info2)) {
221            return true;
222        }
223
224        return hasNoLocalChanged(info1, info2);
225    }
226
227    private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException {
228        //Not all persistence adapters store the noLocal value for a subscription
229        PersistenceAdapter adapter = broker.getBrokerService().getPersistenceAdapter();
230        if (adapter instanceof NoLocalSubscriptionAware) {
231            if (info1.isNoLocal() ^ info2.isNoLocal()) {
232                return true;
233            }
234        }
235
236        return false;
237    }
238
239    private boolean hasSelectorChanged(SubscriptionInfo info1, ConsumerInfo info2) {
240        if (info1.getSelector() != null ^ info2.getSelector() != null) {
241            return true;
242        }
243
244        if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
245            return true;
246        }
247
248        return false;
249    }
250
251    public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
252        // synchronize with dispatch method so that no new messages are sent
253        // while we are recovering a subscription to avoid out of order messages.
254        dispatchLock.writeLock().lock();
255        try {
256
257            if (topicStore == null) {
258                return;
259            }
260
261            // Recover the durable subscription.
262            String clientId = subscription.getSubscriptionKey().getClientId();
263            String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
264            SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
265            if (info != null) {
266                // Check to see if selector changed.
267                if (hasDurableSubChanged(info, subscription.getConsumerInfo())) {
268                    // Need to delete the subscription
269                    topicStore.deleteSubscription(clientId, subscriptionName);
270                    info = null;
271                    // Force a rebuild of the selector chain for the subscription otherwise
272                    // the stored subscription is updated but the selector expression is not
273                    // and the subscription will not behave according to the new configuration.
274                    subscription.setSelector(subscription.getConsumerInfo().getSelector());
275                    synchronized (consumers) {
276                        consumers.remove(subscription);
277                    }
278                } else {
279                    synchronized (consumers) {
280                        if (!consumers.contains(subscription)) {
281                            consumers.add(subscription);
282                        }
283                    }
284                }
285            }
286
287            // Do we need to create the subscription?
288            if (info == null) {
289                info = new SubscriptionInfo();
290                info.setClientId(clientId);
291                info.setSelector(subscription.getConsumerInfo().getSelector());
292                info.setSubscriptionName(subscriptionName);
293                info.setDestination(getActiveMQDestination());
294                info.setNoLocal(subscription.getConsumerInfo().isNoLocal());
295                // This destination is an actual destination id.
296                info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
297                // This destination might be a pattern
298                synchronized (consumers) {
299                    consumers.add(subscription);
300                    topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive());
301                }
302            }
303
304            final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
305            msgContext.setDestination(destination);
306            if (subscription.isRecoveryRequired()) {
307                topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
308                    @Override
309                    public boolean recoverMessage(Message message) throws Exception {
310                        message.setRegionDestination(Topic.this);
311                        try {
312                            msgContext.setMessageReference(message);
313                            if (subscription.matches(message, msgContext)) {
314                                subscription.add(message);
315                            }
316                        } catch (IOException e) {
317                            LOG.error("Failed to recover this message {}", message, e);
318                        }
319                        return true;
320                    }
321
322                    @Override
323                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
324                        throw new RuntimeException("Should not be called.");
325                    }
326
327                    @Override
328                    public boolean hasSpace() {
329                        return true;
330                    }
331
332                    @Override
333                    public boolean isDuplicate(MessageId id) {
334                        return false;
335                    }
336                });
337            }
338        } finally {
339            dispatchLock.writeLock().unlock();
340        }
341    }
342
343    public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception {
344        synchronized (consumers) {
345            consumers.remove(sub);
346        }
347        sub.remove(context, this, dispatched);
348    }
349
350    public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
351        if (subscription.getConsumerInfo().isRetroactive()) {
352            subscriptionRecoveryPolicy.recover(context, this, subscription);
353        }
354    }
355
356    @Override
357    public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
358        final ConnectionContext context = producerExchange.getConnectionContext();
359
360        final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
361        producerExchange.incrementSend();
362        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
363                && !context.isInRecoveryMode();
364
365        message.setRegionDestination(this);
366
367        // There is delay between the client sending it and it arriving at the
368        // destination.. it may have expired.
369        if (message.isExpired()) {
370            broker.messageExpired(context, message, null);
371            getDestinationStatistics().getExpired().increment();
372            if (sendProducerAck) {
373                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
374                context.getConnection().dispatchAsync(ack);
375            }
376            return;
377        }
378
379        if (memoryUsage.isFull()) {
380            isFull(context, memoryUsage);
381            fastProducer(context, producerInfo);
382
383            if (isProducerFlowControl() && context.isProducerFlowControl()) {
384
385                if (isFlowControlLogRequired()) {
386                    LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
387                            getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
388                }
389
390                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
391                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
392                            + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
393                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
394                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
395                }
396
397                // We can avoid blocking due to low usage if the producer is sending a sync message or
398                // if it is using a producer window
399                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
400                    synchronized (messagesWaitingForSpace) {
401                        messagesWaitingForSpace.add(new Runnable() {
402                            @Override
403                            public void run() {
404                                try {
405
406                                    // While waiting for space to free up... the
407                                    // message may have expired.
408                                    if (message.isExpired()) {
409                                        broker.messageExpired(context, message, null);
410                                        getDestinationStatistics().getExpired().increment();
411                                    } else {
412                                        doMessageSend(producerExchange, message);
413                                    }
414
415                                    if (sendProducerAck) {
416                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
417                                                .getSize());
418                                        context.getConnection().dispatchAsync(ack);
419                                    } else {
420                                        Response response = new Response();
421                                        response.setCorrelationId(message.getCommandId());
422                                        context.getConnection().dispatchAsync(response);
423                                    }
424
425                                } catch (Exception e) {
426                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
427                                        ExceptionResponse response = new ExceptionResponse(e);
428                                        response.setCorrelationId(message.getCommandId());
429                                        context.getConnection().dispatchAsync(response);
430                                    }
431                                }
432                            }
433                        });
434
435                        registerCallbackForNotFullNotification();
436                        context.setDontSendReponse(true);
437                        return;
438                    }
439
440                } else {
441                    // Producer flow control cannot be used, so we have do the flow control
442                    // at the broker by blocking this thread until there is space available.
443
444                    if (memoryUsage.isFull()) {
445                        if (context.isInTransaction()) {
446
447                            int count = 0;
448                            while (!memoryUsage.waitForSpace(1000)) {
449                                if (context.getStopping().get()) {
450                                    throw new IOException("Connection closed, send aborted.");
451                                }
452                                if (count > 2 && context.isInTransaction()) {
453                                    count = 0;
454                                    int size = context.getTransaction().size();
455                                    LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message);
456                                }
457                                count++;
458                            }
459                        } else {
460                            waitForSpace(
461                                    context,
462                                    producerExchange,
463                                    memoryUsage,
464                                    "Usage Manager Memory Usage limit reached. Stopping producer ("
465                                            + message.getProducerId()
466                                            + ") to prevent flooding "
467                                            + getActiveMQDestination().getQualifiedName()
468                                            + "."
469                                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
470                        }
471                    }
472
473                    // The usage manager could have delayed us by the time
474                    // we unblock the message could have expired..
475                    if (message.isExpired()) {
476                        getDestinationStatistics().getExpired().increment();
477                        LOG.debug("Expired message: {}", message);
478                        return;
479                    }
480                }
481            }
482        }
483
484        doMessageSend(producerExchange, message);
485        messageDelivered(context, message);
486        if (sendProducerAck) {
487            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
488            context.getConnection().dispatchAsync(ack);
489        }
490    }
491
492    /**
493     * do send the message - this needs to be synchronized to ensure messages
494     * are stored AND dispatched in the right order
495     *
496     * @param producerExchange
497     * @param message
498     * @throws IOException
499     * @throws Exception
500     */
501    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
502            throws IOException, Exception {
503        final ConnectionContext context = producerExchange.getConnectionContext();
504        message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
505        Future<Object> result = null;
506
507        if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
508            if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
509                final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
510                        + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
511                        + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
512                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
513                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
514                    throw new javax.jms.ResourceAllocationException(logMessage);
515                }
516
517                waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
518            }
519            result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
520
521            //Moved the reduceMemoryfootprint clearing to the dispatch method
522        }
523
524        message.incrementReferenceCount();
525
526        if (context.isInTransaction()) {
527            context.getTransaction().addSynchronization(new Synchronization() {
528                @Override
529                public void afterCommit() throws Exception {
530                    // It could take while before we receive the commit
531                    // operation.. by that time the message could have
532                    // expired..
533                    if (message.isExpired()) {
534                        if (broker.isExpired(message)) {
535                            getDestinationStatistics().getExpired().increment();
536                            broker.messageExpired(context, message, null);
537                        }
538                        message.decrementReferenceCount();
539                        return;
540                    }
541                    try {
542                        dispatch(context, message);
543                    } finally {
544                        message.decrementReferenceCount();
545                    }
546                }
547
548                @Override
549                public void afterRollback() throws Exception {
550                    message.decrementReferenceCount();
551                }
552            });
553
554        } else {
555            try {
556                dispatch(context, message);
557            } finally {
558                message.decrementReferenceCount();
559            }
560        }
561
562        if (result != null && !result.isCancelled()) {
563            try {
564                result.get();
565            } catch (CancellationException e) {
566                // ignore - the task has been cancelled if the message
567                // has already been deleted
568            }
569        }
570    }
571
572    private boolean canOptimizeOutPersistence() {
573        return durableSubscribers.size() == 0;
574    }
575
576    @Override
577    public String toString() {
578        return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
579    }
580
581    @Override
582    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
583            final MessageReference node) throws IOException {
584        if (topicStore != null && node.isPersistent()) {
585            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
586            SubscriptionKey key = dsub.getSubscriptionKey();
587            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
588                    convertToNonRangedAck(ack, node));
589        }
590        messageConsumed(context, node);
591    }
592
593    @Override
594    public void gc() {
595    }
596
597    public Message loadMessage(MessageId messageId) throws IOException {
598        return topicStore != null ? topicStore.getMessage(messageId) : null;
599    }
600
601    @Override
602    public void start() throws Exception {
603        if (started.compareAndSet(false, true)) {
604            this.subscriptionRecoveryPolicy.start();
605            if (memoryUsage != null) {
606                memoryUsage.start();
607            }
608
609            if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) {
610                scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
611            }
612        }
613    }
614
615    @Override
616    public void stop() throws Exception {
617        if (started.compareAndSet(true, false)) {
618            if (taskRunner != null) {
619                taskRunner.shutdown();
620            }
621            this.subscriptionRecoveryPolicy.stop();
622            if (memoryUsage != null) {
623                memoryUsage.stop();
624            }
625            if (this.topicStore != null) {
626                this.topicStore.stop();
627            }
628
629            scheduler.cancel(expireMessagesTask);
630        }
631    }
632
633    @Override
634    public Message[] browse() {
635        final List<Message> result = new ArrayList<Message>();
636        doBrowse(result, getMaxBrowsePageSize());
637        return result.toArray(new Message[result.size()]);
638    }
639
640    private void doBrowse(final List<Message> browseList, final int max) {
641        try {
642            if (topicStore != null) {
643                final List<Message> toExpire = new ArrayList<Message>();
644                topicStore.recover(new MessageRecoveryListener() {
645                    @Override
646                    public boolean recoverMessage(Message message) throws Exception {
647                        if (message.isExpired()) {
648                            toExpire.add(message);
649                        }
650                        browseList.add(message);
651                        return true;
652                    }
653
654                    @Override
655                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
656                        return true;
657                    }
658
659                    @Override
660                    public boolean hasSpace() {
661                        return browseList.size() < max;
662                    }
663
664                    @Override
665                    public boolean isDuplicate(MessageId id) {
666                        return false;
667                    }
668                });
669                final ConnectionContext connectionContext = createConnectionContext();
670                for (Message message : toExpire) {
671                    for (DurableTopicSubscription sub : durableSubscribers.values()) {
672                        if (!sub.isActive()) {
673                            message.setRegionDestination(this);
674                            messageExpired(connectionContext, sub, message);
675                        }
676                    }
677                }
678                Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
679                if (msgs != null) {
680                    for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
681                        browseList.add(msgs[i]);
682                    }
683                }
684            }
685        } catch (Throwable e) {
686            LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e);
687        }
688    }
689
690    @Override
691    public boolean iterate() {
692        synchronized (messagesWaitingForSpace) {
693            while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
694                Runnable op = messagesWaitingForSpace.removeFirst();
695                op.run();
696            }
697
698            if (!messagesWaitingForSpace.isEmpty()) {
699                registerCallbackForNotFullNotification();
700            }
701        }
702        return false;
703    }
704
705    private void registerCallbackForNotFullNotification() {
706        // If the usage manager is not full, then the task will not
707        // get called..
708        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
709            // so call it directly here.
710            sendMessagesWaitingForSpaceTask.run();
711        }
712    }
713
714    // Properties
715    // -------------------------------------------------------------------------
716
717    public DispatchPolicy getDispatchPolicy() {
718        return dispatchPolicy;
719    }
720
721    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
722        this.dispatchPolicy = dispatchPolicy;
723    }
724
725    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
726        return subscriptionRecoveryPolicy;
727    }
728
729    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) {
730        if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) {
731            // allow users to combine retained message policy with other ActiveMQ policies
732            RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy;
733            policy.setWrapped(recoveryPolicy);
734        } else {
735            this.subscriptionRecoveryPolicy = recoveryPolicy;
736        }
737    }
738
739    // Implementation methods
740    // -------------------------------------------------------------------------
741
742    @Override
743    public final void wakeup() {
744    }
745
746    protected void dispatch(final ConnectionContext context, Message message) throws Exception {
747        // AMQ-2586: Better to leave this stat at zero than to give the user
748        // misleading metrics.
749        // destinationStatistics.getMessages().increment();
750        destinationStatistics.getEnqueues().increment();
751        destinationStatistics.getMessageSize().addSize(message.getSize());
752        MessageEvaluationContext msgContext = null;
753
754        dispatchLock.readLock().lock();
755        try {
756            if (!subscriptionRecoveryPolicy.add(context, message)) {
757                return;
758            }
759            synchronized (consumers) {
760                if (consumers.isEmpty()) {
761                    onMessageWithNoConsumers(context, message);
762                    return;
763                }
764            }
765
766            // Clear memory before dispatch - need to clear here because the call to
767            //subscriptionRecoveryPolicy.add() will unmarshall the state
768            if (isReduceMemoryFootprint() && message.isMarshalled()) {
769                message.clearUnMarshalledState();
770            }
771
772            msgContext = context.getMessageEvaluationContext();
773            msgContext.setDestination(destination);
774            msgContext.setMessageReference(message);
775            if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
776                onMessageWithNoConsumers(context, message);
777            }
778
779        } finally {
780            dispatchLock.readLock().unlock();
781            if (msgContext != null) {
782                msgContext.clear();
783            }
784        }
785    }
786
787    private final Runnable expireMessagesTask = new Runnable() {
788        @Override
789        public void run() {
790            List<Message> browsedMessages = new InsertionCountList<Message>();
791            doBrowse(browsedMessages, getMaxExpirePageSize());
792        }
793    };
794
795    @Override
796    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
797        broker.messageExpired(context, reference, subs);
798        // AMQ-2586: Better to leave this stat at zero than to give the user
799        // misleading metrics.
800        // destinationStatistics.getMessages().decrement();
801        destinationStatistics.getExpired().increment();
802        MessageAck ack = new MessageAck();
803        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
804        ack.setDestination(destination);
805        ack.setMessageID(reference.getMessageId());
806        try {
807            if (subs instanceof DurableTopicSubscription) {
808                ((DurableTopicSubscription)subs).removePending(reference);
809            }
810            acknowledge(context, subs, ack, reference);
811        } catch (Exception e) {
812            LOG.error("Failed to remove expired Message from the store ", e);
813        }
814    }
815
816    @Override
817    protected Logger getLog() {
818        return LOG;
819    }
820
821    protected boolean isOptimizeStorage(){
822        boolean result = false;
823
824        if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){
825                result = true;
826                for (DurableTopicSubscription s : durableSubscribers.values()) {
827                    if (s.isActive()== false){
828                        result = false;
829                        break;
830                    }
831                    if (s.getPrefetchSize()==0){
832                        result = false;
833                        break;
834                    }
835                    if (s.isSlowConsumer()){
836                        result = false;
837                        break;
838                    }
839                    if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
840                        result = false;
841                        break;
842                    }
843                }
844        }
845        return result;
846    }
847
848    /**
849     * force a reread of the store - after transaction recovery completion
850     */
851    @Override
852    public void clearPendingMessages() {
853        dispatchLock.readLock().lock();
854        try {
855            for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
856                clearPendingAndDispatch(durableTopicSubscription);
857            }
858        } finally {
859            dispatchLock.readLock().unlock();
860        }
861    }
862
863    private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
864        synchronized (durableTopicSubscription.pendingLock) {
865            durableTopicSubscription.pending.clear();
866            try {
867                durableTopicSubscription.dispatchPending();
868            } catch (IOException exception) {
869                LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}", new Object[]{
870                        durableTopicSubscription,
871                        destination,
872                        durableTopicSubscription.pending }, exception);
873            }
874        }
875    }
876
877    public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
878        return durableSubscribers;
879    }
880}