001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.advisory; 018 019import java.util.ArrayList; 020import java.util.Collection; 021import java.util.Collections; 022import java.util.Iterator; 023import java.util.LinkedHashMap; 024import java.util.Map; 025import java.util.Set; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028import java.util.concurrent.locks.ReentrantReadWriteLock; 029 030import org.apache.activemq.broker.Broker; 031import org.apache.activemq.broker.BrokerFilter; 032import org.apache.activemq.broker.BrokerService; 033import org.apache.activemq.broker.ConnectionContext; 034import org.apache.activemq.broker.ProducerBrokerExchange; 035import org.apache.activemq.broker.TransportConnector; 036import org.apache.activemq.broker.region.BaseDestination; 037import org.apache.activemq.broker.region.Destination; 038import org.apache.activemq.broker.region.DurableTopicSubscription; 039import org.apache.activemq.broker.region.MessageReference; 040import org.apache.activemq.broker.region.RegionBroker; 041import org.apache.activemq.broker.region.Subscription; 042import org.apache.activemq.broker.region.TopicRegion; 043import org.apache.activemq.broker.region.TopicSubscription; 044import org.apache.activemq.broker.region.virtual.VirtualDestination; 045import org.apache.activemq.command.ActiveMQDestination; 046import org.apache.activemq.command.ActiveMQMessage; 047import org.apache.activemq.command.ActiveMQTopic; 048import org.apache.activemq.command.BrokerInfo; 049import org.apache.activemq.command.Command; 050import org.apache.activemq.command.ConnectionId; 051import org.apache.activemq.command.ConnectionInfo; 052import org.apache.activemq.command.ConsumerId; 053import org.apache.activemq.command.ConsumerInfo; 054import org.apache.activemq.command.DestinationInfo; 055import org.apache.activemq.command.Message; 056import org.apache.activemq.command.MessageId; 057import org.apache.activemq.command.ProducerId; 058import org.apache.activemq.command.ProducerInfo; 059import org.apache.activemq.command.RemoveSubscriptionInfo; 060import org.apache.activemq.command.SessionId; 061import org.apache.activemq.security.SecurityContext; 062import org.apache.activemq.state.ProducerState; 063import org.apache.activemq.usage.Usage; 064import org.apache.activemq.util.IdGenerator; 065import org.apache.activemq.util.LongSequenceGenerator; 066import org.apache.activemq.util.SubscriptionKey; 067import org.slf4j.Logger; 068import org.slf4j.LoggerFactory; 069 070/** 071 * This broker filter handles tracking the state of the broker for purposes of 072 * publishing advisory messages to advisory consumers. 073 */ 074public class AdvisoryBroker extends BrokerFilter { 075 076 private static final Logger LOG = LoggerFactory.getLogger(AdvisoryBroker.class); 077 private static final IdGenerator ID_GENERATOR = new IdGenerator(); 078 079 protected final ConcurrentMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>(); 080 081 private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock(); 082 protected final Map<ConsumerId, ConsumerInfo> consumers = new LinkedHashMap<ConsumerId, ConsumerInfo>(); 083 084 /** 085 * This is a set to track all of the virtual destinations that have been added to the broker so 086 * they can be easily referenced later. 087 */ 088 protected final Set<VirtualDestination> virtualDestinations = Collections.newSetFromMap(new ConcurrentHashMap<VirtualDestination, Boolean>()); 089 /** 090 * This is a map to track all consumers that exist on the virtual destination so that we can fire 091 * an advisory later when they go away to remove the demand. 092 */ 093 protected final ConcurrentMap<ConsumerInfo, VirtualDestination> virtualDestinationConsumers = new ConcurrentHashMap<>(); 094 /** 095 * This is a map to track unique demand for the existence of a virtual destination so we make sure 096 * we don't send duplicate advisories. 097 */ 098 protected final ConcurrentMap<VirtualConsumerPair, ConsumerInfo> brokerConsumerDests = new ConcurrentHashMap<>(); 099 100 protected final ConcurrentMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>(); 101 protected final ConcurrentMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>(); 102 protected final ConcurrentMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>(); 103 protected final ProducerId advisoryProducerId = new ProducerId(); 104 105 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); 106 107 private VirtualDestinationMatcher virtualDestinationMatcher = new DestinationFilterVirtualDestinationMatcher(); 108 109 public AdvisoryBroker(Broker next) { 110 super(next); 111 advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); 112 } 113 114 @Override 115 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 116 super.addConnection(context, info); 117 118 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic(); 119 // do not distribute passwords in advisory messages. usernames okay 120 ConnectionInfo copy = info.copy(); 121 copy.setPassword(""); 122 fireAdvisory(context, topic, copy); 123 connections.put(copy.getConnectionId(), copy); 124 } 125 126 @Override 127 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 128 Subscription answer = super.addConsumer(context, info); 129 130 // Don't advise advisory topics. 131 if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { 132 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); 133 consumersLock.writeLock().lock(); 134 try { 135 consumers.put(info.getConsumerId(), info); 136 137 //check if this is a consumer on a destination that matches a virtual destination 138 if (getBrokerService().isUseVirtualDestSubs()) { 139 for (VirtualDestination virtualDestination : virtualDestinations) { 140 if (virtualDestinationMatcher.matches(virtualDestination, info.getDestination())) { 141 fireVirtualDestinationAddAdvisory(context, info, info.getDestination(), virtualDestination); 142 } 143 } 144 } 145 } finally { 146 consumersLock.writeLock().unlock(); 147 } 148 fireConsumerAdvisory(context, info.getDestination(), topic, info); 149 } else { 150 // We need to replay all the previously collected state objects 151 // for this newly added consumer. 152 if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) { 153 // Replay the connections. 154 for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext(); ) { 155 ConnectionInfo value = iter.next(); 156 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic(); 157 fireAdvisory(context, topic, value, info.getConsumerId()); 158 } 159 } 160 161 // We check here whether the Destination is Temporary Destination specific or not since we 162 // can avoid sending advisory messages to the consumer if it only wants Temporary Destination 163 // notifications. If its not just temporary destination related destinations then we have 164 // to send them all, a composite destination could want both. 165 if (AdvisorySupport.isTempDestinationAdvisoryTopic(info.getDestination())) { 166 // Replay the temporary destinations. 167 for (DestinationInfo destination : destinations.values()) { 168 if (destination.getDestination().isTemporary()) { 169 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination()); 170 fireAdvisory(context, topic, destination, info.getConsumerId()); 171 } 172 } 173 } else if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) { 174 // Replay all the destinations. 175 for (DestinationInfo destination : destinations.values()) { 176 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination()); 177 fireAdvisory(context, topic, destination, info.getConsumerId()); 178 } 179 } 180 181 // Replay the producers. 182 if (AdvisorySupport.isProducerAdvisoryTopic(info.getDestination())) { 183 for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext(); ) { 184 ProducerInfo value = iter.next(); 185 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination()); 186 fireProducerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId()); 187 } 188 } 189 190 // Replay the consumers. 191 if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) { 192 consumersLock.readLock().lock(); 193 try { 194 for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext(); ) { 195 ConsumerInfo value = iter.next(); 196 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination()); 197 fireConsumerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId()); 198 } 199 } finally { 200 consumersLock.readLock().unlock(); 201 } 202 } 203 204 // Replay the virtual destination consumers. 205 if (AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) { 206 for (Iterator<ConsumerInfo> iter = virtualDestinationConsumers.keySet().iterator(); iter.hasNext(); ) { 207 ConsumerInfo key = iter.next(); 208 ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(key.getDestination()); 209 fireConsumerAdvisory(context, key.getDestination(), topic, key); 210 } 211 } 212 213 // Replay network bridges 214 if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) { 215 for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext(); ) { 216 BrokerInfo key = iter.next(); 217 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); 218 fireAdvisory(context, topic, key, null, networkBridges.get(key)); 219 } 220 } 221 } 222 return answer; 223 } 224 225 @Override 226 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 227 super.addProducer(context, info); 228 229 // Don't advise advisory topics. 230 if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) { 231 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()); 232 fireProducerAdvisory(context, info.getDestination(), topic, info); 233 producers.put(info.getProducerId(), info); 234 } 235 } 236 237 @Override 238 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception { 239 Destination answer = super.addDestination(context, destination, create); 240 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 241 //for queues, create demand if isUseVirtualDestSubsOnCreation is true 242 if (getBrokerService().isUseVirtualDestSubsOnCreation() && destination.isQueue()) { 243 //check if this new destination matches a virtual destination that exists 244 for (VirtualDestination virtualDestination : virtualDestinations) { 245 if (virtualDestinationMatcher.matches(virtualDestination, destination)) { 246 fireVirtualDestinationAddAdvisory(context, null, destination, virtualDestination); 247 } 248 } 249 } 250 251 DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); 252 DestinationInfo previous = destinations.putIfAbsent(destination, info); 253 if (previous == null) { 254 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); 255 fireAdvisory(context, topic, info); 256 } 257 } 258 return answer; 259 } 260 261 @Override 262 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 263 ActiveMQDestination destination = info.getDestination(); 264 next.addDestinationInfo(context, info); 265 266 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 267 DestinationInfo previous = destinations.putIfAbsent(destination, info); 268 if (previous == null) { 269 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); 270 fireAdvisory(context, topic, info); 271 } 272 } 273 } 274 275 @Override 276 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 277 super.removeDestination(context, destination, timeout); 278 DestinationInfo info = destinations.remove(destination); 279 if (info != null) { 280 281 //on destination removal, remove all demand if using virtual dest subs 282 if (getBrokerService().isUseVirtualDestSubs()) { 283 for (ConsumerInfo consumerInfo : virtualDestinationConsumers.keySet()) { 284 //find all consumers for this virtual destination 285 VirtualDestination virtualDestination = virtualDestinationConsumers.get(consumerInfo); 286 287 //find a consumer that matches this virtualDest and destination 288 if (virtualDestinationMatcher.matches(virtualDestination, destination)) { 289 //in case of multiple matches 290 VirtualConsumerPair key = new VirtualConsumerPair(virtualDestination, destination); 291 ConsumerInfo i = brokerConsumerDests.get(key); 292 if (consumerInfo.equals(i) && brokerConsumerDests.remove(key) != null) { 293 LOG.debug("Virtual consumer pair removed: {} for consumer: {} ", key, i); 294 fireVirtualDestinationRemoveAdvisory(context, consumerInfo); 295 break; 296 } 297 } 298 } 299 } 300 301 // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate 302 info = info.copy(); 303 info.setDestination(destination); 304 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 305 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); 306 fireAdvisory(context, topic, info); 307 ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destination); 308 for (ActiveMQTopic advisoryDestination : advisoryDestinations) { 309 try { 310 next.removeDestination(context, advisoryDestination, -1); 311 } catch (Exception expectedIfDestinationDidNotExistYet) { 312 } 313 } 314 } 315 } 316 317 @Override 318 public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception { 319 super.removeDestinationInfo(context, destInfo); 320 DestinationInfo info = destinations.remove(destInfo.getDestination()); 321 if (info != null) { 322 // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate 323 info = info.copy(); 324 info.setDestination(destInfo.getDestination()); 325 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 326 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination()); 327 fireAdvisory(context, topic, info); 328 ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destInfo.getDestination()); 329 for (ActiveMQTopic advisoryDestination : advisoryDestinations) { 330 try { 331 next.removeDestination(context, advisoryDestination, -1); 332 } catch (Exception expectedIfDestinationDidNotExistYet) { 333 } 334 } 335 } 336 } 337 338 @Override 339 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 340 super.removeConnection(context, info, error); 341 342 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic(); 343 fireAdvisory(context, topic, info.createRemoveCommand()); 344 connections.remove(info.getConnectionId()); 345 } 346 347 @Override 348 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 349 super.removeConsumer(context, info); 350 351 // Don't advise advisory topics. 352 ActiveMQDestination dest = info.getDestination(); 353 if (!AdvisorySupport.isAdvisoryTopic(dest)) { 354 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest); 355 consumersLock.writeLock().lock(); 356 try { 357 consumers.remove(info.getConsumerId()); 358 359 //remove the demand for this consumer if it matches a virtual destination 360 if(getBrokerService().isUseVirtualDestSubs()) { 361 fireVirtualDestinationRemoveAdvisory(context, info); 362 } 363 } finally { 364 consumersLock.writeLock().unlock(); 365 } 366 if (!dest.isTemporary() || destinations.containsKey(dest)) { 367 fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand()); 368 } 369 } 370 } 371 372 @Override 373 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 374 SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); 375 376 RegionBroker regionBroker = null; 377 if (next instanceof RegionBroker) { 378 regionBroker = (RegionBroker) next; 379 } else { 380 BrokerService service = next.getBrokerService(); 381 regionBroker = (RegionBroker) service.getRegionBroker(); 382 } 383 384 if (regionBroker == null) { 385 LOG.warn("Cannot locate a RegionBroker instance to pass along the removeSubscription call"); 386 throw new IllegalStateException("No RegionBroker found."); 387 } 388 389 DurableTopicSubscription sub = ((TopicRegion) regionBroker.getTopicRegion()).getDurableSubscription(key); 390 391 super.removeSubscription(context, info); 392 393 if (sub == null) { 394 LOG.warn("We cannot send an advisory message for a durable sub removal when we don't know about the durable sub"); 395 return; 396 } 397 398 ActiveMQDestination dest = sub.getConsumerInfo().getDestination(); 399 400 // Don't advise advisory topics. 401 if (!AdvisorySupport.isAdvisoryTopic(dest)) { 402 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest); 403 fireConsumerAdvisory(context, dest, topic, info); 404 } 405 406 } 407 408 @Override 409 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 410 super.removeProducer(context, info); 411 412 // Don't advise advisory topics. 413 ActiveMQDestination dest = info.getDestination(); 414 if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) { 415 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest); 416 producers.remove(info.getProducerId()); 417 if (!dest.isTemporary() || destinations.containsKey(dest)) { 418 fireProducerAdvisory(context, dest, topic, info.createRemoveCommand()); 419 } 420 } 421 } 422 423 @Override 424 public void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription) { 425 super.messageExpired(context, messageReference, subscription); 426 try { 427 if (!messageReference.isAdvisory()) { 428 BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); 429 ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(baseDestination.getActiveMQDestination()); 430 Message payload = messageReference.getMessage().copy(); 431 if (!baseDestination.isIncludeBodyForAdvisory()) { 432 payload.clearBody(); 433 } 434 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 435 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); 436 fireAdvisory(context, topic, payload, null, advisoryMessage); 437 } 438 } catch (Exception e) { 439 handleFireFailure("expired", e); 440 } 441 } 442 443 @Override 444 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 445 super.messageConsumed(context, messageReference); 446 try { 447 if (!messageReference.isAdvisory()) { 448 BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); 449 ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(baseDestination.getActiveMQDestination()); 450 Message payload = messageReference.getMessage().copy(); 451 if (!baseDestination.isIncludeBodyForAdvisory()) { 452 payload.clearBody(); 453 } 454 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 455 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); 456 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName()); 457 fireAdvisory(context, topic, payload, null, advisoryMessage); 458 } 459 } catch (Exception e) { 460 handleFireFailure("consumed", e); 461 } 462 } 463 464 @Override 465 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 466 super.messageDelivered(context, messageReference); 467 try { 468 if (!messageReference.isAdvisory()) { 469 BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); 470 ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(baseDestination.getActiveMQDestination()); 471 Message payload = messageReference.getMessage().copy(); 472 if (!baseDestination.isIncludeBodyForAdvisory()) { 473 payload.clearBody(); 474 } 475 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 476 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); 477 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName()); 478 fireAdvisory(context, topic, payload, null, advisoryMessage); 479 } 480 } catch (Exception e) { 481 handleFireFailure("delivered", e); 482 } 483 } 484 485 @Override 486 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { 487 super.messageDiscarded(context, sub, messageReference); 488 try { 489 if (!messageReference.isAdvisory()) { 490 BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); 491 ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(baseDestination.getActiveMQDestination()); 492 Message payload = messageReference.getMessage().copy(); 493 if (!baseDestination.isIncludeBodyForAdvisory()) { 494 payload.clearBody(); 495 } 496 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 497 if (sub instanceof TopicSubscription) { 498 advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription) sub).discarded()); 499 } 500 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString()); 501 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString()); 502 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName()); 503 504 fireAdvisory(context, topic, payload, null, advisoryMessage); 505 } 506 } catch (Exception e) { 507 handleFireFailure("discarded", e); 508 } 509 } 510 511 @Override 512 public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) { 513 super.slowConsumer(context, destination, subs); 514 try { 515 if (!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) { 516 ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination()); 517 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 518 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString()); 519 fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage); 520 } 521 } catch (Exception e) { 522 handleFireFailure("slow consumer", e); 523 } 524 } 525 526 @Override 527 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo, ActiveMQDestination destination) { 528 super.fastProducer(context, producerInfo, destination); 529 try { 530 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 531 ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(destination); 532 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 533 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString()); 534 fireAdvisory(context, topic, producerInfo, null, advisoryMessage); 535 } 536 } catch (Exception e) { 537 handleFireFailure("fast producer", e); 538 } 539 } 540 541 private final IdGenerator connectionIdGenerator = new IdGenerator("advisory"); 542 private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator(); 543 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 544 545 @Override 546 public void virtualDestinationAdded(ConnectionContext context, 547 VirtualDestination virtualDestination) { 548 super.virtualDestinationAdded(context, virtualDestination); 549 550 if (virtualDestinations.add(virtualDestination)) { 551 LOG.debug("Virtual destination added: {}", virtualDestination); 552 try { 553 // Don't advise advisory topics. 554 if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) { 555 556 //create demand for consumers on virtual destinations 557 consumersLock.readLock().lock(); 558 try { 559 //loop through existing destinations to see if any match this newly 560 //created virtual destination 561 if (getBrokerService().isUseVirtualDestSubsOnCreation()) { 562 //for matches that are a queue, fire an advisory for demand 563 for (ActiveMQDestination destination : destinations.keySet()) { 564 if(destination.isQueue()) { 565 if (virtualDestinationMatcher.matches(virtualDestination, destination)) { 566 fireVirtualDestinationAddAdvisory(context, null, destination, virtualDestination); 567 } 568 } 569 } 570 } 571 572 //loop through existing consumers to see if any of them are consuming on a destination 573 //that matches the new virtual destination 574 for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext(); ) { 575 ConsumerInfo info = iter.next(); 576 if (virtualDestinationMatcher.matches(virtualDestination, info.getDestination())) { 577 fireVirtualDestinationAddAdvisory(context, info, info.getDestination(), virtualDestination); 578 } 579 } 580 } finally { 581 consumersLock.readLock().unlock(); 582 } 583 } 584 } catch (Exception e) { 585 handleFireFailure("virtualDestinationAdded", e); 586 } 587 } 588 } 589 590 private void fireVirtualDestinationAddAdvisory(ConnectionContext context, ConsumerInfo info, ActiveMQDestination activeMQDest, 591 VirtualDestination virtualDestination) throws Exception { 592 //if no consumer info, we need to create one - this is the case when an advisory is fired 593 //because of the existence of a destination matching a virtual destination 594 if (info == null) { 595 596 //store the virtual destination and the activeMQDestination as a pair so that we can keep track 597 //of all matching forwarded destinations that caused demand 598 VirtualConsumerPair pair = new VirtualConsumerPair(virtualDestination, activeMQDest); 599 if (brokerConsumerDests.get(pair) == null) { 600 ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId()); 601 SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId()); 602 ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); 603 info = new ConsumerInfo(consumerId); 604 605 if(brokerConsumerDests.putIfAbsent(pair, info) == null) { 606 LOG.debug("Virtual consumer pair added: {} for consumer: {} ", pair, info); 607 info.setDestination(virtualDestination.getVirtualDestination()); 608 ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); 609 610 if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { 611 LOG.debug("Virtual consumer added: {}, for virtual destination: {}", info, virtualDestination); 612 fireConsumerAdvisory(context, info.getDestination(), topic, info); 613 } 614 } 615 } 616 //this is the case of a real consumer coming online 617 } else { 618 info = info.copy(); 619 info.setDestination(virtualDestination.getVirtualDestination()); 620 ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); 621 622 if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { 623 LOG.debug("Virtual consumer added: {}, for virtual destination: {}", info, virtualDestination); 624 fireConsumerAdvisory(context, info.getDestination(), topic, info); 625 } 626 } 627 } 628 629 @Override 630 public void virtualDestinationRemoved(ConnectionContext context, 631 VirtualDestination virtualDestination) { 632 super.virtualDestinationRemoved(context, virtualDestination); 633 634 if (virtualDestinations.remove(virtualDestination)) { 635 LOG.debug("Virtual destination removed: {}", virtualDestination); 636 try { 637 consumersLock.readLock().lock(); 638 try { 639 // remove the demand created by the addition of the virtual destination 640 if (getBrokerService().isUseVirtualDestSubsOnCreation()) { 641 if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) { 642 for (ConsumerInfo info : virtualDestinationConsumers.keySet()) { 643 //find all consumers for this virtual destination 644 if (virtualDestinationConsumers.get(info).equals(virtualDestination)) { 645 fireVirtualDestinationRemoveAdvisory(context, info); 646 647 //check consumers created for the existence of a destination to see if they 648 //match the consumerinfo and clean up 649 for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) { 650 ConsumerInfo i = brokerConsumerDests.get(activeMQDest); 651 if (info.equals(i) && brokerConsumerDests.remove(activeMQDest) != null) { 652 LOG.debug("Virtual consumer pair removed: {} for consumer: {} ", activeMQDest, i); 653 } 654 } 655 } 656 657 } 658 } 659 } 660 } finally { 661 consumersLock.readLock().unlock(); 662 } 663 } catch (Exception e) { 664 handleFireFailure("virtualDestinationAdded", e); 665 } 666 } 667 } 668 669 private void fireVirtualDestinationRemoveAdvisory(ConnectionContext context, 670 ConsumerInfo info) throws Exception { 671 672 VirtualDestination virtualDestination = virtualDestinationConsumers.remove(info); 673 if (virtualDestination != null) { 674 LOG.debug("Virtual consumer removed: {}, for virtual destination: {}", info, virtualDestination); 675 ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(virtualDestination.getVirtualDestination()); 676 677 ActiveMQDestination dest = info.getDestination(); 678 679 if (!dest.isTemporary() || destinations.containsKey(dest)) { 680 fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand()); 681 } 682 } 683 } 684 685 @Override 686 public void isFull(ConnectionContext context, Destination destination, Usage<?> usage) { 687 super.isFull(context, destination, usage); 688 if (AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination()) == false) { 689 try { 690 691 ActiveMQTopic topic = AdvisorySupport.getFullAdvisoryTopic(destination.getActiveMQDestination()); 692 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 693 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_USAGE_NAME, usage.getName()); 694 advisoryMessage.setLongProperty(AdvisorySupport.MSG_PROPERTY_USAGE_COUNT, usage.getUsage()); 695 fireAdvisory(context, topic, null, null, advisoryMessage); 696 697 } catch (Exception e) { 698 handleFireFailure("is full", e); 699 } 700 } 701 } 702 703 @Override 704 public void nowMasterBroker() { 705 super.nowMasterBroker(); 706 try { 707 ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic(); 708 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 709 ConnectionContext context = new ConnectionContext(); 710 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 711 context.setBroker(getBrokerService().getBroker()); 712 fireAdvisory(context, topic, null, null, advisoryMessage); 713 } catch (Exception e) { 714 handleFireFailure("now master broker", e); 715 } 716 } 717 718 @Override 719 public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, 720 Subscription subscription, Throwable poisonCause) { 721 boolean wasDLQd = super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause); 722 if (wasDLQd) { 723 try { 724 if (!messageReference.isAdvisory()) { 725 BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination(); 726 ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(baseDestination.getActiveMQDestination()); 727 Message payload = messageReference.getMessage().copy(); 728 if (!baseDestination.isIncludeBodyForAdvisory()) { 729 payload.clearBody(); 730 } 731 fireAdvisory(context, topic, payload); 732 } 733 } catch (Exception e) { 734 handleFireFailure("add to DLQ", e); 735 } 736 } 737 738 return wasDLQd; 739 } 740 741 @Override 742 public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) { 743 try { 744 if (brokerInfo != null) { 745 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 746 advisoryMessage.setBooleanProperty("started", true); 747 advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex); 748 advisoryMessage.setStringProperty("remoteIp", remoteIp); 749 networkBridges.putIfAbsent(brokerInfo, advisoryMessage); 750 751 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); 752 753 ConnectionContext context = new ConnectionContext(); 754 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 755 context.setBroker(getBrokerService().getBroker()); 756 fireAdvisory(context, topic, brokerInfo, null, advisoryMessage); 757 } 758 } catch (Exception e) { 759 handleFireFailure("network bridge started", e); 760 } 761 } 762 763 @Override 764 public void networkBridgeStopped(BrokerInfo brokerInfo) { 765 try { 766 if (brokerInfo != null) { 767 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 768 advisoryMessage.setBooleanProperty("started", false); 769 networkBridges.remove(brokerInfo); 770 771 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic(); 772 773 ConnectionContext context = new ConnectionContext(); 774 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 775 context.setBroker(getBrokerService().getBroker()); 776 fireAdvisory(context, topic, brokerInfo, null, advisoryMessage); 777 } 778 } catch (Exception e) { 779 handleFireFailure("network bridge stopped", e); 780 } 781 } 782 783 private void handleFireFailure(String message, Throwable cause) { 784 LOG.warn("Failed to fire {} advisory, reason: {}", message, cause); 785 LOG.debug("{} detail: {}", message, cause, cause); 786 } 787 788 protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception { 789 fireAdvisory(context, topic, command, null); 790 } 791 792 protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { 793 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 794 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); 795 } 796 797 protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command) throws Exception { 798 fireConsumerAdvisory(context, consumerDestination, topic, command, null); 799 } 800 801 protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { 802 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 803 int count = 0; 804 Set<Destination> set = getDestinations(consumerDestination); 805 if (set != null) { 806 for (Destination dest : set) { 807 count += dest.getDestinationStatistics().getConsumers().getCount(); 808 } 809 } 810 advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_COUNT, count); 811 812 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); 813 } 814 815 protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception { 816 fireProducerAdvisory(context, producerDestination, topic, command, null); 817 } 818 819 protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { 820 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 821 int count = 0; 822 if (producerDestination != null) { 823 Set<Destination> set = getDestinations(producerDestination); 824 if (set != null) { 825 for (Destination dest : set) { 826 count += dest.getDestinationStatistics().getProducers().getCount(); 827 } 828 } 829 } 830 advisoryMessage.setIntProperty("producerCount", count); 831 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); 832 } 833 834 public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception { 835 //set properties 836 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName()); 837 String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET"; 838 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id); 839 840 String url = getBrokerService().getVmConnectorURI().toString(); 841 //try and find the URL on the transport connector and use if it exists else 842 //try and find a default URL 843 if (context.getConnector() instanceof TransportConnector 844 && ((TransportConnector) context.getConnector()).getPublishableConnectString() != null) { 845 url = ((TransportConnector) context.getConnector()).getPublishableConnectString(); 846 } else if (getBrokerService().getDefaultSocketURIString() != null) { 847 url = getBrokerService().getDefaultSocketURIString(); 848 } 849 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url); 850 851 //set the data structure 852 advisoryMessage.setDataStructure(command); 853 advisoryMessage.setPersistent(false); 854 advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); 855 advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId())); 856 advisoryMessage.setTargetConsumerId(targetConsumerId); 857 advisoryMessage.setDestination(topic); 858 advisoryMessage.setResponseRequired(false); 859 advisoryMessage.setProducerId(advisoryProducerId); 860 boolean originalFlowControl = context.isProducerFlowControl(); 861 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 862 producerExchange.setConnectionContext(context); 863 producerExchange.setMutable(true); 864 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 865 try { 866 context.setProducerFlowControl(false); 867 next.send(producerExchange, advisoryMessage); 868 } finally { 869 context.setProducerFlowControl(originalFlowControl); 870 } 871 } 872 873 public Map<ConnectionId, ConnectionInfo> getAdvisoryConnections() { 874 return connections; 875 } 876 877 public Collection<ConsumerInfo> getAdvisoryConsumers() { 878 consumersLock.readLock().lock(); 879 try { 880 return new ArrayList<ConsumerInfo>(consumers.values()); 881 } finally { 882 consumersLock.readLock().unlock(); 883 } 884 } 885 886 public Map<ProducerId, ProducerInfo> getAdvisoryProducers() { 887 return producers; 888 } 889 890 public Map<ActiveMQDestination, DestinationInfo> getAdvisoryDestinations() { 891 return destinations; 892 } 893 894 public ConcurrentMap<ConsumerInfo, VirtualDestination> getVirtualDestinationConsumers() { 895 return virtualDestinationConsumers; 896 } 897 898 private class VirtualConsumerPair { 899 private final VirtualDestination virtualDestination; 900 901 //destination that matches this virtualDestination as part target 902 //this is so we can keep track of more than one destination that might 903 //match the virtualDestination and cause demand 904 private final ActiveMQDestination activeMQDestination; 905 906 public VirtualConsumerPair(VirtualDestination virtualDestination, 907 ActiveMQDestination activeMQDestination) { 908 super(); 909 this.virtualDestination = virtualDestination; 910 this.activeMQDestination = activeMQDestination; 911 } 912 913 @Override 914 public int hashCode() { 915 final int prime = 31; 916 int result = 1; 917 result = prime * result + getOuterType().hashCode(); 918 result = prime 919 * result 920 + ((activeMQDestination == null) ? 0 : activeMQDestination 921 .hashCode()); 922 result = prime 923 * result 924 + ((virtualDestination == null) ? 0 : virtualDestination 925 .hashCode()); 926 return result; 927 } 928 929 @Override 930 public boolean equals(Object obj) { 931 if (this == obj) 932 return true; 933 if (obj == null) 934 return false; 935 if (getClass() != obj.getClass()) 936 return false; 937 VirtualConsumerPair other = (VirtualConsumerPair) obj; 938 if (!getOuterType().equals(other.getOuterType())) 939 return false; 940 if (activeMQDestination == null) { 941 if (other.activeMQDestination != null) 942 return false; 943 } else if (!activeMQDestination.equals(other.activeMQDestination)) 944 return false; 945 if (virtualDestination == null) { 946 if (other.virtualDestination != null) 947 return false; 948 } else if (!virtualDestination.equals(other.virtualDestination)) 949 return false; 950 return true; 951 } 952 953 @Override 954 public String toString() { 955 return "VirtualConsumerPair [virtualDestination=" + virtualDestination + ", activeMQDestination=" 956 + activeMQDestination + "]"; 957 } 958 959 private AdvisoryBroker getOuterType() { 960 return AdvisoryBroker.this; 961 } 962 } 963}