001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Iterator; 022import java.util.LinkedList; 023import java.util.List; 024import java.util.concurrent.atomic.AtomicBoolean; 025import java.util.concurrent.atomic.AtomicLong; 026 027import org.apache.activemq.broker.Broker; 028import org.apache.activemq.broker.ConnectionContext; 029import org.apache.activemq.broker.region.Destination; 030import org.apache.activemq.broker.region.IndirectMessageReference; 031import org.apache.activemq.broker.region.MessageReference; 032import org.apache.activemq.broker.region.QueueMessageReference; 033import org.apache.activemq.command.Message; 034import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 035import org.apache.activemq.openwire.OpenWireFormat; 036import org.apache.activemq.store.PList; 037import org.apache.activemq.store.PListEntry; 038import org.apache.activemq.store.PListStore; 039import org.apache.activemq.usage.SystemUsage; 040import org.apache.activemq.usage.Usage; 041import org.apache.activemq.usage.UsageListener; 042import org.apache.activemq.util.ByteSequence; 043import org.apache.activemq.wireformat.WireFormat; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047/** 048 * persist pending messages pending message (messages awaiting dispatch to a 049 * consumer) cursor 050 */ 051public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener { 052 053 static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class); 054 055 private static final AtomicLong NAME_COUNT = new AtomicLong(); 056 057 protected Broker broker; 058 private final PListStore store; 059 private final String name; 060 private PendingList memoryList; 061 private PList diskList; 062 private Iterator<MessageReference> iter; 063 private Destination regionDestination; 064 private boolean iterating; 065 private boolean flushRequired; 066 private final AtomicBoolean started = new AtomicBoolean(); 067 private final WireFormat wireFormat = new OpenWireFormat(); 068 069 /** 070 * @param broker 071 * @param name 072 * @param prioritizedMessages 073 */ 074 public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) { 075 super(prioritizedMessages); 076 if (this.prioritizedMessages) { 077 this.memoryList = new PrioritizedPendingList(); 078 } else { 079 this.memoryList = new OrderedPendingList(); 080 } 081 this.broker = broker; 082 // the store can be null if the BrokerService has persistence 083 // turned off 084 this.store = broker.getTempDataStore(); 085 this.name = NAME_COUNT.incrementAndGet() + "_" + name; 086 } 087 088 @Override 089 public void start() throws Exception { 090 if (started.compareAndSet(false, true)) { 091 if( this.broker != null) { 092 wireFormat.setVersion(this.broker.getBrokerService().getStoreOpenWireVersion()); 093 } 094 super.start(); 095 if (systemUsage != null) { 096 systemUsage.getMemoryUsage().addUsageListener(this); 097 } 098 } 099 } 100 101 @Override 102 public void stop() throws Exception { 103 if (started.compareAndSet(true, false)) { 104 super.stop(); 105 if (systemUsage != null) { 106 systemUsage.getMemoryUsage().removeUsageListener(this); 107 } 108 } 109 } 110 111 /** 112 * @return true if there are no pending messages 113 */ 114 @Override 115 public synchronized boolean isEmpty() { 116 if (memoryList.isEmpty() && isDiskListEmpty()) { 117 return true; 118 } 119 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { 120 MessageReference node = iterator.next(); 121 if (node == QueueMessageReference.NULL_MESSAGE) { 122 continue; 123 } 124 if (!node.isDropped()) { 125 return false; 126 } 127 // We can remove dropped references. 128 iterator.remove(); 129 } 130 return isDiskListEmpty(); 131 } 132 133 /** 134 * reset the cursor 135 */ 136 @Override 137 public synchronized void reset() { 138 iterating = true; 139 last = null; 140 if (isDiskListEmpty()) { 141 this.iter = this.memoryList.iterator(); 142 } else { 143 this.iter = new DiskIterator(); 144 } 145 } 146 147 @Override 148 public synchronized void release() { 149 iterating = false; 150 if (iter instanceof DiskIterator) { 151 ((DiskIterator)iter).release(); 152 }; 153 if (flushRequired) { 154 flushRequired = false; 155 if (!hasSpace()) { 156 flushToDisk(); 157 } 158 } 159 // ensure any memory ref is released 160 iter = null; 161 } 162 163 @Override 164 public synchronized void destroy() throws Exception { 165 stop(); 166 for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) { 167 MessageReference node = i.next(); 168 node.decrementReferenceCount(); 169 } 170 memoryList.clear(); 171 destroyDiskList(); 172 } 173 174 private void destroyDiskList() throws Exception { 175 if (diskList != null) { 176 store.removePList(name); 177 diskList = null; 178 } 179 } 180 181 @Override 182 public synchronized LinkedList<MessageReference> pageInList(int maxItems) { 183 LinkedList<MessageReference> result = new LinkedList<MessageReference>(); 184 int count = 0; 185 for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) { 186 MessageReference ref = i.next(); 187 ref.incrementReferenceCount(); 188 result.add(ref); 189 count++; 190 } 191 if (count < maxItems && !isDiskListEmpty()) { 192 for (Iterator<MessageReference> i = new DiskIterator(); i.hasNext() && count < maxItems;) { 193 Message message = (Message) i.next(); 194 message.setRegionDestination(regionDestination); 195 message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); 196 message.incrementReferenceCount(); 197 result.add(message); 198 count++; 199 } 200 } 201 return result; 202 } 203 204 /** 205 * add message to await dispatch 206 * 207 * @param node 208 * @throws Exception 209 */ 210 @Override 211 public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception { 212 if (!node.isExpired()) { 213 try { 214 regionDestination = (Destination) node.getMessage().getRegionDestination(); 215 if (isDiskListEmpty()) { 216 if (hasSpace() || this.store == null) { 217 memoryList.addMessageLast(node); 218 node.incrementReferenceCount(); 219 setCacheEnabled(true); 220 return true; 221 } 222 } 223 if (!hasSpace()) { 224 if (isDiskListEmpty()) { 225 expireOldMessages(); 226 if (hasSpace()) { 227 memoryList.addMessageLast(node); 228 node.incrementReferenceCount(); 229 return true; 230 } else { 231 flushToDisk(); 232 } 233 } 234 } 235 if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) { 236 ByteSequence bs = getByteSequence(node.getMessage()); 237 getDiskList().addLast(node.getMessageId().toString(), bs); 238 return true; 239 } 240 return false; 241 242 } catch (Exception e) { 243 LOG.error("Caught an Exception adding a message: {} first to FilePendingMessageCursor ", node, e); 244 throw new RuntimeException(e); 245 } 246 } else { 247 discardExpiredMessage(node); 248 } 249 //message expired 250 return true; 251 } 252 253 /** 254 * add message to await dispatch 255 * 256 * @param node 257 */ 258 @Override 259 public synchronized void addMessageFirst(MessageReference node) { 260 if (!node.isExpired()) { 261 try { 262 regionDestination = (Destination) node.getMessage().getRegionDestination(); 263 if (isDiskListEmpty()) { 264 if (hasSpace()) { 265 memoryList.addMessageFirst(node); 266 node.incrementReferenceCount(); 267 setCacheEnabled(true); 268 return; 269 } 270 } 271 if (!hasSpace()) { 272 if (isDiskListEmpty()) { 273 expireOldMessages(); 274 if (hasSpace()) { 275 memoryList.addMessageFirst(node); 276 node.incrementReferenceCount(); 277 return; 278 } else { 279 flushToDisk(); 280 } 281 } 282 } 283 systemUsage.getTempUsage().waitForSpace(); 284 node.decrementReferenceCount(); 285 ByteSequence bs = getByteSequence(node.getMessage()); 286 Object locator = getDiskList().addFirst(node.getMessageId().toString(), bs); 287 node.getMessageId().setPlistLocator(locator); 288 289 } catch (Exception e) { 290 LOG.error("Caught an Exception adding a message: {} first to FilePendingMessageCursor ", node, e); 291 throw new RuntimeException(e); 292 } 293 } else { 294 discardExpiredMessage(node); 295 } 296 } 297 298 /** 299 * @return true if there pending messages to dispatch 300 */ 301 @Override 302 public synchronized boolean hasNext() { 303 return iter.hasNext(); 304 } 305 306 /** 307 * @return the next pending message 308 */ 309 @Override 310 public synchronized MessageReference next() { 311 MessageReference reference = iter.next(); 312 last = reference; 313 if (!isDiskListEmpty()) { 314 // got from disk 315 reference.getMessage().setRegionDestination(regionDestination); 316 reference.getMessage().setMemoryUsage(this.getSystemUsage().getMemoryUsage()); 317 } 318 reference.incrementReferenceCount(); 319 return reference; 320 } 321 322 /** 323 * remove the message at the cursor position 324 */ 325 @Override 326 public synchronized void remove() { 327 iter.remove(); 328 if (last != null) { 329 last.decrementReferenceCount(); 330 } 331 } 332 333 /** 334 * @param node 335 * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference) 336 */ 337 @Override 338 public synchronized void remove(MessageReference node) { 339 if (memoryList.remove(node) != null) { 340 node.decrementReferenceCount(); 341 } 342 if (!isDiskListEmpty()) { 343 try { 344 getDiskList().remove(node.getMessageId().getPlistLocator()); 345 } catch (IOException e) { 346 throw new RuntimeException(e); 347 } 348 } 349 } 350 351 /** 352 * @return the number of pending messages 353 */ 354 @Override 355 public synchronized int size() { 356 return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size()); 357 } 358 359 @Override 360 public synchronized long messageSize() { 361 return memoryList.messageSize() + (isDiskListEmpty() ? 0 : getDiskList().messageSize()); 362 } 363 364 /** 365 * clear all pending messages 366 */ 367 @Override 368 public synchronized void clear() { 369 memoryList.clear(); 370 if (!isDiskListEmpty()) { 371 try { 372 getDiskList().destroy(); 373 } catch (IOException e) { 374 throw new RuntimeException(e); 375 } 376 } 377 last = null; 378 } 379 380 @Override 381 public synchronized boolean isFull() { 382 return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull()); 383 } 384 385 @Override 386 public boolean hasMessagesBufferedToDeliver() { 387 return !isEmpty(); 388 } 389 390 @Override 391 public void setSystemUsage(SystemUsage usageManager) { 392 super.setSystemUsage(usageManager); 393 } 394 395 @Override 396 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { 397 if (newPercentUsage >= getMemoryUsageHighWaterMark()) { 398 List<MessageReference> expiredMessages = null; 399 synchronized (this) { 400 if (!flushRequired && size() != 0) { 401 flushRequired =true; 402 if (!iterating) { 403 expiredMessages = expireOldMessages(); 404 if (!hasSpace()) { 405 flushToDisk(); 406 flushRequired = false; 407 } 408 } 409 } 410 } 411 412 if (expiredMessages != null) { 413 for (MessageReference node : expiredMessages) { 414 discardExpiredMessage(node); 415 } 416 } 417 } 418 } 419 420 @Override 421 public boolean isTransient() { 422 return true; 423 } 424 425 private synchronized List<MessageReference> expireOldMessages() { 426 List<MessageReference> expired = new ArrayList<MessageReference>(); 427 if (!memoryList.isEmpty()) { 428 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { 429 MessageReference node = iterator.next(); 430 if (node.isExpired()) { 431 node.decrementReferenceCount(); 432 expired.add(node); 433 iterator.remove(); 434 } 435 } 436 } 437 438 return expired; 439 } 440 441 protected synchronized void flushToDisk() { 442 if (!memoryList.isEmpty() && store != null) { 443 long start = 0; 444 if (LOG.isTraceEnabled()) { 445 start = System.currentTimeMillis(); 446 LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[] { name, memoryList.size(), 447 (systemUsage != null ? systemUsage.getMemoryUsage() : "") }); 448 } 449 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { 450 MessageReference node = iterator.next(); 451 node.decrementReferenceCount(); 452 ByteSequence bs; 453 try { 454 bs = getByteSequence(node.getMessage()); 455 getDiskList().addLast(node.getMessageId().toString(), bs); 456 } catch (IOException e) { 457 LOG.error("Failed to write to disk list", e); 458 throw new RuntimeException(e); 459 } 460 461 } 462 memoryList.clear(); 463 setCacheEnabled(false); 464 LOG.trace("{}, flushToDisk() done - {} ms {}", new Object[]{ name, (System.currentTimeMillis() - start), (systemUsage != null ? systemUsage.getMemoryUsage() : "") }); 465 } 466 } 467 468 protected boolean isDiskListEmpty() { 469 return diskList == null || diskList.isEmpty(); 470 } 471 472 public PList getDiskList() { 473 if (diskList == null) { 474 try { 475 diskList = store.getPList(name); 476 } catch (Exception e) { 477 LOG.error("Caught an IO Exception getting the DiskList {}", name, e); 478 throw new RuntimeException(e); 479 } 480 } 481 return diskList; 482 } 483 484 private void discardExpiredMessage(MessageReference reference) { 485 LOG.debug("Discarding expired message {}", reference); 486 if (reference.isExpired() && broker.isExpired(reference)) { 487 ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext()); 488 context.setBroker(broker); 489 ((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage())); 490 } 491 } 492 493 protected ByteSequence getByteSequence(Message message) throws IOException { 494 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 495 return new ByteSequence(packet.data, packet.offset, packet.length); 496 } 497 498 protected Message getMessage(ByteSequence bs) throws IOException { 499 org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(bs.getData(), bs 500 .getOffset(), bs.getLength()); 501 return (Message) this.wireFormat.unmarshal(packet); 502 503 } 504 505 final class DiskIterator implements Iterator<MessageReference> { 506 private final PList.PListIterator iterator; 507 DiskIterator() { 508 try { 509 iterator = getDiskList().iterator(); 510 } catch (Exception e) { 511 throw new RuntimeException(e); 512 } 513 } 514 515 @Override 516 public boolean hasNext() { 517 return iterator.hasNext(); 518 } 519 520 @Override 521 public MessageReference next() { 522 try { 523 PListEntry entry = iterator.next(); 524 Message message = getMessage(entry.getByteSequence()); 525 message.getMessageId().setPlistLocator(entry.getLocator()); 526 return message; 527 } catch (IOException e) { 528 LOG.error("I/O error", e); 529 throw new RuntimeException(e); 530 } 531 } 532 533 @Override 534 public void remove() { 535 iterator.remove(); 536 } 537 538 public void release() { 539 iterator.release(); 540 } 541 } 542}