001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.net.URI;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Locale;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.CopyOnWriteArrayList;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.locks.ReentrantReadWriteLock;
032
033import javax.jms.InvalidClientIDException;
034import javax.jms.JMSException;
035
036import org.apache.activemq.broker.Broker;
037import org.apache.activemq.broker.BrokerService;
038import org.apache.activemq.broker.Connection;
039import org.apache.activemq.broker.ConnectionContext;
040import org.apache.activemq.broker.ConsumerBrokerExchange;
041import org.apache.activemq.broker.EmptyBroker;
042import org.apache.activemq.broker.ProducerBrokerExchange;
043import org.apache.activemq.broker.TransportConnection;
044import org.apache.activemq.broker.TransportConnector;
045import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
046import org.apache.activemq.broker.region.policy.PolicyMap;
047import org.apache.activemq.command.ActiveMQDestination;
048import org.apache.activemq.command.ActiveMQMessage;
049import org.apache.activemq.command.BrokerId;
050import org.apache.activemq.command.BrokerInfo;
051import org.apache.activemq.command.ConnectionId;
052import org.apache.activemq.command.ConnectionInfo;
053import org.apache.activemq.command.ConsumerControl;
054import org.apache.activemq.command.ConsumerInfo;
055import org.apache.activemq.command.DestinationInfo;
056import org.apache.activemq.command.Message;
057import org.apache.activemq.command.MessageAck;
058import org.apache.activemq.command.MessageDispatch;
059import org.apache.activemq.command.MessageDispatchNotification;
060import org.apache.activemq.command.MessagePull;
061import org.apache.activemq.command.ProducerInfo;
062import org.apache.activemq.command.RemoveSubscriptionInfo;
063import org.apache.activemq.command.Response;
064import org.apache.activemq.command.TransactionId;
065import org.apache.activemq.state.ConnectionState;
066import org.apache.activemq.store.PListStore;
067import org.apache.activemq.thread.Scheduler;
068import org.apache.activemq.thread.TaskRunnerFactory;
069import org.apache.activemq.transport.TransmitCallback;
070import org.apache.activemq.usage.SystemUsage;
071import org.apache.activemq.util.BrokerSupport;
072import org.apache.activemq.util.IdGenerator;
073import org.apache.activemq.util.InetAddressUtil;
074import org.apache.activemq.util.LongSequenceGenerator;
075import org.apache.activemq.util.ServiceStopper;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078
079/**
080 * Routes Broker operations to the correct messaging regions for processing.
081 */
082public class RegionBroker extends EmptyBroker {
    // Message property name under which stampAsExpired() records a message's original expiration.
    public static final String ORIGINAL_EXPIRATION = "originalExpiration";
    private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class);
    // Source of generated ids for getBrokerId() when no broker id has been set explicitly.
    private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();

    // Shared statistics instance handed to every region built by the create*Region factories.
    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
    protected DestinationFactory destinationFactory;
    // Synchronized map exposed as-is through getConnectionStates().
    protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());

    // The four regions that operations are routed to; see getRegion(ActiveMQDestination).
    private final Region queueRegion;
    private final Region topicRegion;
    private final Region tempQueueRegion;
    private final Region tempTopicRegion;
    protected final BrokerService brokerService;
    // Set by start()/stop(); isStopped() reports its negation.
    private boolean started;
    private boolean keepDurableSubsActive;

    // Connections registered by addConnection() and unregistered by removeConnection().
    private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
    // Monitor (and per-destination marker map) used by addDestination() for its wait/notify hand-off.
    private final Map<ActiveMQDestination, ActiveMQDestination> destinationGate = new HashMap<ActiveMQDestination, ActiveMQDestination>();
    // Destinations already resolved through addDestination(); consulted first as a fast path.
    private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
    // Peer broker infos keyed by broker id; touched from the synchronized addBroker/removeBroker/getPeerBrokerInfos.
    private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>();

    // Seeded in the constructor from destinationFactory.getLastMessageBrokerSequenceId().
    private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
    // Generated on first call to getBrokerId() when not set.
    private BrokerId brokerId;
    // Resolved from the local host name on first call to getBrokerName() when not set.
    private String brokerName;
    // Client id -> owning context; addConnection()/removeConnection() synchronize on this map.
    private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>();
    private final DestinationInterceptor destinationInterceptor;
    private ConnectionContext adminConnectionContext;
    private final Scheduler scheduler;
    // Stored by the constructor; no use is visible in this portion of the file.
    private final ThreadPoolExecutor executor;
    // NOTE(review): presumably backs isAllowTempAutoCreationOnSend(), whose definition is not visible here - confirm.
    private boolean allowTempAutoCreationOnSend;

    // Producer/consumer/subscription operations hold the read lock.
    // NOTE(review): presumably purgeInactiveDestinations() takes the write lock - it is not visible here, confirm.
    private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock();
    // Scheduled periodically by start() when the configured purge period is positive; cancelled by stop().
    private final Runnable purgeInactiveDestinationsTask = new Runnable() {
        @Override
        public void run() {
            purgeInactiveDestinations();
        }
    };
