001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import org.apache.activemq.broker.Broker; 020import org.apache.activemq.broker.region.MessageReference; 021import org.apache.activemq.broker.region.Queue; 022import org.apache.activemq.command.Message; 023import org.apache.activemq.usage.SystemUsage; 024import org.slf4j.Logger; 025import org.slf4j.LoggerFactory; 026 027/** 028 * Store based Cursor for Queues 029 */ 030public class StoreQueueCursor extends AbstractPendingMessageCursor { 031 032 private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursor.class); 033 private final Broker broker; 034 private int pendingCount; 035 private final Queue queue; 036 private PendingMessageCursor nonPersistent; 037 private final QueueStorePrefetch persistent; 038 private boolean started; 039 private PendingMessageCursor currentCursor; 040 041 /** 042 * Construct 043 * @param broker 044 * @param queue 045 */ 046 public StoreQueueCursor(Broker broker,Queue queue) { 047 super((queue != null ? queue.isPrioritizedMessages():false)); 048 this.broker=broker; 049 this.queue = queue; 050 this.persistent = new QueueStorePrefetch(queue, broker); 051 currentCursor = persistent; 052 } 053 054 @Override 055 public synchronized void start() throws Exception { 056 started = true; 057 super.start(); 058 if (nonPersistent == null) { 059 if (broker.getBrokerService().isPersistent()) { 060 nonPersistent = new FilePendingMessageCursor(broker,queue.getName(),this.prioritizedMessages); 061 }else { 062 nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages); 063 } 064 nonPersistent.setMaxBatchSize(getMaxBatchSize()); 065 nonPersistent.setSystemUsage(systemUsage); 066 nonPersistent.setEnableAudit(isEnableAudit()); 067 nonPersistent.setMaxAuditDepth(getMaxAuditDepth()); 068 nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit()); 069 } 070 nonPersistent.setMessageAudit(getMessageAudit()); 071 nonPersistent.start(); 072 persistent.setMessageAudit(getMessageAudit()); 073 persistent.start(); 074 pendingCount = persistent.size() + nonPersistent.size(); 075 } 076 077 @Override 078 public synchronized void stop() throws Exception { 079 started = false; 080 if (nonPersistent != null) { 081// nonPersistent.clear(); 082// nonPersistent.stop(); 083// nonPersistent.gc(); 084 nonPersistent.destroy(); 085 } 086 persistent.stop(); 087 persistent.gc(); 088 super.stop(); 089 pendingCount = 0; 090 } 091 092 @Override 093 public synchronized boolean tryAddMessageLast(MessageReference node, long maxWait) throws Exception { 094 boolean result = true; 095 if (node != null) { 096 Message msg = node.getMessage(); 097 if (started) { 098 pendingCount++; 099 if (!msg.isPersistent()) { 100 result = nonPersistent.tryAddMessageLast(node, maxWait); 101 } 102 } 103 if (msg.isPersistent()) { 104 result = persistent.addMessageLast(node); 105 } 106 } 107 return result; 108 } 109 110 @Override 111 public synchronized void addMessageFirst(MessageReference node) throws Exception { 112 if (node != null) { 113 Message msg = node.getMessage(); 114 if (started) { 115 pendingCount++; 116 if (!msg.isPersistent()) { 117 nonPersistent.addMessageFirst(node); 118 } 119 } 120 if (msg.isPersistent()) { 121 persistent.addMessageFirst(node); 122 } 123 } 124 } 125 126 @Override 127 public synchronized void clear() { 128 pendingCount = 0; 129 } 130 131 @Override 132 public synchronized boolean hasNext() { 133 try { 134 getNextCursor(); 135 } catch (Exception e) { 136 LOG.error("Failed to get current cursor ", e); 137 throw new RuntimeException(e); 138 } 139 return currentCursor != null ? currentCursor.hasNext() : false; 140 } 141 142 @Override 143 public synchronized MessageReference next() { 144 MessageReference result = currentCursor != null ? currentCursor.next() : null; 145 return result; 146 } 147 148 @Override 149 public synchronized void remove() { 150 if (currentCursor != null) { 151 currentCursor.remove(); 152 } 153 pendingCount--; 154 } 155 156 @Override 157 public synchronized void remove(MessageReference node) { 158 if (!node.isPersistent()) { 159 nonPersistent.remove(node); 160 } else { 161 persistent.remove(node); 162 } 163 pendingCount--; 164 } 165 166 @Override 167 public synchronized void reset() { 168 nonPersistent.reset(); 169 persistent.reset(); 170 pendingCount = persistent.size() + nonPersistent.size(); 171 } 172 173 @Override 174 public void release() { 175 nonPersistent.release(); 176 persistent.release(); 177 } 178 179 180 @Override 181 public synchronized int size() { 182 if (pendingCount < 0) { 183 pendingCount = persistent.size() + nonPersistent.size(); 184 } 185 return pendingCount; 186 } 187 188 @Override 189 public synchronized long messageSize() { 190 return persistent.messageSize() + nonPersistent.messageSize(); 191 } 192 193 @Override 194 public synchronized boolean isEmpty() { 195 // if negative, more messages arrived in store since last reset so non empty 196 return pendingCount == 0; 197 } 198 199 /** 200 * Informs the Broker if the subscription needs to intervention to recover 201 * it's state e.g. DurableTopicSubscriber may do 202 * 203 * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor 204 * @return true if recovery required 205 */ 206 @Override 207 public boolean isRecoveryRequired() { 208 return false; 209 } 210 211 /** 212 * @return the nonPersistent Cursor 213 */ 214 public PendingMessageCursor getNonPersistent() { 215 return this.nonPersistent; 216 } 217 218 /** 219 * @param nonPersistent cursor to set 220 */ 221 public void setNonPersistent(PendingMessageCursor nonPersistent) { 222 this.nonPersistent = nonPersistent; 223 } 224 225 @Override 226 public void setMaxBatchSize(int maxBatchSize) { 227 persistent.setMaxBatchSize(maxBatchSize); 228 if (nonPersistent != null) { 229 nonPersistent.setMaxBatchSize(maxBatchSize); 230 } 231 super.setMaxBatchSize(maxBatchSize); 232 } 233 234 235 @Override 236 public void setMaxProducersToAudit(int maxProducersToAudit) { 237 super.setMaxProducersToAudit(maxProducersToAudit); 238 if (persistent != null) { 239 persistent.setMaxProducersToAudit(maxProducersToAudit); 240 } 241 if (nonPersistent != null) { 242 nonPersistent.setMaxProducersToAudit(maxProducersToAudit); 243 } 244 } 245 246 @Override 247 public void setMaxAuditDepth(int maxAuditDepth) { 248 super.setMaxAuditDepth(maxAuditDepth); 249 if (persistent != null) { 250 persistent.setMaxAuditDepth(maxAuditDepth); 251 } 252 if (nonPersistent != null) { 253 nonPersistent.setMaxAuditDepth(maxAuditDepth); 254 } 255 } 256 257 @Override 258 public void setEnableAudit(boolean enableAudit) { 259 super.setEnableAudit(enableAudit); 260 if (persistent != null) { 261 persistent.setEnableAudit(enableAudit); 262 } 263 if (nonPersistent != null) { 264 nonPersistent.setEnableAudit(enableAudit); 265 } 266 } 267 268 @Override 269 public void setUseCache(boolean useCache) { 270 super.setUseCache(useCache); 271 if (persistent != null) { 272 persistent.setUseCache(useCache); 273 } 274 if (nonPersistent != null) { 275 nonPersistent.setUseCache(useCache); 276 } 277 } 278 279 @Override 280 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { 281 super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 282 if (persistent != null) { 283 persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 284 } 285 if (nonPersistent != null) { 286 nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 287 } 288 } 289 290 291 292 @Override 293 public synchronized void gc() { 294 if (persistent != null) { 295 persistent.gc(); 296 } 297 if (nonPersistent != null) { 298 nonPersistent.gc(); 299 } 300 pendingCount = persistent.size() + nonPersistent.size(); 301 } 302 303 @Override 304 public void setSystemUsage(SystemUsage usageManager) { 305 super.setSystemUsage(usageManager); 306 if (persistent != null) { 307 persistent.setSystemUsage(usageManager); 308 } 309 if (nonPersistent != null) { 310 nonPersistent.setSystemUsage(usageManager); 311 } 312 } 313 314 protected synchronized PendingMessageCursor getNextCursor() throws Exception { 315 if (currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()) { 316 currentCursor = currentCursor == persistent ? nonPersistent : persistent; 317 // sanity check 318 if (currentCursor.isEmpty()) { 319 currentCursor = currentCursor == persistent ? nonPersistent : persistent; 320 } 321 } 322 return currentCursor; 323 } 324 325 @Override 326 public boolean isCacheEnabled() { 327 boolean cacheEnabled = isUseCache(); 328 if (cacheEnabled) { 329 if (persistent != null) { 330 cacheEnabled &= persistent.isCacheEnabled(); 331 } 332 if (nonPersistent != null) { 333 cacheEnabled &= nonPersistent.isCacheEnabled(); 334 } 335 setCacheEnabled(cacheEnabled); 336 } 337 return cacheEnabled; 338 } 339 340 @Override 341 public void rebase() { 342 persistent.rebase(); 343 reset(); 344 } 345 346}