001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.util.Arrays;
020import java.util.List;
021
022import org.apache.activemq.broker.region.Destination;
023import org.apache.activemq.broker.region.Subscription;
024import org.apache.activemq.command.BrokerId;
025import org.apache.activemq.command.ConsumerInfo;
026import org.apache.activemq.command.Message;
027import org.apache.activemq.command.NetworkBridgeFilter;
028import org.apache.activemq.filter.MessageEvaluationContext;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * implement conditional behavior for queue consumers, allows replaying back to
034 * origin if no consumers are present on the local broker after a configurable
035 * delay, irrespective of the TTL. Also allows rate limiting of messages
036 * through the network, useful for static includes
037 *
038 * @org.apache.xbean.XBean
039 */
040public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory {
041
042    boolean replayWhenNoConsumers = false;
043    int replayDelay = 0;
044    int rateLimit = 0;
045    int rateDuration = 1000;
046
047    @Override
048    public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int messageTTL, int consumerTTL) {
049        ConditionalNetworkBridgeFilter filter = new ConditionalNetworkBridgeFilter();
050        filter.setNetworkBrokerId(remoteBrokerPath[0]);
051        filter.setMessageTTL(messageTTL);
052        filter.setConsumerTTL(consumerTTL);
053        filter.setAllowReplayWhenNoConsumers(isReplayWhenNoConsumers());
054        filter.setRateLimit(getRateLimit());
055        filter.setRateDuration(getRateDuration());
056        filter.setReplayDelay(getReplayDelay());
057        return filter;
058    }
059
060    public void setReplayWhenNoConsumers(boolean replayWhenNoConsumers) {
061        this.replayWhenNoConsumers = replayWhenNoConsumers;
062    }
063
064    public boolean isReplayWhenNoConsumers() {
065        return replayWhenNoConsumers;
066    }
067
068    public void setRateLimit(int rateLimit) {
069        this.rateLimit = rateLimit;
070    }
071
072    public int getRateLimit() {
073        return rateLimit;
074    }
075
076    public int getRateDuration() {
077        return rateDuration;
078    }
079
080    public void setRateDuration(int rateDuration) {
081        this.rateDuration = rateDuration;
082    }
083
084    public int getReplayDelay() {
085        return replayDelay;
086    }
087
088    public void setReplayDelay(int replayDelay) {
089        this.replayDelay = replayDelay;
090    }
091
092    private static class ConditionalNetworkBridgeFilter extends NetworkBridgeFilter {
093        final static Logger LOG = LoggerFactory.getLogger(ConditionalNetworkBridgeFilter.class);
094        private int rateLimit;
095        private int rateDuration = 1000;
096        private boolean allowReplayWhenNoConsumers = true;
097        private int replayDelay = 1000;
098
099        private int matchCount;
100        private long rateDurationEnd;
101
102        @Override
103        protected boolean matchesForwardingFilter(Message message, final MessageEvaluationContext mec) {
104            boolean match = true;
105            if (mec.getDestination().isQueue() && contains(message.getBrokerPath(), networkBrokerId)) {
106                // potential replay back to origin
107                match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
108
109                if (match) {
110                    LOG.trace("Replaying [{}] for [{}] back to origin in the absence of a local consumer", message.getMessageId(), message.getDestination());
111                } else {
112                    LOG.trace("Suppressing replay of [{}] for [{}] back to origin {}", new Object[]{ message.getMessageId(), message.getDestination(), Arrays.asList(message.getBrokerPath())} );
113                }
114
115            } else {
116                // use existing filter logic for topics and non replays
117                match = super.matchesForwardingFilter(message, mec);
118            }
119
120            if (match && rateLimitExceeded()) {
121                LOG.trace("Throttled network consumer rejecting [{}] for [{}] {}>{}/{}", new Object[]{
122                        message.getMessageId(), message.getDestination(), matchCount, rateLimit, rateDuration
123                });
124                match = false;
125            }
126
127            return match;
128        }
129
130        private boolean hasNotJustArrived(Message message) {
131            return replayDelay == 0 || (message.getBrokerInTime() + replayDelay < System.currentTimeMillis());
132        }
133
134        private boolean hasNoLocalConsumers(final Message message, final MessageEvaluationContext mec) {
135            Destination regionDestination = (Destination) mec.getMessageReference().getRegionDestination();
136            List<Subscription> consumers = regionDestination.getConsumers();
137            for (Subscription sub : consumers) {
138                if (!sub.getConsumerInfo().isNetworkSubscription() && !sub.getConsumerInfo().isBrowser()) {
139                    LOG.trace("Not replaying [{}] for [{}] to origin due to existing local consumer: {}", new Object[]{
140                            message.getMessageId(), message.getDestination(), sub.getConsumerInfo()
141                    });
142                    return false;
143                }
144            }
145            return true;
146        }
147
148        private boolean rateLimitExceeded() {
149            if (rateLimit == 0) {
150                return false;
151            }
152
153            if (rateDurationEnd < System.currentTimeMillis()) {
154                rateDurationEnd = System.currentTimeMillis() + rateDuration;
155                matchCount = 0;
156            }
157            return ++matchCount > rateLimit;
158        }
159
160        public void setReplayDelay(int replayDelay) {
161            this.replayDelay = replayDelay;
162        }
163
164        public void setRateLimit(int rateLimit) {
165            this.rateLimit = rateLimit;
166        }
167
168        public void setRateDuration(int rateDuration) {
169            this.rateDuration = rateDuration;
170        }
171
172        public void setAllowReplayWhenNoConsumers(boolean allowReplayWhenNoConsumers) {
173            this.allowReplayWhenNoConsumers = allowReplayWhenNoConsumers;
174        }
175    }
176}