001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.policy; 018 019import java.util.Set; 020 021import org.apache.activemq.ActiveMQPrefetchPolicy; 022import org.apache.activemq.broker.Broker; 023import org.apache.activemq.broker.region.BaseDestination; 024import org.apache.activemq.broker.region.Destination; 025import org.apache.activemq.broker.region.DurableTopicSubscription; 026import org.apache.activemq.broker.region.Queue; 027import org.apache.activemq.broker.region.QueueBrowserSubscription; 028import org.apache.activemq.broker.region.QueueSubscription; 029import org.apache.activemq.broker.region.Subscription; 030import org.apache.activemq.broker.region.Topic; 031import org.apache.activemq.broker.region.TopicSubscription; 032import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 033import org.apache.activemq.broker.region.group.GroupFactoryFinder; 034import org.apache.activemq.broker.region.group.MessageGroupMapFactory; 035import org.apache.activemq.filter.DestinationMapEntry; 036import org.apache.activemq.network.NetworkBridgeFilterFactory; 037import org.apache.activemq.usage.SystemUsage; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * Represents an entry in a {@link PolicyMap} for assigning policies to a 043 * specific destination or a hierarchical wildcard area of destinations. 044 * 045 * @org.apache.xbean.XBean 046 * 047 */ 048public class PolicyEntry extends DestinationMapEntry { 049 050 private static final Logger LOG = LoggerFactory.getLogger(PolicyEntry.class); 051 private DispatchPolicy dispatchPolicy; 052 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; 053 private boolean sendAdvisoryIfNoConsumers; 054 private DeadLetterStrategy deadLetterStrategy = Destination.DEFAULT_DEAD_LETTER_STRATEGY; 055 private PendingMessageLimitStrategy pendingMessageLimitStrategy; 056 private MessageEvictionStrategy messageEvictionStrategy; 057 private long memoryLimit; 058 private String messageGroupMapFactoryType = "cached"; 059 private MessageGroupMapFactory messageGroupMapFactory; 060 private PendingQueueMessageStoragePolicy pendingQueuePolicy; 061 private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy; 062 private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy; 063 private int maxProducersToAudit=BaseDestination.MAX_PRODUCERS_TO_AUDIT; 064 private int maxAuditDepth=BaseDestination.MAX_AUDIT_DEPTH; 065 private int maxQueueAuditDepth=BaseDestination.MAX_AUDIT_DEPTH; 066 private boolean enableAudit=true; 067 private boolean producerFlowControl = true; 068 private boolean alwaysRetroactive = false; 069 private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; 070 private boolean optimizedDispatch=false; 071 private int maxPageSize=BaseDestination.MAX_PAGE_SIZE; 072 private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE; 073 private boolean useCache=true; 074 private long minimumMessageSize=1024; 075 private boolean useConsumerPriority=true; 076 private boolean strictOrderDispatch=false; 077 private boolean lazyDispatch=false; 078 private int timeBeforeDispatchStarts = 0; 079 private int consumersBeforeDispatchStarts = 0; 080 private boolean advisoryForSlowConsumers; 081 private boolean advisoryForFastProducers; 082 private boolean advisoryForDiscardingMessages; 083 private boolean advisoryWhenFull; 084 private boolean advisoryForDelivery; 085 private boolean advisoryForConsumed; 086 private boolean includeBodyForAdvisory; 087 private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD; 088 private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE; 089 private int queuePrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH; 090 private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH; 091 private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH; 092 private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH; 093 private boolean usePrefetchExtension = true; 094 private int cursorMemoryHighWaterMark = 70; 095 private int storeUsageHighWaterMark = 100; 096 private SlowConsumerStrategy slowConsumerStrategy; 097 private boolean prioritizedMessages; 098 private boolean allConsumersExclusiveByDefault; 099 private boolean gcInactiveDestinations; 100 private boolean gcWithNetworkConsumers; 101 private long inactiveTimeoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; 102 private boolean reduceMemoryFootprint; 103 private NetworkBridgeFilterFactory networkBridgeFilterFactory; 104 private boolean doOptimzeMessageStorage = true; 105 private int maxDestinations = -1; 106 107 /* 108 * percentage of in-flight messages above which optimize message store is disabled 109 */ 110 private int optimizeMessageStoreInFlightLimit = 10; 111 private boolean persistJMSRedelivered = false; 112 113 114 public void configure(Broker broker,Queue queue) { 115 baseConfiguration(broker,queue); 116 if (dispatchPolicy != null) { 117 queue.setDispatchPolicy(dispatchPolicy); 118 } 119 queue.setDeadLetterStrategy(getDeadLetterStrategy()); 120 queue.setMessageGroupMapFactory(getMessageGroupMapFactory()); 121 if (memoryLimit > 0) { 122 queue.getMemoryUsage().setLimit(memoryLimit); 123 } 124 if (pendingQueuePolicy != null) { 125 PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(broker,queue); 126 queue.setMessages(messages); 127 } 128 129 queue.setUseConsumerPriority(isUseConsumerPriority()); 130 queue.setStrictOrderDispatch(isStrictOrderDispatch()); 131 queue.setOptimizedDispatch(isOptimizedDispatch()); 132 queue.setLazyDispatch(isLazyDispatch()); 133 queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts()); 134 queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts()); 135 queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault()); 136 queue.setPersistJMSRedelivered(isPersistJMSRedelivered()); 137 } 138 139 public void update(Queue queue) { 140 update(queue, null); 141 } 142 143 /** 144 * Update a queue with this policy. Only apply properties that 145 * match the includedProperties list. Not all properties are eligible 146 * to be updated. 147 * 148 * If includedProperties is null then all of the properties will be set as 149 * isUpdate will return true 150 * @param baseDestination 151 * @param includedProperties 152 */ 153 public void update(Queue queue, Set<String> includedProperties) { 154 baseUpdate(queue, includedProperties); 155 if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) { 156 queue.getMemoryUsage().setLimit(memoryLimit); 157 } 158 if (isUpdate("useConsumerPriority", includedProperties)) { 159 queue.setUseConsumerPriority(isUseConsumerPriority()); 160 } 161 if (isUpdate("strictOrderDispatch", includedProperties)) { 162 queue.setStrictOrderDispatch(isStrictOrderDispatch()); 163 } 164 if (isUpdate("optimizedDispatch", includedProperties)) { 165 queue.setOptimizedDispatch(isOptimizedDispatch()); 166 } 167 if (isUpdate("lazyDispatch", includedProperties)) { 168 queue.setLazyDispatch(isLazyDispatch()); 169 } 170 if (isUpdate("timeBeforeDispatchStarts", includedProperties)) { 171 queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts()); 172 } 173 if (isUpdate("consumersBeforeDispatchStarts", includedProperties)) { 174 queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts()); 175 } 176 if (isUpdate("allConsumersExclusiveByDefault", includedProperties)) { 177 queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault()); 178 } 179 if (isUpdate("persistJMSRedelivered", includedProperties)) { 180 queue.setPersistJMSRedelivered(isPersistJMSRedelivered()); 181 } 182 } 183 184 public void configure(Broker broker,Topic topic) { 185 baseConfiguration(broker,topic); 186 if (dispatchPolicy != null) { 187 topic.setDispatchPolicy(dispatchPolicy); 188 } 189 topic.setDeadLetterStrategy(getDeadLetterStrategy()); 190 if (subscriptionRecoveryPolicy != null) { 191 SubscriptionRecoveryPolicy srp = subscriptionRecoveryPolicy.copy(); 192 srp.setBroker(broker); 193 topic.setSubscriptionRecoveryPolicy(srp); 194 } 195 if (memoryLimit > 0) { 196 topic.getMemoryUsage().setLimit(memoryLimit); 197 } 198 topic.setLazyDispatch(isLazyDispatch()); 199 } 200 201 public void update(Topic topic) { 202 update(topic, null); 203 } 204 205 //If includedProperties is null then all of the properties will be set as 206 //isUpdate will return true 207 public void update(Topic topic, Set<String> includedProperties) { 208 baseUpdate(topic, includedProperties); 209 if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) { 210 topic.getMemoryUsage().setLimit(memoryLimit); 211 } 212 if (isUpdate("lazyDispatch", includedProperties)) { 213 topic.setLazyDispatch(isLazyDispatch()); 214 } 215 } 216 217 // attributes that can change on the fly 218 public void baseUpdate(BaseDestination destination) { 219 baseUpdate(destination, null); 220 } 221 222 // attributes that can change on the fly 223 //If includedProperties is null then all of the properties will be set as 224 //isUpdate will return true 225 public void baseUpdate(BaseDestination destination, Set<String> includedProperties) { 226 if (isUpdate("producerFlowControl", includedProperties)) { 227 destination.setProducerFlowControl(isProducerFlowControl()); 228 } 229 if (isUpdate("alwaysRetroactive", includedProperties)) { 230 destination.setAlwaysRetroactive(isAlwaysRetroactive()); 231 } 232 if (isUpdate("blockedProducerWarningInterval", includedProperties)) { 233 destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval()); 234 } 235 if (isUpdate("maxPageSize", includedProperties)) { 236 destination.setMaxPageSize(getMaxPageSize()); 237 } 238 if (isUpdate("maxBrowsePageSize", includedProperties)) { 239 destination.setMaxBrowsePageSize(getMaxBrowsePageSize()); 240 } 241 242 if (isUpdate("minimumMessageSize", includedProperties)) { 243 destination.setMinimumMessageSize((int) getMinimumMessageSize()); 244 } 245 if (isUpdate("maxExpirePageSize", includedProperties)) { 246 destination.setMaxExpirePageSize(getMaxExpirePageSize()); 247 } 248 if (isUpdate("cursorMemoryHighWaterMark", includedProperties)) { 249 destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 250 } 251 if (isUpdate("storeUsageHighWaterMark", includedProperties)) { 252 destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark()); 253 } 254 if (isUpdate("gcInactiveDestinations", includedProperties)) { 255 destination.setGcIfInactive(isGcInactiveDestinations()); 256 } 257 if (isUpdate("gcWithNetworkConsumers", includedProperties)) { 258 destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers()); 259 } 260 if (isUpdate("inactiveTimeoutBeforeGc", includedProperties)) { 261 destination.setInactiveTimeoutBeforeGC(getInactiveTimeoutBeforeGC()); 262 } 263 if (isUpdate("reduceMemoryFootprint", includedProperties)) { 264 destination.setReduceMemoryFootprint(isReduceMemoryFootprint()); 265 } 266 if (isUpdate("doOptimizeMessageStore", includedProperties)) { 267 destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage()); 268 } 269 if (isUpdate("optimizeMessageStoreInFlightLimit", includedProperties)) { 270 destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit()); 271 } 272 if (isUpdate("advisoryForConsumed", includedProperties)) { 273 destination.setAdvisoryForConsumed(isAdvisoryForConsumed()); 274 } 275 if (isUpdate("advisoryForDelivery", includedProperties)) { 276 destination.setAdvisoryForDelivery(isAdvisoryForDelivery()); 277 } 278 if (isUpdate("advisoryForDiscardingMessages", includedProperties)) { 279 destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages()); 280 } 281 if (isUpdate("advisoryForSlowConsumers", includedProperties)) { 282 destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers()); 283 } 284 if (isUpdate("advisoryForFastProducers", includedProperties)) { 285 destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers()); 286 } 287 if (isUpdate("advisoryWhenFull", includedProperties)) { 288 destination.setAdvisoryWhenFull(isAdvisoryWhenFull()); 289 } 290 if (isUpdate("includeBodyForAdvisory", includedProperties)) { 291 destination.setIncludeBodyForAdvisory(isIncludeBodyForAdvisory()); 292 } 293 if (isUpdate("sendAdvisoryIfNoConsumers", includedProperties)) { 294 destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers()); 295 } 296 } 297 298 public void baseConfiguration(Broker broker, BaseDestination destination) { 299 baseUpdate(destination); 300 destination.setEnableAudit(isEnableAudit()); 301 destination.setMaxAuditDepth(getMaxQueueAuditDepth()); 302 destination.setMaxProducersToAudit(getMaxProducersToAudit()); 303 destination.setUseCache(isUseCache()); 304 destination.setExpireMessagesPeriod(getExpireMessagesPeriod()); 305 SlowConsumerStrategy scs = getSlowConsumerStrategy(); 306 if (scs != null) { 307 scs.setBrokerService(broker); 308 scs.addDestination(destination); 309 } 310 destination.setSlowConsumerStrategy(scs); 311 destination.setPrioritizedMessages(isPrioritizedMessages()); 312 } 313 314 public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { 315 configurePrefetch(subscription); 316 subscription.setUsePrefetchExtension(isUsePrefetchExtension()); 317 subscription.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 318 if (pendingMessageLimitStrategy != null) { 319 int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription); 320 int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit(); 321 if (consumerLimit > 0) { 322 if (value < 0 || consumerLimit < value) { 323 value = consumerLimit; 324 } 325 } 326 if (value >= 0) { 327 LOG.debug("Setting the maximumPendingMessages size to: {} for consumer: {}", value, subscription.getInfo().getConsumerId()); 328 subscription.setMaximumPendingMessages(value); 329 } 330 } 331 if (messageEvictionStrategy != null) { 332 subscription.setMessageEvictionStrategy(messageEvictionStrategy); 333 } 334 if (pendingSubscriberPolicy != null) { 335 String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId(); 336 int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize(); 337 subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize,subscription)); 338 } 339 if (enableAudit) { 340 subscription.setEnableAudit(enableAudit); 341 subscription.setMaxProducersToAudit(maxProducersToAudit); 342 subscription.setMaxAuditDepth(maxAuditDepth); 343 } 344 } 345 346 public void configure(Broker broker, SystemUsage memoryManager, DurableTopicSubscription sub) { 347 String clientId = sub.getSubscriptionKey().getClientId(); 348 String subName = sub.getSubscriptionKey().getSubscriptionName(); 349 sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 350 configurePrefetch(sub); 351 if (pendingDurableSubscriberPolicy != null) { 352 PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId, subName,sub.getPrefetchSize(),sub); 353 cursor.setSystemUsage(memoryManager); 354 sub.setPending(cursor); 355 } 356 int auditDepth = getMaxAuditDepth(); 357 if (auditDepth == BaseDestination.MAX_AUDIT_DEPTH && this.isPrioritizedMessages()) { 358 sub.setMaxAuditDepth(auditDepth * 10); 359 } else { 360 sub.setMaxAuditDepth(auditDepth); 361 } 362 sub.setMaxProducersToAudit(getMaxProducersToAudit()); 363 sub.setUsePrefetchExtension(isUsePrefetchExtension()); 364 } 365 366 public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) { 367 configurePrefetch(sub); 368 sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 369 sub.setUsePrefetchExtension(isUsePrefetchExtension()); 370 371 // TODO 372 // We currently need an infinite audit because of the way that browser dispatch 373 // is done. We should refactor the browsers to better handle message dispatch so 374 // we can remove this and perform a more efficient dispatch. 375 sub.setMaxProducersToAudit(Integer.MAX_VALUE); 376 sub.setMaxAuditDepth(Short.MAX_VALUE); 377 378 // part solution - dispatching to browsers needs to be restricted 379 sub.setMaxMessages(getMaxBrowsePageSize()); 380 } 381 382 public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) { 383 configurePrefetch(sub); 384 sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 385 sub.setUsePrefetchExtension(isUsePrefetchExtension()); 386 sub.setMaxProducersToAudit(getMaxProducersToAudit()); 387 } 388 389 public void configurePrefetch(Subscription subscription) { 390 391 final int currentPrefetch = subscription.getConsumerInfo().getPrefetchSize(); 392 if (subscription instanceof QueueBrowserSubscription) { 393 if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH) { 394 ((QueueBrowserSubscription) subscription).setPrefetchSize(getQueueBrowserPrefetch()); 395 } 396 } else if (subscription instanceof QueueSubscription) { 397 if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH) { 398 ((QueueSubscription) subscription).setPrefetchSize(getQueuePrefetch()); 399 } 400 } else if (subscription instanceof DurableTopicSubscription) { 401 if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH || 402 subscription.getConsumerInfo().getPrefetchSize() == ActiveMQPrefetchPolicy.DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH) { 403 ((DurableTopicSubscription)subscription).setPrefetchSize(getDurableTopicPrefetch()); 404 } 405 } else if (subscription instanceof TopicSubscription) { 406 if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH) { 407 ((TopicSubscription) subscription).setPrefetchSize(getTopicPrefetch()); 408 } 409 } 410 if (currentPrefetch != 0 && subscription.getPrefetchSize() == 0) { 411 // tell the sub so that it can issue a pull request 412 subscription.updateConsumerPrefetch(0); 413 } 414 } 415 416 private boolean isUpdate(String property, Set<String> includedProperties) { 417 return includedProperties == null || includedProperties.contains(property); 418 } 419 // Properties 420 // ------------------------------------------------------------------------- 421 public DispatchPolicy getDispatchPolicy() { 422 return dispatchPolicy; 423 } 424 425 public void setDispatchPolicy(DispatchPolicy policy) { 426 this.dispatchPolicy = policy; 427 } 428 429 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() { 430 return subscriptionRecoveryPolicy; 431 } 432 433 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) { 434 this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy; 435 } 436 437 public boolean isSendAdvisoryIfNoConsumers() { 438 return sendAdvisoryIfNoConsumers; 439 } 440 441 /** 442 * Sends an advisory message if a non-persistent message is sent and there 443 * are no active consumers 444 */ 445 public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) { 446 this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; 447 } 448 449 public DeadLetterStrategy getDeadLetterStrategy() { 450 return deadLetterStrategy; 451 } 452 453 /** 454 * Sets the policy used to determine which dead letter queue destination 455 * should be used 456 */ 457 public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { 458 this.deadLetterStrategy = deadLetterStrategy; 459 } 460 461 public PendingMessageLimitStrategy getPendingMessageLimitStrategy() { 462 return pendingMessageLimitStrategy; 463 } 464 465 /** 466 * Sets the strategy to calculate the maximum number of messages that are 467 * allowed to be pending on consumers (in addition to their prefetch sizes). 468 * Once the limit is reached, non-durable topics can then start discarding 469 * old messages. This allows us to keep dispatching messages to slow 470 * consumers while not blocking fast consumers and discarding the messages 471 * oldest first. 472 */ 473 public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy pendingMessageLimitStrategy) { 474 this.pendingMessageLimitStrategy = pendingMessageLimitStrategy; 475 } 476 477 public MessageEvictionStrategy getMessageEvictionStrategy() { 478 return messageEvictionStrategy; 479 } 480 481 /** 482 * Sets the eviction strategy used to decide which message to evict when the 483 * slow consumer needs to discard messages 484 */ 485 public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) { 486 this.messageEvictionStrategy = messageEvictionStrategy; 487 } 488 489 public long getMemoryLimit() { 490 return memoryLimit; 491 } 492 493 /** 494 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 495 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 496 */ 497 public void setMemoryLimit(long memoryLimit) { 498 this.memoryLimit = memoryLimit; 499 } 500 501 public MessageGroupMapFactory getMessageGroupMapFactory() { 502 if (messageGroupMapFactory == null) { 503 try { 504 messageGroupMapFactory = GroupFactoryFinder.createMessageGroupMapFactory(getMessageGroupMapFactoryType()); 505 }catch(Exception e){ 506 LOG.error("Failed to create message group Factory ",e); 507 } 508 } 509 return messageGroupMapFactory; 510 } 511 512 /** 513 * Sets the factory used to create new instances of {MessageGroupMap} used 514 * to implement the <a 515 * href="http://activemq.apache.org/message-groups.html">Message Groups</a> 516 * functionality. 517 */ 518 public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) { 519 this.messageGroupMapFactory = messageGroupMapFactory; 520 } 521 522 523 public String getMessageGroupMapFactoryType() { 524 return messageGroupMapFactoryType; 525 } 526 527 public void setMessageGroupMapFactoryType(String messageGroupMapFactoryType) { 528 this.messageGroupMapFactoryType = messageGroupMapFactoryType; 529 } 530 531 532 /** 533 * @return the pendingDurableSubscriberPolicy 534 */ 535 public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() { 536 return this.pendingDurableSubscriberPolicy; 537 } 538 539 /** 540 * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy 541 * to set 542 */ 543 public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) { 544 this.pendingDurableSubscriberPolicy = pendingDurableSubscriberPolicy; 545 } 546 547 /** 548 * @return the pendingQueuePolicy 549 */ 550 public PendingQueueMessageStoragePolicy getPendingQueuePolicy() { 551 return this.pendingQueuePolicy; 552 } 553 554 /** 555 * @param pendingQueuePolicy the pendingQueuePolicy to set 556 */ 557 public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy pendingQueuePolicy) { 558 this.pendingQueuePolicy = pendingQueuePolicy; 559 } 560 561 /** 562 * @return the pendingSubscriberPolicy 563 */ 564 public PendingSubscriberMessageStoragePolicy getPendingSubscriberPolicy() { 565 return this.pendingSubscriberPolicy; 566 } 567 568 /** 569 * @param pendingSubscriberPolicy the pendingSubscriberPolicy to set 570 */ 571 public void setPendingSubscriberPolicy(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) { 572 this.pendingSubscriberPolicy = pendingSubscriberPolicy; 573 } 574 575 /** 576 * @return true if producer flow control enabled 577 */ 578 public boolean isProducerFlowControl() { 579 return producerFlowControl; 580 } 581 582 /** 583 * @param producerFlowControl 584 */ 585 public void setProducerFlowControl(boolean producerFlowControl) { 586 this.producerFlowControl = producerFlowControl; 587 } 588 589 /** 590 * @return true if topic is always retroactive 591 */ 592 public boolean isAlwaysRetroactive() { 593 return alwaysRetroactive; 594 } 595 596 /** 597 * @param alwaysRetroactive 598 */ 599 public void setAlwaysRetroactive(boolean alwaysRetroactive) { 600 this.alwaysRetroactive = alwaysRetroactive; 601 } 602 603 604 /** 605 * Set's the interval at which warnings about producers being blocked by 606 * resource usage will be triggered. Values of 0 or less will disable 607 * warnings 608 * 609 * @param blockedProducerWarningInterval the interval at which warning about 610 * blocked producers will be triggered. 611 */ 612 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 613 this.blockedProducerWarningInterval = blockedProducerWarningInterval; 614 } 615 616 /** 617 * 618 * @return the interval at which warning about blocked producers will be 619 * triggered. 620 */ 621 public long getBlockedProducerWarningInterval() { 622 return blockedProducerWarningInterval; 623 } 624 625 /** 626 * @return the maxProducersToAudit 627 */ 628 public int getMaxProducersToAudit() { 629 return maxProducersToAudit; 630 } 631 632 /** 633 * @param maxProducersToAudit the maxProducersToAudit to set 634 */ 635 public void setMaxProducersToAudit(int maxProducersToAudit) { 636 this.maxProducersToAudit = maxProducersToAudit; 637 } 638 639 /** 640 * @return the maxAuditDepth 641 */ 642 public int getMaxAuditDepth() { 643 return maxAuditDepth; 644 } 645 646 /** 647 * @param maxAuditDepth the maxAuditDepth to set 648 */ 649 public void setMaxAuditDepth(int maxAuditDepth) { 650 this.maxAuditDepth = maxAuditDepth; 651 } 652 653 /** 654 * @return the enableAudit 655 */ 656 public boolean isEnableAudit() { 657 return enableAudit; 658 } 659 660 /** 661 * @param enableAudit the enableAudit to set 662 */ 663 public void setEnableAudit(boolean enableAudit) { 664 this.enableAudit = enableAudit; 665 } 666 667 public int getMaxQueueAuditDepth() { 668 return maxQueueAuditDepth; 669 } 670 671 public void setMaxQueueAuditDepth(int maxQueueAuditDepth) { 672 this.maxQueueAuditDepth = maxQueueAuditDepth; 673 } 674 675 public boolean isOptimizedDispatch() { 676 return optimizedDispatch; 677 } 678 679 public void setOptimizedDispatch(boolean optimizedDispatch) { 680 this.optimizedDispatch = optimizedDispatch; 681 } 682 683 public int getMaxPageSize() { 684 return maxPageSize; 685 } 686 687 public void setMaxPageSize(int maxPageSize) { 688 this.maxPageSize = maxPageSize; 689 } 690 691 public int getMaxBrowsePageSize() { 692 return maxBrowsePageSize; 693 } 694 695 public void setMaxBrowsePageSize(int maxPageSize) { 696 this.maxBrowsePageSize = maxPageSize; 697 } 698 699 public boolean isUseCache() { 700 return useCache; 701 } 702 703 public void setUseCache(boolean useCache) { 704 this.useCache = useCache; 705 } 706 707 public long getMinimumMessageSize() { 708 return minimumMessageSize; 709 } 710 711 public void setMinimumMessageSize(long minimumMessageSize) { 712 this.minimumMessageSize = minimumMessageSize; 713 } 714 715 public boolean isUseConsumerPriority() { 716 return useConsumerPriority; 717 } 718 719 public void setUseConsumerPriority(boolean useConsumerPriority) { 720 this.useConsumerPriority = useConsumerPriority; 721 } 722 723 public boolean isStrictOrderDispatch() { 724 return strictOrderDispatch; 725 } 726 727 public void setStrictOrderDispatch(boolean strictOrderDispatch) { 728 this.strictOrderDispatch = strictOrderDispatch; 729 } 730 731 public boolean isLazyDispatch() { 732 return lazyDispatch; 733 } 734 735 public void setLazyDispatch(boolean lazyDispatch) { 736 this.lazyDispatch = lazyDispatch; 737 } 738 739 public int getTimeBeforeDispatchStarts() { 740 return timeBeforeDispatchStarts; 741 } 742 743 public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) { 744 this.timeBeforeDispatchStarts = timeBeforeDispatchStarts; 745 } 746 747 public int getConsumersBeforeDispatchStarts() { 748 return consumersBeforeDispatchStarts; 749 } 750 751 public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) { 752 this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts; 753 } 754 755 /** 756 * @return the advisoryForSlowConsumers 757 */ 758 public boolean isAdvisoryForSlowConsumers() { 759 return advisoryForSlowConsumers; 760 } 761 762 /** 763 * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set 764 */ 765 public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) { 766 this.advisoryForSlowConsumers = advisoryForSlowConsumers; 767 } 768 769 /** 770 * @return the advisoryForDiscardingMessages 771 */ 772 public boolean isAdvisoryForDiscardingMessages() { 773 return advisoryForDiscardingMessages; 774 } 775 776 /** 777 * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to set 778 */ 779 public void setAdvisoryForDiscardingMessages( 780 boolean advisoryForDiscardingMessages) { 781 this.advisoryForDiscardingMessages = advisoryForDiscardingMessages; 782 } 783 784 /** 785 * @return the advisoryWhenFull 786 */ 787 public boolean isAdvisoryWhenFull() { 788 return advisoryWhenFull; 789 } 790 791 /** 792 * @param advisoryWhenFull the advisoryWhenFull to set 793 */ 794 public void setAdvisoryWhenFull(boolean advisoryWhenFull) { 795 this.advisoryWhenFull = advisoryWhenFull; 796 } 797 798 /** 799 * @return the advisoryForDelivery 800 */ 801 public boolean isAdvisoryForDelivery() { 802 return advisoryForDelivery; 803 } 804 805 /** 806 * @param advisoryForDelivery the advisoryForDelivery to set 807 */ 808 public void setAdvisoryForDelivery(boolean advisoryForDelivery) { 809 this.advisoryForDelivery = advisoryForDelivery; 810 } 811 812 /** 813 * @return the advisoryForConsumed 814 */ 815 public boolean isAdvisoryForConsumed() { 816 return advisoryForConsumed; 817 } 818 819 /** 820 * @param advisoryForConsumed the advisoryForConsumed to set 821 */ 822 public void setAdvisoryForConsumed(boolean advisoryForConsumed) { 823 this.advisoryForConsumed = advisoryForConsumed; 824 } 825 826 /** 827 * @return the advisdoryForFastProducers 828 */ 829 public boolean isAdvisoryForFastProducers() { 830 return advisoryForFastProducers; 831 } 832 833 /** 834 * @param advisoryForFastProducers the advisdoryForFastProducers to set 835 */ 836 public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) { 837 this.advisoryForFastProducers = advisoryForFastProducers; 838 } 839 840 /** 841 * Returns true if the original message body should be included when applicable 842 * for advisory messages 843 * 844 * @return 845 */ 846 public boolean isIncludeBodyForAdvisory() { 847 return includeBodyForAdvisory; 848 } 849 850 /** 851 * Sets if the original message body should be included when applicable 852 * for advisory messages 853 * 854 * @param includeBodyForAdvisory 855 */ 856 public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) { 857 this.includeBodyForAdvisory = includeBodyForAdvisory; 858 } 859 860 public void setMaxExpirePageSize(int maxExpirePageSize) { 861 this.maxExpirePageSize = maxExpirePageSize; 862 } 863 864 public int getMaxExpirePageSize() { 865 return maxExpirePageSize; 866 } 867 868 public void setExpireMessagesPeriod(long expireMessagesPeriod) { 869 this.expireMessagesPeriod = expireMessagesPeriod; 870 } 871 872 public long getExpireMessagesPeriod() { 873 return expireMessagesPeriod; 874 } 875 876 /** 877 * Get the queuePrefetch 878 * @return the queuePrefetch 879 */ 880 public int getQueuePrefetch() { 881 return this.queuePrefetch; 882 } 883 884 /** 885 * Set the queuePrefetch 886 * @param queuePrefetch the queuePrefetch to set 887 */ 888 public void setQueuePrefetch(int queuePrefetch) { 889 this.queuePrefetch = queuePrefetch; 890 } 891 892 /** 893 * Get the queueBrowserPrefetch 894 * @return the queueBrowserPrefetch 895 */ 896 public int getQueueBrowserPrefetch() { 897 return this.queueBrowserPrefetch; 898 } 899 900 /** 901 * Set the queueBrowserPrefetch 902 * @param queueBrowserPrefetch the queueBrowserPrefetch to set 903 */ 904 public void setQueueBrowserPrefetch(int queueBrowserPrefetch) { 905 this.queueBrowserPrefetch = queueBrowserPrefetch; 906 } 907 908 /** 909 * Get the topicPrefetch 910 * @return the topicPrefetch 911 */ 912 public int getTopicPrefetch() { 913 return this.topicPrefetch; 914 } 915 916 /** 917 * Set the topicPrefetch 918 * @param topicPrefetch the topicPrefetch to set 919 */ 920 public void setTopicPrefetch(int topicPrefetch) { 921 this.topicPrefetch = topicPrefetch; 922 } 923 924 /** 925 * Get the durableTopicPrefetch 926 * @return the durableTopicPrefetch 927 */ 928 public int getDurableTopicPrefetch() { 929 return this.durableTopicPrefetch; 930 } 931 932 /** 933 * Set the durableTopicPrefetch 934 * @param durableTopicPrefetch the durableTopicPrefetch to set 935 */ 936 public void setDurableTopicPrefetch(int durableTopicPrefetch) { 937 this.durableTopicPrefetch = durableTopicPrefetch; 938 } 939 940 public boolean isUsePrefetchExtension() { 941 return this.usePrefetchExtension; 942 } 943 944 public void setUsePrefetchExtension(boolean usePrefetchExtension) { 945 this.usePrefetchExtension = usePrefetchExtension; 946 } 947 948 public int getCursorMemoryHighWaterMark() { 949 return this.cursorMemoryHighWaterMark; 950 } 951 952 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { 953 this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; 954 } 955 956 public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { 957 this.storeUsageHighWaterMark = storeUsageHighWaterMark; 958 } 959 960 public int getStoreUsageHighWaterMark() { 961 return storeUsageHighWaterMark; 962 } 963 964 public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { 965 this.slowConsumerStrategy = slowConsumerStrategy; 966 } 967 968 public SlowConsumerStrategy getSlowConsumerStrategy() { 969 return this.slowConsumerStrategy; 970 } 971 972 973 public boolean isPrioritizedMessages() { 974 return this.prioritizedMessages; 975 } 976 977 public void setPrioritizedMessages(boolean prioritizedMessages) { 978 this.prioritizedMessages = prioritizedMessages; 979 } 980 981 public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) { 982 this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault; 983 } 984 985 public boolean isAllConsumersExclusiveByDefault() { 986 return allConsumersExclusiveByDefault; 987 } 988 989 public boolean isGcInactiveDestinations() { 990 return this.gcInactiveDestinations; 991 } 992 993 public void setGcInactiveDestinations(boolean gcInactiveDestinations) { 994 this.gcInactiveDestinations = gcInactiveDestinations; 995 } 996 997 /** 998 * @return the amount of time spent inactive before GC of the destination kicks in. 999 * 1000 * @deprecated use getInactiveTimeoutBeforeGC instead. 1001 */ 1002 @Deprecated 1003 public long getInactiveTimoutBeforeGC() { 1004 return getInactiveTimeoutBeforeGC(); 1005 } 1006 1007 /** 1008 * Sets the amount of time a destination is inactive before it is marked for GC 1009 * 1010 * @param inactiveTimoutBeforeGC 1011 * time in milliseconds to configure as the inactive timeout. 1012 * 1013 * @deprecated use getInactiveTimeoutBeforeGC instead. 1014 */ 1015 @Deprecated 1016 public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) { 1017 setInactiveTimeoutBeforeGC(inactiveTimoutBeforeGC); 1018 } 1019 1020 /** 1021 * @return the amount of time spent inactive before GC of the destination kicks in. 1022 */ 1023 public long getInactiveTimeoutBeforeGC() { 1024 return this.inactiveTimeoutBeforeGC; 1025 } 1026 1027 /** 1028 * Sets the amount of time a destination is inactive before it is marked for GC 1029 * 1030 * @param inactiveTimoutBeforeGC 1031 * time in milliseconds to configure as the inactive timeout. 1032 */ 1033 public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) { 1034 this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC; 1035 } 1036 1037 public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) { 1038 this.gcWithNetworkConsumers = gcWithNetworkConsumers; 1039 } 1040 1041 public boolean isGcWithNetworkConsumers() { 1042 return gcWithNetworkConsumers; 1043 } 1044 1045 public boolean isReduceMemoryFootprint() { 1046 return reduceMemoryFootprint; 1047 } 1048 1049 public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) { 1050 this.reduceMemoryFootprint = reduceMemoryFootprint; 1051 } 1052 1053 public void setNetworkBridgeFilterFactory(NetworkBridgeFilterFactory networkBridgeFilterFactory) { 1054 this.networkBridgeFilterFactory = networkBridgeFilterFactory; 1055 } 1056 1057 public NetworkBridgeFilterFactory getNetworkBridgeFilterFactory() { 1058 return networkBridgeFilterFactory; 1059 } 1060 1061 public boolean isDoOptimzeMessageStorage() { 1062 return doOptimzeMessageStorage; 1063 } 1064 1065 public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) { 1066 this.doOptimzeMessageStorage = doOptimzeMessageStorage; 1067 } 1068 1069 public int getOptimizeMessageStoreInFlightLimit() { 1070 return optimizeMessageStoreInFlightLimit; 1071 } 1072 1073 public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) { 1074 this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit; 1075 } 1076 1077 public void setPersistJMSRedelivered(boolean val) { 1078 this.persistJMSRedelivered = val; 1079 } 1080 1081 public boolean isPersistJMSRedelivered() { 1082 return persistJMSRedelivered; 1083 } 1084 1085 public int getMaxDestinations() { 1086 return maxDestinations; 1087 } 1088 1089 /** 1090 * Sets the maximum number of destinations that can be created 1091 * 1092 * @param maxDestinations 1093 * maximum number of destinations 1094 */ 1095 public void setMaxDestinations(int maxDestinations) { 1096 this.maxDestinations = maxDestinations; 1097 } 1098 1099 @Override 1100 public String toString() { 1101 return "PolicyEntry [" + destination + "]"; 1102 } 1103}