121
122    public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
123        DestinationInterceptor destinationInterceptor, Scheduler scheduler, ThreadPoolExecutor executor) throws IOException {
124        this.brokerService = brokerService;
125        this.executor = executor;
126        this.scheduler = scheduler;
127        if (destinationFactory == null) {
128            throw new IllegalArgumentException("null destinationFactory");
129        }
130        this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId());
131        this.destinationFactory = destinationFactory;
132        queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
133        topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
134        this.destinationInterceptor = destinationInterceptor;
135        tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
136        tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
137    }
138
139    @Override
140    public Map<ActiveMQDestination, Destination> getDestinationMap() {
141        Map<ActiveMQDestination, Destination> answer = new HashMap<ActiveMQDestination, Destination>(getQueueRegion().getDestinationMap());
142        answer.putAll(getTopicRegion().getDestinationMap());
143        return answer;
144    }
145
146    @Override
147    public Map<ActiveMQDestination, Destination> getDestinationMap(ActiveMQDestination destination) {
148        try {
149            return getRegion(destination).getDestinationMap();
150        } catch (JMSException jmse) {
151            return Collections.emptyMap();
152        }
153    }
154
155    @Override
156    public Set<Destination> getDestinations(ActiveMQDestination destination) {
157        try {
158            return getRegion(destination).getDestinations(destination);
159        } catch (JMSException jmse) {
160            return Collections.emptySet();
161        }
162    }
163
164    public Region getQueueRegion() {
165        return queueRegion;
166    }
167
168    public Region getTempQueueRegion() {
169        return tempQueueRegion;
170    }
171
172    public Region getTempTopicRegion() {
173        return tempTopicRegion;
174    }
175
176    public Region getTopicRegion() {
177        return topicRegion;
178    }
179
180    protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
181        return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
182    }
183
184    protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
185        return new TempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
186    }
187
188    protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
189        return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
190    }
191
192    protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
193        return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
194    }
195
196    @Override
197    public void start() throws Exception {
198        started = true;
199        queueRegion.start();
200        topicRegion.start();
201        tempQueueRegion.start();
202        tempTopicRegion.start();
203        int period = this.brokerService.getSchedulePeriodForDestinationPurge();
204        if (period > 0) {
205            this.scheduler.executePeriodically(purgeInactiveDestinationsTask, period);
206        }
207    }
208
209    @Override
210    public void stop() throws Exception {
211        started = false;
212        this.scheduler.cancel(purgeInactiveDestinationsTask);
213        ServiceStopper ss = new ServiceStopper();
214        doStop(ss);
215        ss.throwFirstException();
216        // clear the state
217        clientIdSet.clear();
218        connections.clear();
219        destinations.clear();
220        brokerInfos.clear();
221    }
222
223    public PolicyMap getDestinationPolicy() {
224        return brokerService != null ? brokerService.getDestinationPolicy() : null;
225    }
226
227    public ConnectionContext getConnectionContext(String clientId) {
228        return clientIdSet.get(clientId);
229    }
230
231    @Override
232    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
233        String clientId = info.getClientId();
234        if (clientId == null) {
235            throw new InvalidClientIDException("No clientID specified for connection request");
236        }
237
238        ConnectionContext oldContext = null;
239
240        synchronized (clientIdSet) {
241            oldContext = clientIdSet.get(clientId);
242            if (oldContext != null) {
243                if (context.isAllowLinkStealing()) {
244                    clientIdSet.put(clientId, context);
245                } else {
246                    throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
247                        + oldContext.getConnection().getRemoteAddress());
248                }
249            } else {
250                clientIdSet.put(clientId, context);
251            }
252        }
253
254        if (oldContext != null) {
255            if (oldContext.getConnection() != null) {
256                Connection connection = oldContext.getConnection();
257                LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection());
258                if (connection instanceof TransportConnection) {
259                    TransportConnection transportConnection = (TransportConnection) connection;
260                    transportConnection.stopAsync(new IOException("Stealing link for clientId " + clientId + " From Connection " + oldContext.getConnection().getConnectionId()));
261                } else {
262                    connection.stop();
263                }
264            } else {
265                LOG.error("No Connection found for {}", oldContext);
266            }
267        }
268
269        connections.add(context.getConnection());
270    }
271
272    @Override
273    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
274        String clientId = info.getClientId();
275        if (clientId == null) {
276            throw new InvalidClientIDException("No clientID specified for connection disconnect request");
277        }
278        synchronized (clientIdSet) {
279            ConnectionContext oldValue = clientIdSet.get(clientId);
280            // we may be removing the duplicate connection, not the first connection to be created
281            // so lets check that their connection IDs are the same
282            if (oldValue == context) {
283                if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) {
284                    clientIdSet.remove(clientId);
285                }
286            }
287        }
288        connections.remove(context.getConnection());
289    }
290
291    protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) {
292        return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2));
293    }
294
295    @Override
296    public Connection[] getClients() throws Exception {
297        ArrayList<Connection> l = new ArrayList<Connection>(connections);
298        Connection rc[] = new Connection[l.size()];
299        l.toArray(rc);
300        return rc;
301    }
302
303    @Override
304    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemp) throws Exception {
305
306        Destination answer;
307
308        answer = destinations.get(destination);
309        if (answer != null) {
310            return answer;
311        }
312
313        synchronized (destinationGate) {
314            answer = destinations.get(destination);
315            if (answer != null) {
316                return answer;
317            }
318
319            if (destinationGate.get(destination) != null) {
320                // Guard against spurious wakeup.
321                while (destinationGate.containsKey(destination)) {
322                    destinationGate.wait();
323                }
324                answer = destinations.get(destination);
325                if (answer != null) {
326                    return answer;
327                } else {
328                    // In case of intermediate remove or add failure
329                    destinationGate.put(destination, destination);
330                }
331            }
332        }
333
334        try {
335            boolean create = true;
336            if (destination.isTemporary()) {
337                create = createIfTemp;
338            }
339            answer = getRegion(destination).addDestination(context, destination, create);
340            destinations.put(destination, answer);
341        } finally {
342            synchronized (destinationGate) {
343                destinationGate.remove(destination);
344                destinationGate.notifyAll();
345            }
346        }
347
348        return answer;
349    }
350
351    @Override
352    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
353        if (destinations.containsKey(destination)) {
354            getRegion(destination).removeDestination(context, destination, timeout);
355            destinations.remove(destination);
356        }
357    }
358
359    @Override
360    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
361        addDestination(context, info.getDestination(), true);
362
363    }
364
365    @Override
366    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
367        removeDestination(context, info.getDestination(), info.getTimeout());
368    }
369
370    @Override
371    public ActiveMQDestination[] getDestinations() throws Exception {
372        ArrayList<ActiveMQDestination> l;
373
374        l = new ArrayList<ActiveMQDestination>(getDestinationMap().keySet());
375
376        ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
377        l.toArray(rc);
378        return rc;
379    }
380
381    @Override
382    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
383        ActiveMQDestination destination = info.getDestination();
384        if (destination != null) {
385            inactiveDestinationsPurgeLock.readLock().lock();
386            try {
387                // This seems to cause the destination to be added but without
388                // advisories firing...
389                context.getBroker().addDestination(context, destination, isAllowTempAutoCreationOnSend());
390                getRegion(destination).addProducer(context, info);
391            } finally {
392                inactiveDestinationsPurgeLock.readLock().unlock();
393            }
394        }
395    }
396
397    @Override
398    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
399        ActiveMQDestination destination = info.getDestination();
400        if (destination != null) {
401            inactiveDestinationsPurgeLock.readLock().lock();
402            try {
403                getRegion(destination).removeProducer(context, info);
404            } finally {
405                inactiveDestinationsPurgeLock.readLock().unlock();
406            }
407        }
408    }
409
410    @Override
411    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
412        ActiveMQDestination destination = info.getDestination();
413        if (destinationInterceptor != null) {
414            destinationInterceptor.create(this, context, destination);
415        }
416        inactiveDestinationsPurgeLock.readLock().lock();
417        try {
418            return getRegion(destination).addConsumer(context, info);
419        } finally {
420            inactiveDestinationsPurgeLock.readLock().unlock();
421        }
422    }
423
424    @Override
425    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
426        ActiveMQDestination destination = info.getDestination();
427        inactiveDestinationsPurgeLock.readLock().lock();
428        try {
429            getRegion(destination).removeConsumer(context, info);
430        } finally {
431            inactiveDestinationsPurgeLock.readLock().unlock();
432        }
433    }
434
435    @Override
436    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
437        inactiveDestinationsPurgeLock.readLock().lock();
438        try {
439            topicRegion.removeSubscription(context, info);
440        } finally {
441            inactiveDestinationsPurgeLock.readLock().unlock();
442
443        }
444    }
445
446    @Override
447    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
448        ActiveMQDestination destination = message.getDestination();
449        message.setBrokerInTime(System.currentTimeMillis());
450        if (producerExchange.isMutable() || producerExchange.getRegion() == null
451            || (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) {
452            // ensure the destination is registered with the RegionBroker
453            producerExchange.getConnectionContext().getBroker()
454                .addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend());
455            producerExchange.setRegion(getRegion(destination));
456            producerExchange.setRegionDestination(null);
457        }
458
459        producerExchange.getRegion().send(producerExchange, message);
460
461        // clean up so these references aren't kept (possible leak) in the producer exchange
462        // especially since temps are transitory
463        if (producerExchange.isMutable()) {
464            producerExchange.setRegionDestination(null);
465            producerExchange.setRegion(null);
466        }
467    }
468
469    @Override
470    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
471        if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) {
472            ActiveMQDestination destination = ack.getDestination();
473            consumerExchange.setRegion(getRegion(destination));
474        }
475        consumerExchange.getRegion().acknowledge(consumerExchange, ack);
476    }
477
478    public Region getRegion(ActiveMQDestination destination) throws JMSException {
479        switch (destination.getDestinationType()) {
480            case ActiveMQDestination.QUEUE_TYPE:
481                return queueRegion;
482            case ActiveMQDestination.TOPIC_TYPE:
483                return topicRegion;
484            case ActiveMQDestination.TEMP_QUEUE_TYPE:
485                return tempQueueRegion;
486            case ActiveMQDestination.TEMP_TOPIC_TYPE:
487                return tempTopicRegion;
488            default:
489                throw createUnknownDestinationTypeException(destination);
490        }
491    }
492
493    @Override
494    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
495        ActiveMQDestination destination = pull.getDestination();
496        return getRegion(destination).messagePull(context, pull);
497    }
498
499    @Override
500    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
501        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
502    }
503
504    @Override
505    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
506        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
507    }
508
509    @Override
510    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
511        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
512    }
513
514    @Override
515    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
516        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
517    }
518
519    @Override
520    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
521        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
522    }
523
524    @Override
525    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
526        throw new IllegalAccessException("Transaction operation not implemented by this broker.");
527    }
528
529    @Override
530    public void gc() {
531        queueRegion.gc();
532        topicRegion.gc();
533    }
534
535    @Override
536    public BrokerId getBrokerId() {
537        if (brokerId == null) {
538            brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
539        }
540        return brokerId;
541    }
542
543    public void setBrokerId(BrokerId brokerId) {
544        this.brokerId = brokerId;
545    }
546
547    @Override
548    public String getBrokerName() {
549        if (brokerName == null) {
550            try {
551                brokerName = InetAddressUtil.getLocalHostName().toLowerCase(Locale.ENGLISH);
552            } catch (Exception e) {
553                brokerName = "localhost";
554            }
555        }
556        return brokerName;
557    }
558
559    public void setBrokerName(String brokerName) {
560        this.brokerName = brokerName;
561    }
562
563    public DestinationStatistics getDestinationStatistics() {
564        return destinationStatistics;
565    }
566
567    protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) {
568        return new JMSException("Unknown destination type: " + destination.getDestinationType());
569    }
570
571    @Override
572    public synchronized void addBroker(Connection connection, BrokerInfo info) {
573        BrokerInfo existing = brokerInfos.get(info.getBrokerId());
574        if (existing == null) {
575            existing = info.copy();
576            existing.setPeerBrokerInfos(null);
577            brokerInfos.put(info.getBrokerId(), existing);
578        }
579        existing.incrementRefCount();
580        LOG.debug("{} addBroker: {} brokerInfo size: {}", new Object[]{ getBrokerName(), info.getBrokerName(), brokerInfos.size() });
581        addBrokerInClusterUpdate(info);
582    }
583
584    @Override
585    public synchronized void removeBroker(Connection connection, BrokerInfo info) {
586        if (info != null) {
587            BrokerInfo existing = brokerInfos.get(info.getBrokerId());
588            if (existing != null && existing.decrementRefCount() == 0) {
589                brokerInfos.remove(info.getBrokerId());
590            }
591            LOG.debug("{} removeBroker: {} brokerInfo size: {}", new Object[]{ getBrokerName(), info.getBrokerName(), brokerInfos.size()});
592            // When stopping don't send cluster updates since we are the one's tearing down
593            // our own bridges.
594            if (!brokerService.isStopping()) {
595                removeBrokerInClusterUpdate(info);
596            }
597        }
598    }
599
600    @Override
601    public synchronized BrokerInfo[] getPeerBrokerInfos() {
602        BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
603        result = brokerInfos.values().toArray(result);
604        return result;
605    }
606
607    @Override
608    public void preProcessDispatch(final MessageDispatch messageDispatch) {
609        final Message message = messageDispatch.getMessage();
610        if (message != null) {
611            long endTime = System.currentTimeMillis();
612            message.setBrokerOutTime(endTime);
613            if (getBrokerService().isEnableStatistics()) {
614                long totalTime = endTime - message.getBrokerInTime();
615                ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
616            }
617            if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered() && !message.isRedelivered()) {
618                final int originalValue = message.getRedeliveryCounter();
619                message.incrementRedeliveryCounter();
620                try {
621                    if (message.isPersistent()) {
622                        ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
623                    }
624                    messageDispatch.setTransmitCallback(new TransmitCallback() {
625                        // dispatch is considered a delivery, so update sub state post dispatch otherwise
626                        // on a disconnect/reconnect cached messages will not reflect initial delivery attempt
627                        final TransmitCallback delegate = messageDispatch.getTransmitCallback();
628                        @Override
629                        public void onSuccess() {
630                            message.incrementRedeliveryCounter();
631                            if (delegate != null) {
632                                delegate.onSuccess();
633                            }
634                        }
635
636                        @Override
637                        public void onFailure() {
638                            if (delegate != null) {
639                                delegate.onFailure();
640                            }
641                        }
642                    });
643                } catch (IOException error) {
644                    RuntimeException runtimeException = new RuntimeException("Failed to persist JMSRedeliveryFlag on " + message.getMessageId() + " in " + message.getDestination(), error);
645                    LOG.warn(runtimeException.getLocalizedMessage(), runtimeException);
646                    throw runtimeException;
647                } finally {
648                    message.setRedeliveryCounter(originalValue);
649                }
650            }
651        }
652    }
653
654    @Override
655    public void postProcessDispatch(MessageDispatch messageDispatch) {
656    }
657
658    @Override
659    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
660        ActiveMQDestination destination = messageDispatchNotification.getDestination();
661        getRegion(destination).processDispatchNotification(messageDispatchNotification);
662    }
663
664    @Override
665    public boolean isStopped() {
666        return !started;
667    }
668
669    @Override
670    public Set<ActiveMQDestination> getDurableDestinations() {
671        return destinationFactory.getDestinations();
672    }
673
674    protected void doStop(ServiceStopper ss) {
675        ss.stop(queueRegion);
676        ss.stop(topicRegion);
677        ss.stop(tempQueueRegion);
678        ss.stop(tempTopicRegion);
679    }
680
681    public boolean isKeepDurableSubsActive() {
682        return keepDurableSubsActive;
683    }
684
685    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
686        this.keepDurableSubsActive = keepDurableSubsActive;
687        ((TopicRegion) topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
688    }
689
690    public DestinationInterceptor getDestinationInterceptor() {
691        return destinationInterceptor;
692    }
693
694    @Override
695    public ConnectionContext getAdminConnectionContext() {
696        return adminConnectionContext;
697    }
698
699    @Override
700    public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
701        this.adminConnectionContext = adminConnectionContext;
702    }
703
704    public Map<ConnectionId, ConnectionState> getConnectionStates() {
705        return connectionStates;
706    }
707
708    @Override
709    public PListStore getTempDataStore() {
710        return brokerService.getTempDataStore();
711    }
712
713    @Override
714    public URI getVmConnectorURI() {
715        return brokerService.getVmConnectorURI();
716    }
717
718    @Override
719    public void brokerServiceStarted() {
720    }
721
722    @Override
723    public BrokerService getBrokerService() {
724        return brokerService;
725    }
726
727    @Override
728    public boolean isExpired(MessageReference messageReference) {
729        return messageReference.canProcessAsExpired();
730    }
731
732    private boolean stampAsExpired(Message message) throws IOException {
733        boolean stamped = false;
734        if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
735            long expiration = message.getExpiration();
736            message.setProperty(ORIGINAL_EXPIRATION, new Long(expiration));
737            stamped = true;
738        }
739        return stamped;
740    }
741
742    @Override
743    public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) {
744        LOG.debug("Message expired {}", node);
745        getRoot().sendToDeadLetterQueue(context, node, subscription, new Throwable("Message Expired. Expiration:" + node.getExpiration()));
746    }
747
748    @Override
749    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference node, Subscription subscription, Throwable poisonCause) {
750        try {
751            if (node != null) {
752                Message message = node.getMessage();
753                if (message != null && node.getRegionDestination() != null) {
754                    DeadLetterStrategy deadLetterStrategy = ((Destination) node.getRegionDestination()).getDeadLetterStrategy();
755                    if (deadLetterStrategy != null) {
756                        if (deadLetterStrategy.isSendToDeadLetterQueue(message)) {
757                            ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message, subscription);
758                            // Prevent a DLQ loop where same message is sent from a DLQ back to itself
759                            if (deadLetterDestination.equals(message.getDestination())) {
760                                LOG.debug("Not re-adding to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination());
761                                return false;
762                            }
763
764                            // message may be inflight to other subscriptions so do not modify
765                            message = message.copy();
766                            long dlqExpiration = deadLetterStrategy.getExpiration();
767                            if (dlqExpiration > 0) {
768                                dlqExpiration += System.currentTimeMillis();
769                            } else {
770                                stampAsExpired(message);
771                            }
772                            message.setExpiration(dlqExpiration);
773                            if (!message.isPersistent()) {
774                                message.setPersistent(true);
775                                message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
776                            }
777                            if (poisonCause != null) {
778                                message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
779                                        poisonCause.toString());
780                            }
781                            // The original destination and transaction id do
782                            // not get filled when the message is first sent,
783                            // it is only populated if the message is routed to
784                            // another destination like the DLQ
785                            ConnectionContext adminContext = context;
786                            if (context.getSecurityContext() == null || !context.getSecurityContext().isBrokerContext()) {
787                                adminContext = BrokerSupport.getConnectionContext(this);
788                            }
789                            addDestination(adminContext, deadLetterDestination, false).getActiveMQDestination().setDLQ(true);
790                            BrokerSupport.resendNoCopy(adminContext, message, deadLetterDestination);
791                            return true;
792                        }
793                    } else {
794                        LOG.debug("Dead Letter message with no DLQ strategy in place, message id: {}, destination: {}", message.getMessageId(), message.getDestination());
795                    }
796                }
797            }
798        } catch (Exception e) {
799            LOG.warn("Caught an exception sending to DLQ: {}", node, e);
800        }
801
802        return false;
803    }
804
805    @Override
806    public Broker getRoot() {
807        try {
808            return getBrokerService().getBroker();
809        } catch (Exception e) {
810            LOG.error("Trying to get Root Broker", e);
811            throw new RuntimeException("The broker from the BrokerService should not throw an exception");
812        }
813    }
814
815    /**
816     * @return the broker sequence id
817     */
818    @Override
819    public long getBrokerSequenceId() {
820        synchronized (sequenceGenerator) {
821            return sequenceGenerator.getNextSequenceId();
822        }
823    }
824
825    @Override
826    public Scheduler getScheduler() {
827        return this.scheduler;
828    }
829
830    @Override
831    public ThreadPoolExecutor getExecutor() {
832        return this.executor;
833    }
834
835    @Override
836    public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
837        ActiveMQDestination destination = control.getDestination();
838        try {
839            getRegion(destination).processConsumerControl(consumerExchange, control);
840        } catch (JMSException jmse) {
841            LOG.warn("unmatched destination: {}, in consumerControl: {}", destination, control);
842        }
843    }
844
845    protected void addBrokerInClusterUpdate(BrokerInfo info) {
846        List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
847        for (TransportConnector connector : connectors) {
848            if (connector.isUpdateClusterClients()) {
849                connector.addPeerBroker(info);
850                connector.updateClientClusterInfo();
851            }
852        }
853    }
854
855    protected void removeBrokerInClusterUpdate(BrokerInfo info) {
856        List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
857        for (TransportConnector connector : connectors) {
858            if (connector.isUpdateClusterClients() && connector.isUpdateClusterClientsOnRemove()) {
859                connector.removePeerBroker(info);
860                connector.updateClientClusterInfo();
861            }
862        }
863    }
864
865    protected void purgeInactiveDestinations() {
866        inactiveDestinationsPurgeLock.writeLock().lock();
867        try {
868            List<Destination> list = new ArrayList<Destination>();
869            Map<ActiveMQDestination, Destination> map = getDestinationMap();
870            if (isAllowTempAutoCreationOnSend()) {
871                map.putAll(tempQueueRegion.getDestinationMap());
872                map.putAll(tempTopicRegion.getDestinationMap());
873            }
874            long maxPurgedDests = this.brokerService.getMaxPurgedDestinationsPerSweep();
875            long timeStamp = System.currentTimeMillis();
876            for (Destination d : map.values()) {
877                d.markForGC(timeStamp);
878                if (d.canGC()) {
879                    list.add(d);
880                    if (maxPurgedDests > 0 && list.size() == maxPurgedDests) {
881                        break;
882                    }
883                }
884            }
885
886            if (!list.isEmpty()) {
887                ConnectionContext context = BrokerSupport.getConnectionContext(this);
888                context.setBroker(this);
889
890                for (Destination dest : list) {
891                    Logger log = LOG;
892                    if (dest instanceof BaseDestination) {
893                        log = ((BaseDestination) dest).getLog();
894                    }
895                    log.info("{} Inactive for longer than {} ms - removing ...", dest.getName(), dest.getInactiveTimeoutBeforeGC());
896                    try {
897                        getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0);
898                    } catch (Exception e) {
899                        LOG.error("Failed to remove inactive destination {}", dest, e);
900                    }
901                }
902            }
903        } finally {
904            inactiveDestinationsPurgeLock.writeLock().unlock();
905        }
906    }
907
908    public boolean isAllowTempAutoCreationOnSend() {
909        return allowTempAutoCreationOnSend;
910    }
911
912    public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
913        this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
914    }
915
916    @Override
917    public void reapplyInterceptor() {
918        queueRegion.reapplyInterceptor();
919        topicRegion.reapplyInterceptor();
920        tempQueueRegion.reapplyInterceptor();
921        tempTopicRegion.reapplyInterceptor();
922    }
923}