001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET; 020 021import java.io.ByteArrayInputStream; 022import java.io.ByteArrayOutputStream; 023import java.io.DataInput; 024import java.io.DataOutput; 025import java.io.EOFException; 026import java.io.File; 027import java.io.IOException; 028import java.io.InputStream; 029import java.io.InterruptedIOException; 030import java.io.ObjectInputStream; 031import java.io.ObjectOutputStream; 032import java.io.OutputStream; 033import java.util.ArrayList; 034import java.util.Arrays; 035import java.util.Collection; 036import java.util.Collections; 037import java.util.Date; 038import java.util.HashMap; 039import java.util.HashSet; 040import java.util.Iterator; 041import java.util.LinkedHashMap; 042import java.util.LinkedHashSet; 043import java.util.LinkedList; 044import java.util.List; 045import java.util.Map; 046import java.util.Map.Entry; 047import java.util.Set; 048import java.util.SortedSet; 049import java.util.TreeMap; 050import java.util.TreeSet; 051import java.util.concurrent.ConcurrentHashMap; 052import 
java.util.concurrent.ConcurrentMap; 053import java.util.concurrent.Executors; 054import java.util.concurrent.ScheduledExecutorService; 055import java.util.concurrent.ThreadFactory; 056import java.util.concurrent.TimeUnit; 057import java.util.concurrent.atomic.AtomicBoolean; 058import java.util.concurrent.atomic.AtomicLong; 059import java.util.concurrent.atomic.AtomicReference; 060import java.util.concurrent.locks.ReentrantReadWriteLock; 061 062import org.apache.activemq.ActiveMQMessageAuditNoSync; 063import org.apache.activemq.broker.BrokerService; 064import org.apache.activemq.broker.BrokerServiceAware; 065import org.apache.activemq.broker.region.Destination; 066import org.apache.activemq.broker.region.Queue; 067import org.apache.activemq.broker.region.Topic; 068import org.apache.activemq.command.MessageAck; 069import org.apache.activemq.command.TransactionId; 070import org.apache.activemq.openwire.OpenWireFormat; 071import org.apache.activemq.protobuf.Buffer; 072import org.apache.activemq.store.MessageStore; 073import org.apache.activemq.store.MessageStoreStatistics; 074import org.apache.activemq.store.MessageStoreSubscriptionStatistics; 075import org.apache.activemq.store.TopicMessageStore; 076import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand; 077import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 078import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 079import org.apache.activemq.store.kahadb.data.KahaDestination; 080import org.apache.activemq.store.kahadb.data.KahaEntryType; 081import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 082import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; 083import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 084import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 085import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand; 086import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; 
087import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 088import org.apache.activemq.store.kahadb.data.KahaTraceCommand; 089import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 090import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; 091import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; 092import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor; 093import org.apache.activemq.store.kahadb.disk.index.ListIndex; 094import org.apache.activemq.store.kahadb.disk.journal.DataFile; 095import org.apache.activemq.store.kahadb.disk.journal.Journal; 096import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; 097import org.apache.activemq.store.kahadb.disk.journal.Location; 098import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender; 099import org.apache.activemq.store.kahadb.disk.page.Page; 100import org.apache.activemq.store.kahadb.disk.page.PageFile; 101import org.apache.activemq.store.kahadb.disk.page.Transaction; 102import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; 103import org.apache.activemq.store.kahadb.disk.util.LongMarshaller; 104import org.apache.activemq.store.kahadb.disk.util.Marshaller; 105import org.apache.activemq.store.kahadb.disk.util.Sequence; 106import org.apache.activemq.store.kahadb.disk.util.SequenceSet; 107import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; 108import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; 109import org.apache.activemq.util.ByteSequence; 110import org.apache.activemq.util.DataByteArrayInputStream; 111import org.apache.activemq.util.DataByteArrayOutputStream; 112import org.apache.activemq.util.IOExceptionSupport; 113import org.apache.activemq.util.IOHelper; 114import org.apache.activemq.util.ServiceStopper; 115import org.apache.activemq.util.ServiceSupport; 116import org.apache.activemq.util.ThreadPoolUtils; 117import org.slf4j.Logger; 118import 
org.slf4j.LoggerFactory; 119import org.slf4j.MDC; 120 121public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware { 122 123 protected BrokerService brokerService; 124 125 public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; 126 public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0); 127 public static final File DEFAULT_DIRECTORY = new File("KahaDB"); 128 protected static final Buffer UNMATCHED; 129 static { 130 UNMATCHED = new Buffer(new byte[]{}); 131 } 132 private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class); 133 134 static final int CLOSED_STATE = 1; 135 static final int OPEN_STATE = 2; 136 static final long NOT_ACKED = -1; 137 138 static final int VERSION = 6; 139 140 static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1; 141 142 protected class Metadata { 143 protected Page<Metadata> page; 144 protected int state; 145 protected BTreeIndex<String, StoredDestination> destinations; 146 protected Location lastUpdate; 147 protected Location firstInProgressTransactionLocation; 148 protected Location producerSequenceIdTrackerLocation = null; 149 protected Location ackMessageFileMapLocation = null; 150 protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync(); 151 protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<>(); 152 protected int version = VERSION; 153 protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION; 154 155 public void read(DataInput is) throws IOException { 156 state = is.readInt(); 157 destinations = new BTreeIndex<>(pageFile, is.readLong()); 158 if (is.readBoolean()) { 159 lastUpdate = LocationMarshaller.INSTANCE.readPayload(is); 160 } else { 161 lastUpdate = null; 162 } 163 if (is.readBoolean()) { 164 firstInProgressTransactionLocation = 
LocationMarshaller.INSTANCE.readPayload(is); 165 } else { 166 firstInProgressTransactionLocation = null; 167 } 168 try { 169 if (is.readBoolean()) { 170 producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is); 171 } else { 172 producerSequenceIdTrackerLocation = null; 173 } 174 } catch (EOFException expectedOnUpgrade) { 175 } 176 try { 177 version = is.readInt(); 178 } catch (EOFException expectedOnUpgrade) { 179 version = 1; 180 } 181 if (version >= 5 && is.readBoolean()) { 182 ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is); 183 } else { 184 ackMessageFileMapLocation = null; 185 } 186 try { 187 openwireVersion = is.readInt(); 188 } catch (EOFException expectedOnUpgrade) { 189 openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION; 190 } 191 LOG.info("KahaDB is version " + version); 192 } 193 194 public void write(DataOutput os) throws IOException { 195 os.writeInt(state); 196 os.writeLong(destinations.getPageId()); 197 198 if (lastUpdate != null) { 199 os.writeBoolean(true); 200 LocationMarshaller.INSTANCE.writePayload(lastUpdate, os); 201 } else { 202 os.writeBoolean(false); 203 } 204 205 if (firstInProgressTransactionLocation != null) { 206 os.writeBoolean(true); 207 LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os); 208 } else { 209 os.writeBoolean(false); 210 } 211 212 if (producerSequenceIdTrackerLocation != null) { 213 os.writeBoolean(true); 214 LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os); 215 } else { 216 os.writeBoolean(false); 217 } 218 os.writeInt(VERSION); 219 if (ackMessageFileMapLocation != null) { 220 os.writeBoolean(true); 221 LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os); 222 } else { 223 os.writeBoolean(false); 224 } 225 os.writeInt(this.openwireVersion); 226 } 227 } 228 229 class MetadataMarshaller extends VariableMarshaller<Metadata> { 230 @Override 231 public Metadata readPayload(DataInput dataIn) throws 
IOException { 232 Metadata rc = createMetadata(); 233 rc.read(dataIn); 234 return rc; 235 } 236 237 @Override 238 public void writePayload(Metadata object, DataOutput dataOut) throws IOException { 239 object.write(dataOut); 240 } 241 } 242 243 protected PageFile pageFile; 244 protected Journal journal; 245 protected Metadata metadata = new Metadata(); 246 247 protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); 248 249 protected boolean failIfDatabaseIsLocked; 250 251 protected boolean deleteAllMessages; 252 protected File directory = DEFAULT_DIRECTORY; 253 protected File indexDirectory = null; 254 protected ScheduledExecutorService scheduler; 255 private final Object schedulerLock = new Object(); 256 257 protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS; 258 protected boolean archiveDataLogs; 259 protected File directoryArchive; 260 protected AtomicLong journalSize = new AtomicLong(0); 261 long journalDiskSyncInterval = 1000; 262 long checkpointInterval = 5*1000; 263 long cleanupInterval = 30*1000; 264 int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 265 int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 266 boolean enableIndexWriteAsync = false; 267 int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 268 private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name(); 269 private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name(); 270 271 protected AtomicBoolean opened = new AtomicBoolean(); 272 private boolean ignoreMissingJournalfiles = false; 273 private int indexCacheSize = 10000; 274 private boolean checkForCorruptJournalFiles = false; 275 private boolean checksumJournalFiles = true; 276 protected boolean forceRecoverIndex = false; 277 private boolean archiveCorruptedIndex = false; 278 private boolean useIndexLFRUEviction = false; 279 private float indexLFUEvictionFactor = 0.2f; 280 private boolean 
enableIndexDiskSyncs = true;
    private boolean enableIndexRecoveryFile = true;
    private boolean enableIndexPageCaching = true;
    // write lock is taken by close() while the final checkpoint runs and the page file is unloaded
    ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();

    private boolean enableAckCompaction = true;
    private int compactAcksAfterNoGC = 10;
    private boolean compactAcksIgnoresStoreGrowth = false;
    private int checkPointCyclesWithNoGC;
    private int journalLogOnLastCompactionCheck;
    private boolean enableSubscriptionStatistics = false;

    //only set when using JournalDiskSyncStrategy.PERIODIC
    protected final AtomicReference<Location> lastAsyncJournalUpdate = new AtomicReference<>();

    /**
     * Service start hook: loads the store.
     *
     * @throws Exception if {@link #load()} fails
     */
    @Override
    public void doStart() throws Exception {
        load();
    }

    /**
     * Service stop hook: unloads the store.
     *
     * @param stopper not used by this implementation
     * @throws Exception if {@link #unload()} fails
     */
    @Override
    public void doStop(ServiceStopper stopper) throws Exception {
        unload();
    }

    /**
     * Forwards the "allow IO resumption" request to the page file and the
     * journal, skipping whichever of the two has not been created yet.
     */
    public void allowIOResumption() {
        if (pageFile != null) {
            pageFile.allowIOResumption();
        }
        if (journal != null) {
            journal.allowIOResumption();
        }
    }

    /**
     * Loads the index page file under the index write lock.
     *
     * On an empty page file the metadata page and the destinations index are
     * allocated and stored; otherwise the metadata is read back from page 0.
     * Afterwards every stored destination is loaded into
     * {@code storedDestinations} and the page file is flushed.
     *
     * @throws IOException if the page file cannot be loaded, or if
     *         {@code checkForCorruptJournalFiles} is set and a destination has
     *         entries in its location index but an order index whose
     *         {@code nextMessageId} is not positive
     */
    private void loadPageFile() throws IOException {
        this.indexLock.writeLock().lock();
        try {
            final PageFile pageFile = getPageFile();
            pageFile.load();
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    if (pageFile.getPageCount() == 0) {
                        // First time this is created.. Initialize the metadata
                        Page<Metadata> page = tx.allocate();
                        // the metadata record must be the very first page; the else branch reloads it from page 0
                        assert page.getPageId() == 0;
                        page.set(metadata);
                        metadata.page = page;
                        metadata.state = CLOSED_STATE;
                        metadata.destinations = new BTreeIndex<>(pageFile, tx.allocate().getPageId());

                        tx.store(metadata.page, metadataMarshaller, true);
                    } else {
                        Page<Metadata> page = tx.load(0, metadataMarshaller);
                        metadata = page.get();
                        metadata.page = page;
                    }
                    // marshallers must be configured before the index is loaded
                    metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
                    metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
                    metadata.destinations.load(tx);
                }
            });
            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
            // Perhaps we should just keep an index of file
            storedDestinations.clear();
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
                        Entry<String, StoredDestination> entry = iterator.next();
                        // the boolean argument is true when the stored entry has a subscriptions index
                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
                        storedDestinations.put(entry.getKey(), sd);

                        if (checkForCorruptJournalFiles) {
                            // sanity check the index also
                            if (!entry.getValue().locationIndex.isEmpty(tx)) {
                                if (entry.getValue().orderIndex.nextMessageId <= 0) {
                                    throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey());
                                }
                            }
                        }
                    }
                }
            });
            pageFile.flush();
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    /**
     * Starts the periodic checkpoint worker unless both the checkpoint and the
     * cleanup interval are zero.
     */
    private void startCheckpoint() {
        if (checkpointInterval == 0 && cleanupInterval == 0) {
            LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
            return;
        }
375 synchronized (schedulerLock) { 376 if (scheduler == null || scheduler.isShutdown()) { 377 scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { 378 379 @Override 380 public Thread newThread(Runnable r) { 381 Thread schedulerThread = new Thread(r); 382 383 schedulerThread.setName("ActiveMQ Journal Checkpoint Worker"); 384 schedulerThread.setDaemon(true); 385 386 return schedulerThread; 387 } 388 }); 389 390 // Short intervals for check-point and cleanups 391 long delay; 392 if (journal.isJournalDiskSyncPeriodic()) { 393 delay = Math.min(journalDiskSyncInterval > 0 ? journalDiskSyncInterval : checkpointInterval, 500); 394 } else { 395 delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500); 396 } 397 398 scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS); 399 } 400 } 401 } 402 403 private final class CheckpointRunner implements Runnable { 404 405 private long lastCheckpoint = System.currentTimeMillis(); 406 private long lastCleanup = System.currentTimeMillis(); 407 private long lastSync = System.currentTimeMillis(); 408 private Location lastAsyncUpdate = null; 409 410 @Override 411 public void run() { 412 try { 413 // Decide on cleanup vs full checkpoint here. 
414 if (opened.get()) { 415 long now = System.currentTimeMillis(); 416 if (journal.isJournalDiskSyncPeriodic() && 417 journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) { 418 Location currentUpdate = lastAsyncJournalUpdate.get(); 419 if (currentUpdate != null && !currentUpdate.equals(lastAsyncUpdate)) { 420 lastAsyncUpdate = currentUpdate; 421 if (LOG.isTraceEnabled()) { 422 LOG.trace("Writing trace command to trigger journal sync"); 423 } 424 store(new KahaTraceCommand(), true, null, null); 425 } 426 lastSync = now; 427 } 428 if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) { 429 checkpointCleanup(true); 430 lastCleanup = now; 431 lastCheckpoint = now; 432 } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) { 433 checkpointCleanup(false); 434 lastCheckpoint = now; 435 } 436 } 437 } catch (IOException ioe) { 438 LOG.error("Checkpoint failed", ioe); 439 brokerService.handleIOException(ioe); 440 } catch (Throwable e) { 441 LOG.error("Checkpoint failed", e); 442 brokerService.handleIOException(IOExceptionSupport.create(e)); 443 } 444 } 445 } 446 447 public void open() throws IOException { 448 if( opened.compareAndSet(false, true) ) { 449 getJournal().start(); 450 try { 451 loadPageFile(); 452 } catch (Throwable t) { 453 LOG.warn("Index corrupted. Recovering the index through journal replay. 
Cause:" + t); 454 if (LOG.isDebugEnabled()) { 455 LOG.debug("Index load failure", t); 456 } 457 // try to recover index 458 try { 459 pageFile.unload(); 460 } catch (Exception ignore) {} 461 if (archiveCorruptedIndex) { 462 pageFile.archive(); 463 } else { 464 pageFile.delete(); 465 } 466 metadata = createMetadata(); 467 //The metadata was recreated after a detect corruption so we need to 468 //reconfigure anything that was configured on the old metadata on startup 469 configureMetadata(); 470 pageFile = null; 471 loadPageFile(); 472 } 473 recover(); 474 startCheckpoint(); 475 } 476 } 477 478 public void load() throws IOException { 479 this.indexLock.writeLock().lock(); 480 try { 481 IOHelper.mkdirs(directory); 482 if (deleteAllMessages) { 483 getJournal().setCheckForCorruptionOnStartup(false); 484 getJournal().start(); 485 getJournal().delete(); 486 getJournal().close(); 487 journal = null; 488 getPageFile().delete(); 489 LOG.info("Persistence store purged."); 490 deleteAllMessages = false; 491 } 492 493 open(); 494 store(new KahaTraceCommand().setMessage("LOADED " + new Date())); 495 } finally { 496 this.indexLock.writeLock().unlock(); 497 } 498 } 499 500 public void close() throws IOException, InterruptedException { 501 if (opened.compareAndSet(true, false)) { 502 checkpointLock.writeLock().lock(); 503 try { 504 if (metadata.page != null) { 505 checkpointUpdate(true); 506 } 507 pageFile.unload(); 508 metadata = createMetadata(); 509 } finally { 510 checkpointLock.writeLock().unlock(); 511 } 512 journal.close(); 513 synchronized(schedulerLock) { 514 if (scheduler != null) { 515 ThreadPoolUtils.shutdownGraceful(scheduler, -1); 516 scheduler = null; 517 } 518 } 519 // clear the cache and journalSize on shutdown of the store 520 storeCache.clear(); 521 journalSize.set(0); 522 } 523 } 524 525 public void unload() throws IOException, InterruptedException { 526 this.indexLock.writeLock().lock(); 527 try { 528 if( pageFile != null && pageFile.isLoaded() ) { 529 
metadata.state = CLOSED_STATE; 530 metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0]; 531 532 if (metadata.page != null) { 533 pageFile.tx().execute(new Transaction.Closure<IOException>() { 534 @Override 535 public void execute(Transaction tx) throws IOException { 536 tx.store(metadata.page, metadataMarshaller, true); 537 } 538 }); 539 } 540 } 541 } finally { 542 this.indexLock.writeLock().unlock(); 543 } 544 close(); 545 } 546 547 // public for testing 548 @SuppressWarnings("rawtypes") 549 public Location[] getInProgressTxLocationRange() { 550 Location[] range = new Location[]{null, null}; 551 synchronized (inflightTransactions) { 552 if (!inflightTransactions.isEmpty()) { 553 for (List<Operation> ops : inflightTransactions.values()) { 554 if (!ops.isEmpty()) { 555 trackMaxAndMin(range, ops); 556 } 557 } 558 } 559 if (!preparedTransactions.isEmpty()) { 560 for (List<Operation> ops : preparedTransactions.values()) { 561 if (!ops.isEmpty()) { 562 trackMaxAndMin(range, ops); 563 } 564 } 565 } 566 } 567 return range; 568 } 569 570 @SuppressWarnings("rawtypes") 571 private void trackMaxAndMin(Location[] range, List<Operation> ops) { 572 Location t = ops.get(0).getLocation(); 573 if (range[0] == null || t.compareTo(range[0]) <= 0) { 574 range[0] = t; 575 } 576 t = ops.get(ops.size() -1).getLocation(); 577 if (range[1] == null || t.compareTo(range[1]) >= 0) { 578 range[1] = t; 579 } 580 } 581 582 class TranInfo { 583 TransactionId id; 584 Location location; 585 586 class opCount { 587 int add; 588 int remove; 589 } 590 HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<>(); 591 592 @SuppressWarnings("rawtypes") 593 public void track(Operation operation) { 594 if (location == null ) { 595 location = operation.getLocation(); 596 } 597 KahaDestination destination; 598 boolean isAdd = false; 599 if (operation instanceof AddOperation) { 600 AddOperation add = (AddOperation) operation; 601 destination = 
add.getCommand().getDestination(); 602 isAdd = true; 603 } else { 604 RemoveOperation removeOpperation = (RemoveOperation) operation; 605 destination = removeOpperation.getCommand().getDestination(); 606 } 607 opCount opCount = destinationOpCount.get(destination); 608 if (opCount == null) { 609 opCount = new opCount(); 610 destinationOpCount.put(destination, opCount); 611 } 612 if (isAdd) { 613 opCount.add++; 614 } else { 615 opCount.remove++; 616 } 617 } 618 619 @Override 620 public String toString() { 621 StringBuffer buffer = new StringBuffer(); 622 buffer.append(location).append(";").append(id).append(";\n"); 623 for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) { 624 buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';'); 625 } 626 return buffer.toString(); 627 } 628 } 629 630 @SuppressWarnings("rawtypes") 631 public String getTransactions() { 632 633 ArrayList<TranInfo> infos = new ArrayList<>(); 634 synchronized (inflightTransactions) { 635 if (!inflightTransactions.isEmpty()) { 636 for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) { 637 TranInfo info = new TranInfo(); 638 info.id = entry.getKey(); 639 for (Operation operation : entry.getValue()) { 640 info.track(operation); 641 } 642 infos.add(info); 643 } 644 } 645 } 646 synchronized (preparedTransactions) { 647 if (!preparedTransactions.isEmpty()) { 648 for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) { 649 TranInfo info = new TranInfo(); 650 info.id = entry.getKey(); 651 for (Operation operation : entry.getValue()) { 652 info.track(operation); 653 } 654 infos.add(info); 655 } 656 } 657 } 658 return infos.toString(); 659 } 660 661 /** 662 * Move all the messages that were in the journal into long term storage. We 663 * just replay and do a checkpoint. 
664 * 665 * @throws IOException 666 * @throws IOException 667 * @throws IllegalStateException 668 */ 669 private void recover() throws IllegalStateException, IOException { 670 this.indexLock.writeLock().lock(); 671 try { 672 673 long start = System.currentTimeMillis(); 674 boolean requiresJournalReplay = recoverProducerAudit(); 675 requiresJournalReplay |= recoverAckMessageFileMap(); 676 Location lastIndoubtPosition = getRecoveryPosition(); 677 Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition; 678 if (recoveryPosition != null) { 679 int redoCounter = 0; 680 int dataFileRotationTracker = recoveryPosition.getDataFileId(); 681 LOG.info("Recovering from the journal @" + recoveryPosition); 682 while (recoveryPosition != null) { 683 try { 684 JournalCommand<?> message = load(recoveryPosition); 685 metadata.lastUpdate = recoveryPosition; 686 process(message, recoveryPosition, lastIndoubtPosition); 687 redoCounter++; 688 } catch (IOException failedRecovery) { 689 if (isIgnoreMissingJournalfiles()) { 690 LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery); 691 // track this dud location 692 journal.corruptRecoveryLocation(recoveryPosition); 693 } else { 694 throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery); 695 } 696 } 697 recoveryPosition = journal.getNextLocation(recoveryPosition); 698 // hold on to the minimum number of open files during recovery 699 if (recoveryPosition != null && dataFileRotationTracker != recoveryPosition.getDataFileId()) { 700 dataFileRotationTracker = recoveryPosition.getDataFileId(); 701 journal.cleanup(); 702 } 703 if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) { 704 LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered .."); 705 } 706 } 707 if (LOG.isInfoEnabled()) { 708 long end = System.currentTimeMillis(); 709 LOG.info("Recovery replayed " + redoCounter + " operations from the journal 
in " + ((end - start) / 1000.0f) + " seconds."); 710 } 711 } 712 713 // We may have to undo some index updates. 714 pageFile.tx().execute(new Transaction.Closure<IOException>() { 715 @Override 716 public void execute(Transaction tx) throws IOException { 717 recoverIndex(tx); 718 } 719 }); 720 721 // rollback any recovered inflight local transactions, and discard any inflight XA transactions. 722 Set<TransactionId> toRollback = new HashSet<>(); 723 Set<TransactionId> toDiscard = new HashSet<>(); 724 synchronized (inflightTransactions) { 725 for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) { 726 TransactionId id = it.next(); 727 if (id.isLocalTransaction()) { 728 toRollback.add(id); 729 } else { 730 toDiscard.add(id); 731 } 732 } 733 for (TransactionId tx: toRollback) { 734 if (LOG.isDebugEnabled()) { 735 LOG.debug("rolling back recovered indoubt local transaction " + tx); 736 } 737 store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null); 738 } 739 for (TransactionId tx: toDiscard) { 740 if (LOG.isDebugEnabled()) { 741 LOG.debug("discarding recovered in-flight XA transaction " + tx); 742 } 743 inflightTransactions.remove(tx); 744 } 745 } 746 747 synchronized (preparedTransactions) { 748 for (TransactionId txId : preparedTransactions.keySet()) { 749 LOG.warn("Recovered prepared XA TX: [{}]", txId); 750 } 751 } 752 753 } finally { 754 this.indexLock.writeLock().unlock(); 755 } 756 } 757 758 @SuppressWarnings("unused") 759 private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) { 760 return TransactionIdConversion.convertToLocal(tx); 761 } 762 763 private Location minimum(Location x, 764 Location y) { 765 Location min = null; 766 if (x != null) { 767 min = x; 768 if (y != null) { 769 int compare = y.compareTo(x); 770 if (compare < 0) { 771 min = y; 772 } 773 } 774 } else { 775 min = y; 776 } 777 return min; 778 } 779 780 private boolean 
recoverProducerAudit() throws IOException { 781 boolean requiresReplay = true; 782 if (metadata.producerSequenceIdTrackerLocation != null) { 783 try { 784 KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation); 785 ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput()); 786 int maxNumProducers = getMaxFailoverProducersToTrack(); 787 int maxAuditDepth = getFailoverProducersAuditDepth(); 788 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); 789 metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth); 790 metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers); 791 requiresReplay = false; 792 } catch (Exception e) { 793 LOG.warn("Cannot recover message audit", e); 794 } 795 } 796 // got no audit stored so got to recreate via replay from start of the journal 797 return requiresReplay; 798 } 799 800 @SuppressWarnings("unchecked") 801 private boolean recoverAckMessageFileMap() throws IOException { 802 boolean requiresReplay = true; 803 if (metadata.ackMessageFileMapLocation != null) { 804 try { 805 KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation); 806 ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput()); 807 metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject(); 808 requiresReplay = false; 809 } catch (Exception e) { 810 LOG.warn("Cannot recover ackMessageFileMap", e); 811 } 812 } 813 // got no ackMessageFileMap stored so got to recreate via replay from start of the journal 814 return requiresReplay; 815 } 816 817 protected void recoverIndex(Transaction tx) throws IOException { 818 long start = System.currentTimeMillis(); 819 // It is possible index updates got applied before the journal updates.. 
820 // in that case we need to removed references to messages that are not in the journal 821 final Location lastAppendLocation = journal.getLastAppendLocation(); 822 long undoCounter=0; 823 824 // Go through all the destinations to see if they have messages past the lastAppendLocation 825 for (String key : storedDestinations.keySet()) { 826 StoredDestination sd = storedDestinations.get(key); 827 828 final ArrayList<Long> matches = new ArrayList<>(); 829 // Find all the Locations that are >= than the last Append Location. 830 sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) { 831 @Override 832 protected void matched(Location key, Long value) { 833 matches.add(value); 834 } 835 }); 836 837 for (Long sequenceId : matches) { 838 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 839 if (keys != null) { 840 sd.locationIndex.remove(tx, keys.location); 841 sd.messageIdIndex.remove(tx, keys.messageId); 842 metadata.producerSequenceIdTracker.rollback(keys.messageId); 843 undoCounter++; 844 decrementAndSubSizeToStoreStat(key, keys.location.getSize()); 845 // TODO: do we need to modify the ack positions for the pub sub case? 846 } 847 } 848 } 849 850 if (undoCounter > 0) { 851 // The rolledback operations are basically in flight journal writes. To avoid getting 852 // these the end user should do sync writes to the journal. 853 if (LOG.isInfoEnabled()) { 854 long end = System.currentTimeMillis(); 855 LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 856 } 857 } 858 859 undoCounter = 0; 860 start = System.currentTimeMillis(); 861 862 // Lets be extra paranoid here and verify that all the datafiles being referenced 863 // by the indexes still exists. 
864 865 final SequenceSet ss = new SequenceSet(); 866 for (StoredDestination sd : storedDestinations.values()) { 867 // Use a visitor to cut down the number of pages that we load 868 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 869 int last=-1; 870 871 @Override 872 public boolean isInterestedInKeysBetween(Location first, Location second) { 873 if( first==null ) { 874 return !ss.contains(0, second.getDataFileId()); 875 } else if( second==null ) { 876 return true; 877 } else { 878 return !ss.contains(first.getDataFileId(), second.getDataFileId()); 879 } 880 } 881 882 @Override 883 public void visit(List<Location> keys, List<Long> values) { 884 for (Location l : keys) { 885 int fileId = l.getDataFileId(); 886 if( last != fileId ) { 887 ss.add(fileId); 888 last = fileId; 889 } 890 } 891 } 892 893 }); 894 } 895 HashSet<Integer> missingJournalFiles = new HashSet<>(); 896 while (!ss.isEmpty()) { 897 missingJournalFiles.add((int) ss.removeFirst()); 898 } 899 900 for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) { 901 missingJournalFiles.add(entry.getKey()); 902 for (Integer i : entry.getValue()) { 903 missingJournalFiles.add(i); 904 } 905 } 906 907 missingJournalFiles.removeAll(journal.getFileMap().keySet()); 908 909 if (!missingJournalFiles.isEmpty()) { 910 LOG.warn("Some journal files are missing: " + missingJournalFiles); 911 } 912 913 ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<>(); 914 ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<>(); 915 for (Integer missing : missingJournalFiles) { 916 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0))); 917 } 918 919 if (checkForCorruptJournalFiles) { 920 Collection<DataFile> dataFiles = journal.getFileMap().values(); 921 for (DataFile dataFile : dataFiles) { 922 int id = dataFile.getDataFileId(); 923 // eof to next file id 924 
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0))); 925 Sequence seq = dataFile.getCorruptedBlocks().getHead(); 926 while (seq != null) { 927 BTreeVisitor.BetweenVisitor<Location, Long> visitor = 928 new BTreeVisitor.BetweenVisitor<>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)); 929 missingPredicates.add(visitor); 930 knownCorruption.add(visitor); 931 seq = seq.getNext(); 932 } 933 } 934 } 935 936 if (!missingPredicates.isEmpty()) { 937 for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) { 938 final StoredDestination sd = sdEntry.getValue(); 939 final LinkedHashMap<Long, Location> matches = new LinkedHashMap<>(); 940 sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) { 941 @Override 942 protected void matched(Location key, Long value) { 943 matches.put(value, key); 944 } 945 }); 946 947 // If some message references are affected by the missing data files... 948 if (!matches.isEmpty()) { 949 950 // We either 'gracefully' recover dropping the missing messages or 951 // we error out. 952 if( ignoreMissingJournalfiles ) { 953 // Update the index to remove the references to the missing data 954 for (Long sequenceId : matches.keySet()) { 955 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 956 sd.locationIndex.remove(tx, keys.location); 957 sd.messageIdIndex.remove(tx, keys.messageId); 958 LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location); 959 undoCounter++; 960 decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize()); 961 // TODO: do we need to modify the ack positions for the pub sub case? 
962 } 963 } else { 964 LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches); 965 throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected."); 966 } 967 } 968 } 969 } 970 971 if (!ignoreMissingJournalfiles) { 972 if (!knownCorruption.isEmpty()) { 973 LOG.error("Detected corrupt journal files. " + knownCorruption); 974 throw new IOException("Detected corrupt journal files. " + knownCorruption); 975 } 976 977 if (!missingJournalFiles.isEmpty()) { 978 LOG.error("Detected missing journal files. " + missingJournalFiles); 979 throw new IOException("Detected missing journal files. " + missingJournalFiles); 980 } 981 } 982 983 if (undoCounter > 0) { 984 // The rolledback operations are basically in flight journal writes. To avoid getting these the end user 985 // should do sync writes to the journal. 986 if (LOG.isInfoEnabled()) { 987 long end = System.currentTimeMillis(); 988 LOG.info("Detected missing/corrupt journal files. 
Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 989 } 990 } 991 } 992 993 private Location nextRecoveryPosition; 994 private Location lastRecoveryPosition; 995 996 public void incrementalRecover() throws IOException { 997 this.indexLock.writeLock().lock(); 998 try { 999 if( nextRecoveryPosition == null ) { 1000 if( lastRecoveryPosition==null ) { 1001 nextRecoveryPosition = getRecoveryPosition(); 1002 } else { 1003 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 1004 } 1005 } 1006 while (nextRecoveryPosition != null) { 1007 lastRecoveryPosition = nextRecoveryPosition; 1008 metadata.lastUpdate = lastRecoveryPosition; 1009 JournalCommand<?> message = load(lastRecoveryPosition); 1010 process(message, lastRecoveryPosition, (IndexAware) null); 1011 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 1012 } 1013 } finally { 1014 this.indexLock.writeLock().unlock(); 1015 } 1016 } 1017 1018 public Location getLastUpdatePosition() throws IOException { 1019 return metadata.lastUpdate; 1020 } 1021 1022 private Location getRecoveryPosition() throws IOException { 1023 1024 if (!this.forceRecoverIndex) { 1025 1026 // If we need to recover the transactions.. 1027 if (metadata.firstInProgressTransactionLocation != null) { 1028 return metadata.firstInProgressTransactionLocation; 1029 } 1030 1031 // Perhaps there were no transactions... 1032 if( metadata.lastUpdate!=null) { 1033 // Start replay at the record after the last one recorded in the index file. 1034 return getNextInitializedLocation(metadata.lastUpdate); 1035 } 1036 } 1037 // This loads the first position. 
1038 return journal.getNextLocation(null); 1039 } 1040 1041 private Location getNextInitializedLocation(Location location) throws IOException { 1042 Location mayNotBeInitialized = journal.getNextLocation(location); 1043 if (location.getSize() == NOT_SET && mayNotBeInitialized != null && mayNotBeInitialized.getSize() != NOT_SET) { 1044 // need to init size and type to skip 1045 return journal.getNextLocation(mayNotBeInitialized); 1046 } else { 1047 return mayNotBeInitialized; 1048 } 1049 } 1050 1051 protected void checkpointCleanup(final boolean cleanup) throws IOException { 1052 long start; 1053 this.indexLock.writeLock().lock(); 1054 try { 1055 start = System.currentTimeMillis(); 1056 if( !opened.get() ) { 1057 return; 1058 } 1059 } finally { 1060 this.indexLock.writeLock().unlock(); 1061 } 1062 checkpointUpdate(cleanup); 1063 long end = System.currentTimeMillis(); 1064 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { 1065 if (LOG.isInfoEnabled()) { 1066 LOG.info("Slow KahaDB access: cleanup took " + (end - start)); 1067 } 1068 } 1069 } 1070 1071 public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException { 1072 int size = data.serializedSizeFramed(); 1073 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); 1074 os.writeByte(data.type().getNumber()); 1075 data.writeFramed(os); 1076 return os.toByteSequence(); 1077 } 1078 1079 // ///////////////////////////////////////////////////////////////// 1080 // Methods call by the broker to update and query the store. 
1081 // ///////////////////////////////////////////////////////////////// 1082 public Location store(JournalCommand<?> data) throws IOException { 1083 return store(data, false, null,null); 1084 } 1085 1086 public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException { 1087 return store(data, false, null, null, onJournalStoreComplete); 1088 } 1089 1090 public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException { 1091 return store(data, sync, before, after, null); 1092 } 1093 1094 /** 1095 * All updated are are funneled through this method. The updates are converted 1096 * to a JournalMessage which is logged to the journal and then the data from 1097 * the JournalMessage is used to update the index just like it would be done 1098 * during a recovery process. 1099 */ 1100 public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException { 1101 try { 1102 ByteSequence sequence = toByteSequence(data); 1103 Location location; 1104 1105 checkpointLock.readLock().lock(); 1106 try { 1107 1108 long start = System.currentTimeMillis(); 1109 location = onJournalStoreComplete == null ? 
journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ; 1110 long start2 = System.currentTimeMillis(); 1111 //Track the last async update so we know if we need to sync at the next checkpoint 1112 if (!sync && journal.isJournalDiskSyncPeriodic()) { 1113 lastAsyncJournalUpdate.set(location); 1114 } 1115 process(data, location, before); 1116 1117 long end = System.currentTimeMillis(); 1118 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { 1119 if (LOG.isInfoEnabled()) { 1120 LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms"); 1121 } 1122 } 1123 } finally { 1124 checkpointLock.readLock().unlock(); 1125 } 1126 1127 if (after != null) { 1128 after.run(); 1129 } 1130 1131 if (scheduler == null && opened.get()) { 1132 startCheckpoint(); 1133 } 1134 return location; 1135 } catch (IOException ioe) { 1136 LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe); 1137 brokerService.handleIOException(ioe); 1138 throw ioe; 1139 } 1140 } 1141 1142 /** 1143 * Loads a previously stored JournalMessage 1144 * 1145 * @param location 1146 * @return 1147 * @throws IOException 1148 */ 1149 public JournalCommand<?> load(Location location) throws IOException { 1150 long start = System.currentTimeMillis(); 1151 ByteSequence data = journal.read(location); 1152 long end = System.currentTimeMillis(); 1153 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { 1154 if (LOG.isInfoEnabled()) { 1155 LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms"); 1156 } 1157 } 1158 DataByteArrayInputStream is = new DataByteArrayInputStream(data); 1159 byte readByte = is.readByte(); 1160 KahaEntryType type = KahaEntryType.valueOf(readByte); 1161 if( type == null ) { 1162 try { 1163 is.close(); 1164 } catch (IOException e) {} 1165 throw new IOException("Could not load journal record, null type information from: " + readByte + " at location: 
"+location); 1166 } 1167 JournalCommand<?> message = (JournalCommand<?>)type.createMessage(); 1168 message.mergeFramed(is); 1169 return message; 1170 } 1171 1172 /** 1173 * do minimal recovery till we reach the last inDoubtLocation 1174 * @param data 1175 * @param location 1176 * @param inDoubtlocation 1177 * @throws IOException 1178 */ 1179 void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException { 1180 if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) { 1181 process(data, location, (IndexAware) null); 1182 } else { 1183 // just recover producer audit 1184 data.visit(new Visitor() { 1185 @Override 1186 public void visit(KahaAddMessageCommand command) throws IOException { 1187 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 1188 } 1189 }); 1190 } 1191 } 1192 1193 // ///////////////////////////////////////////////////////////////// 1194 // Journaled record processing methods. Once the record is journaled, 1195 // these methods handle applying the index updates. 
These may be called 1196 // from the recovery method too so they need to be idempotent 1197 // ///////////////////////////////////////////////////////////////// 1198 1199 void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException { 1200 data.visit(new Visitor() { 1201 @Override 1202 public void visit(KahaAddMessageCommand command) throws IOException { 1203 process(command, location, onSequenceAssignedCallback); 1204 } 1205 1206 @Override 1207 public void visit(KahaRemoveMessageCommand command) throws IOException { 1208 process(command, location); 1209 } 1210 1211 @Override 1212 public void visit(KahaPrepareCommand command) throws IOException { 1213 process(command, location); 1214 } 1215 1216 @Override 1217 public void visit(KahaCommitCommand command) throws IOException { 1218 process(command, location, onSequenceAssignedCallback); 1219 } 1220 1221 @Override 1222 public void visit(KahaRollbackCommand command) throws IOException { 1223 process(command, location); 1224 } 1225 1226 @Override 1227 public void visit(KahaRemoveDestinationCommand command) throws IOException { 1228 process(command, location); 1229 } 1230 1231 @Override 1232 public void visit(KahaSubscriptionCommand command) throws IOException { 1233 process(command, location); 1234 } 1235 1236 @Override 1237 public void visit(KahaProducerAuditCommand command) throws IOException { 1238 processLocation(location); 1239 } 1240 1241 @Override 1242 public void visit(KahaAckMessageFileMapCommand command) throws IOException { 1243 processLocation(location); 1244 } 1245 1246 @Override 1247 public void visit(KahaTraceCommand command) { 1248 processLocation(location); 1249 } 1250 1251 @Override 1252 public void visit(KahaUpdateMessageCommand command) throws IOException { 1253 process(command, location); 1254 } 1255 1256 @Override 1257 public void visit(KahaRewrittenDataFileCommand command) throws IOException { 1258 process(command, location); 1259 } 
1260 }); 1261 } 1262 1263 @SuppressWarnings("rawtypes") 1264 protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException { 1265 if (command.hasTransactionInfo()) { 1266 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo()); 1267 inflightTx.add(new AddOperation(command, location, runWithIndexLock)); 1268 } else { 1269 this.indexLock.writeLock().lock(); 1270 try { 1271 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1272 @Override 1273 public void execute(Transaction tx) throws IOException { 1274 long assignedIndex = updateIndex(tx, command, location); 1275 if (runWithIndexLock != null) { 1276 runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex); 1277 } 1278 } 1279 }); 1280 1281 } finally { 1282 this.indexLock.writeLock().unlock(); 1283 } 1284 } 1285 } 1286 1287 protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException { 1288 this.indexLock.writeLock().lock(); 1289 try { 1290 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1291 @Override 1292 public void execute(Transaction tx) throws IOException { 1293 updateIndex(tx, command, location); 1294 } 1295 }); 1296 } finally { 1297 this.indexLock.writeLock().unlock(); 1298 } 1299 } 1300 1301 @SuppressWarnings("rawtypes") 1302 protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException { 1303 if (command.hasTransactionInfo()) { 1304 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo()); 1305 inflightTx.add(new RemoveOperation(command, location)); 1306 } else { 1307 this.indexLock.writeLock().lock(); 1308 try { 1309 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1310 @Override 1311 public void execute(Transaction tx) throws IOException { 1312 updateIndex(tx, command, location); 1313 } 1314 }); 1315 } finally { 1316 this.indexLock.writeLock().unlock(); 1317 } 
1318 } 1319 } 1320 1321 protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException { 1322 this.indexLock.writeLock().lock(); 1323 try { 1324 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1325 @Override 1326 public void execute(Transaction tx) throws IOException { 1327 updateIndex(tx, command, location); 1328 } 1329 }); 1330 } finally { 1331 this.indexLock.writeLock().unlock(); 1332 } 1333 } 1334 1335 protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException { 1336 this.indexLock.writeLock().lock(); 1337 try { 1338 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1339 @Override 1340 public void execute(Transaction tx) throws IOException { 1341 updateIndex(tx, command, location); 1342 } 1343 }); 1344 } finally { 1345 this.indexLock.writeLock().unlock(); 1346 } 1347 } 1348 1349 protected void processLocation(final Location location) { 1350 this.indexLock.writeLock().lock(); 1351 try { 1352 metadata.lastUpdate = location; 1353 } finally { 1354 this.indexLock.writeLock().unlock(); 1355 } 1356 } 1357 1358 @SuppressWarnings("rawtypes") 1359 protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException { 1360 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1361 List<Operation> inflightTx; 1362 synchronized (inflightTransactions) { 1363 inflightTx = inflightTransactions.remove(key); 1364 if (inflightTx == null) { 1365 inflightTx = preparedTransactions.remove(key); 1366 } 1367 } 1368 if (inflightTx == null) { 1369 // only non persistent messages in this tx 1370 if (before != null) { 1371 before.sequenceAssignedWithIndexLocked(-1); 1372 } 1373 return; 1374 } 1375 1376 final List<Operation> messagingTx = inflightTx; 1377 indexLock.writeLock().lock(); 1378 try { 1379 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1380 @Override 1381 public 
void execute(Transaction tx) throws IOException { 1382 for (Operation op : messagingTx) { 1383 op.execute(tx); 1384 } 1385 } 1386 }); 1387 metadata.lastUpdate = location; 1388 } finally { 1389 indexLock.writeLock().unlock(); 1390 } 1391 } 1392 1393 @SuppressWarnings("rawtypes") 1394 protected void process(KahaPrepareCommand command, Location location) { 1395 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1396 synchronized (inflightTransactions) { 1397 List<Operation> tx = inflightTransactions.remove(key); 1398 if (tx != null) { 1399 preparedTransactions.put(key, tx); 1400 } 1401 } 1402 } 1403 1404 @SuppressWarnings("rawtypes") 1405 protected void process(KahaRollbackCommand command, Location location) throws IOException { 1406 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1407 List<Operation> updates = null; 1408 synchronized (inflightTransactions) { 1409 updates = inflightTransactions.remove(key); 1410 if (updates == null) { 1411 updates = preparedTransactions.remove(key); 1412 } 1413 } 1414 } 1415 1416 protected void process(KahaRewrittenDataFileCommand command, Location location) throws IOException { 1417 final TreeSet<Integer> completeFileSet = new TreeSet<>(journal.getFileMap().keySet()); 1418 1419 // Mark the current journal file as a compacted file so that gc checks can skip 1420 // over logs that are smaller compaction type logs. 1421 DataFile current = journal.getDataFileById(location.getDataFileId()); 1422 current.setTypeCode(command.getRewriteType()); 1423 1424 if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) { 1425 // Move offset so that next location read jumps to next file. 1426 location.setOffset(journalMaxFileLength); 1427 } 1428 } 1429 1430 // ///////////////////////////////////////////////////////////////// 1431 // These methods do the actual index updates. 
1432 // ///////////////////////////////////////////////////////////////// 1433 1434 protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock(); 1435 private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<>(); 1436 1437 long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { 1438 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1439 1440 // Skip adding the message to the index if this is a topic and there are 1441 // no subscriptions. 1442 if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) { 1443 return -1; 1444 } 1445 1446 // Add the message. 1447 int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY; 1448 long id = sd.orderIndex.getNextMessageId(); 1449 Long previous = sd.locationIndex.put(tx, location, id); 1450 if (previous == null) { 1451 previous = sd.messageIdIndex.put(tx, command.getMessageId(), id); 1452 if (previous == null) { 1453 incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize()); 1454 sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location)); 1455 if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) { 1456 addAckLocationForNewMessage(tx, command.getDestination(), sd, id); 1457 } 1458 metadata.lastUpdate = location; 1459 } else { 1460 1461 MessageKeys messageKeys = sd.orderIndex.get(tx, previous); 1462 if (messageKeys != null && messageKeys.location.compareTo(location) < 0) { 1463 // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt 1464 LOG.warn("Duplicate message add attempt rejected. 
Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId()); 1465 } 1466 sd.messageIdIndex.put(tx, command.getMessageId(), previous); 1467 sd.locationIndex.remove(tx, location); 1468 id = -1; 1469 } 1470 } else { 1471 // restore the previous value.. Looks like this was a redo of a previously 1472 // added message. We don't want to assign it a new id as the other indexes would 1473 // be wrong.. 1474 sd.locationIndex.put(tx, location, previous); 1475 // ensure sequence is not broken 1476 sd.orderIndex.revertNextMessageId(); 1477 metadata.lastUpdate = location; 1478 } 1479 // record this id in any event, initial send or recovery 1480 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 1481 1482 return id; 1483 } 1484 1485 void trackPendingAdd(KahaDestination destination, Long seq) { 1486 StoredDestination sd = storedDestinations.get(key(destination)); 1487 if (sd != null) { 1488 sd.trackPendingAdd(seq); 1489 } 1490 } 1491 1492 void trackPendingAddComplete(KahaDestination destination, Long seq) { 1493 StoredDestination sd = storedDestinations.get(key(destination)); 1494 if (sd != null) { 1495 sd.trackPendingAddComplete(seq); 1496 } 1497 } 1498 1499 void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException { 1500 KahaAddMessageCommand command = updateMessageCommand.getMessage(); 1501 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1502 1503 Long id = sd.messageIdIndex.get(tx, command.getMessageId()); 1504 if (id != null) { 1505 MessageKeys previousKeys = sd.orderIndex.put( 1506 tx, 1507 command.getPrioritySupported() ? 
command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY, 1508 id, 1509 new MessageKeys(command.getMessageId(), location) 1510 ); 1511 sd.locationIndex.put(tx, location, id); 1512 incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize()); 1513 1514 if (previousKeys != null) { 1515 //Remove the existing from the size 1516 decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize()); 1517 1518 //update all the subscription metrics 1519 if (enableSubscriptionStatistics && sd.ackPositions != null && location.getSize() != previousKeys.location.getSize()) { 1520 Iterator<Entry<String, SequenceSet>> iter = sd.ackPositions.iterator(tx); 1521 while (iter.hasNext()) { 1522 Entry<String, SequenceSet> e = iter.next(); 1523 if (e.getValue().contains(id)) { 1524 incrementAndAddSizeToStoreStat(key(command.getDestination()), e.getKey(), location.getSize()); 1525 decrementAndSubSizeToStoreStat(key(command.getDestination()), e.getKey(), previousKeys.location.getSize()); 1526 } 1527 } 1528 } 1529 1530 // on first update previous is original location, on recovery/replay it may be the updated location 1531 if(!previousKeys.location.equals(location)) { 1532 sd.locationIndex.remove(tx, previousKeys.location); 1533 } 1534 } 1535 metadata.lastUpdate = location; 1536 } else { 1537 //Add the message if it can't be found 1538 this.updateIndex(tx, command, location); 1539 } 1540 } 1541 1542 void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { 1543 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1544 if (!command.hasSubscriptionKey()) { 1545 1546 // In the queue case we just remove the message from the index.. 
1547 Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId()); 1548 if (sequenceId != null) { 1549 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 1550 if (keys != null) { 1551 sd.locationIndex.remove(tx, keys.location); 1552 decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize()); 1553 recordAckMessageReferenceLocation(ackLocation, keys.location); 1554 metadata.lastUpdate = ackLocation; 1555 } else if (LOG.isDebugEnabled()) { 1556 LOG.debug("message not found in order index: " + sequenceId + " for: " + command.getMessageId()); 1557 } 1558 } else if (LOG.isDebugEnabled()) { 1559 LOG.debug("message not found in sequence id index: " + command.getMessageId()); 1560 } 1561 } else { 1562 // In the topic case we need remove the message once it's been acked 1563 // by all the subs 1564 Long sequence = sd.messageIdIndex.get(tx, command.getMessageId()); 1565 1566 // Make sure it's a valid message id... 1567 if (sequence != null) { 1568 String subscriptionKey = command.getSubscriptionKey(); 1569 if (command.getAck() != UNMATCHED) { 1570 sd.orderIndex.get(tx, sequence); 1571 byte priority = sd.orderIndex.lastGetPriority(); 1572 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority)); 1573 } 1574 1575 MessageKeys keys = sd.orderIndex.get(tx, sequence); 1576 if (keys != null) { 1577 recordAckMessageReferenceLocation(ackLocation, keys.location); 1578 } 1579 // The following method handles deleting un-referenced messages. 
1580 removeAckLocation(command, tx, sd, subscriptionKey, sequence); 1581 metadata.lastUpdate = ackLocation; 1582 } else if (LOG.isDebugEnabled()) { 1583 LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey()); 1584 } 1585 1586 } 1587 } 1588 1589 private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) { 1590 Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId())); 1591 if (referenceFileIds == null) { 1592 referenceFileIds = new HashSet<>(); 1593 referenceFileIds.add(messageLocation.getDataFileId()); 1594 metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds); 1595 } else { 1596 Integer id = Integer.valueOf(messageLocation.getDataFileId()); 1597 if (!referenceFileIds.contains(id)) { 1598 referenceFileIds.add(id); 1599 } 1600 } 1601 } 1602 1603 void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { 1604 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1605 sd.orderIndex.remove(tx); 1606 1607 sd.locationIndex.clear(tx); 1608 sd.locationIndex.unload(tx); 1609 tx.free(sd.locationIndex.getPageId()); 1610 1611 sd.messageIdIndex.clear(tx); 1612 sd.messageIdIndex.unload(tx); 1613 tx.free(sd.messageIdIndex.getPageId()); 1614 1615 if (sd.subscriptions != null) { 1616 sd.subscriptions.clear(tx); 1617 sd.subscriptions.unload(tx); 1618 tx.free(sd.subscriptions.getPageId()); 1619 1620 sd.subscriptionAcks.clear(tx); 1621 sd.subscriptionAcks.unload(tx); 1622 tx.free(sd.subscriptionAcks.getPageId()); 1623 1624 sd.ackPositions.clear(tx); 1625 sd.ackPositions.unload(tx); 1626 tx.free(sd.ackPositions.getHeadPageId()); 1627 1628 sd.subLocations.clear(tx); 1629 sd.subLocations.unload(tx); 1630 tx.free(sd.subLocations.getHeadPageId()); 1631 } 1632 1633 String key = key(command.getDestination()); 1634 
storedDestinations.remove(key); 1635 metadata.destinations.remove(tx, key); 1636 clearStoreStats(command.getDestination()); 1637 storeCache.remove(key(command.getDestination())); 1638 } 1639 1640 void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { 1641 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1642 final String subscriptionKey = command.getSubscriptionKey(); 1643 1644 // If set then we are creating it.. otherwise we are destroying the sub 1645 if (command.hasSubscriptionInfo()) { 1646 Location existing = sd.subLocations.get(tx, subscriptionKey); 1647 if (existing != null && existing.compareTo(location) == 0) { 1648 // replay on recovery, ignore 1649 LOG.trace("ignoring journal replay of replay of sub from: " + location); 1650 return; 1651 } 1652 1653 sd.subscriptions.put(tx, subscriptionKey, command); 1654 sd.subLocations.put(tx, subscriptionKey, location); 1655 long ackLocation=NOT_ACKED; 1656 if (!command.getRetroactive()) { 1657 ackLocation = sd.orderIndex.nextMessageId-1; 1658 } else { 1659 addAckLocationForRetroactiveSub(tx, sd, subscriptionKey); 1660 } 1661 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation)); 1662 sd.subscriptionCache.add(subscriptionKey); 1663 } else { 1664 // delete the sub... 
1665 sd.subscriptions.remove(tx, subscriptionKey); 1666 sd.subLocations.remove(tx, subscriptionKey); 1667 sd.subscriptionAcks.remove(tx, subscriptionKey); 1668 sd.subscriptionCache.remove(subscriptionKey); 1669 removeAckLocationsForSub(command, tx, sd, subscriptionKey); 1670 MessageStoreSubscriptionStatistics subStats = getSubStats(key(command.getDestination())); 1671 if (subStats != null) { 1672 subStats.removeSubscription(subscriptionKey); 1673 } 1674 1675 if (sd.subscriptions.isEmpty(tx)) { 1676 // remove the stored destination 1677 KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand(); 1678 removeDestinationCommand.setDestination(command.getDestination()); 1679 updateIndex(tx, removeDestinationCommand, null); 1680 clearStoreStats(command.getDestination()); 1681 } 1682 } 1683 } 1684 1685 private void checkpointUpdate(final boolean cleanup) throws IOException { 1686 checkpointLock.writeLock().lock(); 1687 try { 1688 this.indexLock.writeLock().lock(); 1689 try { 1690 Set<Integer> filesToGc = pageFile.tx().execute(new Transaction.CallableClosure<Set<Integer>, IOException>() { 1691 @Override 1692 public Set<Integer> execute(Transaction tx) throws IOException { 1693 return checkpointUpdate(tx, cleanup); 1694 } 1695 }); 1696 pageFile.flush(); 1697 // after the index update such that partial removal does not leave dangling references in the index. 
1698 journal.removeDataFiles(filesToGc); 1699 } finally { 1700 this.indexLock.writeLock().unlock(); 1701 } 1702 1703 } finally { 1704 checkpointLock.writeLock().unlock(); 1705 } 1706 } 1707 1708 /** 1709 * @param tx 1710 * @throws IOException 1711 */ 1712 Set<Integer> checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { 1713 MDC.put("activemq.persistenceDir", getDirectory().getName()); 1714 LOG.debug("Checkpoint started."); 1715 1716 // reflect last update exclusive of current checkpoint 1717 Location lastUpdate = metadata.lastUpdate; 1718 1719 metadata.state = OPEN_STATE; 1720 metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(); 1721 metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap(); 1722 Location[] inProgressTxRange = getInProgressTxLocationRange(); 1723 metadata.firstInProgressTransactionLocation = inProgressTxRange[0]; 1724 tx.store(metadata.page, metadataMarshaller, true); 1725 1726 final TreeSet<Integer> gcCandidateSet = new TreeSet<>(); 1727 if (cleanup) { 1728 1729 final TreeSet<Integer> completeFileSet = new TreeSet<>(journal.getFileMap().keySet()); 1730 gcCandidateSet.addAll(completeFileSet); 1731 1732 if (LOG.isTraceEnabled()) { 1733 LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet); 1734 } 1735 1736 if (lastUpdate != null) { 1737 // we won't delete past the last update, ackCompaction journal can be a candidate in error 1738 gcCandidateSet.removeAll(new TreeSet<Integer>(gcCandidateSet.tailSet(lastUpdate.getDataFileId()))); 1739 } 1740 1741 // Don't GC files under replication 1742 if( journalFilesBeingReplicated!=null ) { 1743 gcCandidateSet.removeAll(journalFilesBeingReplicated); 1744 } 1745 1746 if (metadata.producerSequenceIdTrackerLocation != null) { 1747 int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId(); 1748 if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) { 1749 // rewrite so we don't prevent gc 1750 
metadata.producerSequenceIdTracker.setModified(true); 1751 if (LOG.isTraceEnabled()) { 1752 LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation); 1753 } 1754 } 1755 gcCandidateSet.remove(dataFileId); 1756 if (LOG.isTraceEnabled()) { 1757 LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + metadata.producerSequenceIdTrackerLocation + ", " + gcCandidateSet); 1758 } 1759 } 1760 1761 if (metadata.ackMessageFileMapLocation != null) { 1762 int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId(); 1763 gcCandidateSet.remove(dataFileId); 1764 if (LOG.isTraceEnabled()) { 1765 LOG.trace("gc candidates after ackMessageFileMapLocation:" + metadata.ackMessageFileMapLocation + ", " + gcCandidateSet); 1766 } 1767 } 1768 1769 // Don't GC files referenced by in-progress tx 1770 if (inProgressTxRange[0] != null) { 1771 for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) { 1772 gcCandidateSet.remove(pendingTx); 1773 } 1774 } 1775 if (LOG.isTraceEnabled()) { 1776 LOG.trace("gc candidates after in progress tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet); 1777 } 1778 1779 // Go through all the destinations to see if any of them can remove GC candidates. 
1780 for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) { 1781 if( gcCandidateSet.isEmpty() ) { 1782 break; 1783 } 1784 1785 // Use a visitor to cut down the number of pages that we load 1786 entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 1787 int last=-1; 1788 @Override 1789 public boolean isInterestedInKeysBetween(Location first, Location second) { 1790 if( first==null ) { 1791 SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1); 1792 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { 1793 subset.remove(second.getDataFileId()); 1794 } 1795 return !subset.isEmpty(); 1796 } else if( second==null ) { 1797 SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId()); 1798 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { 1799 subset.remove(first.getDataFileId()); 1800 } 1801 return !subset.isEmpty(); 1802 } else { 1803 SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1); 1804 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { 1805 subset.remove(first.getDataFileId()); 1806 } 1807 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { 1808 subset.remove(second.getDataFileId()); 1809 } 1810 return !subset.isEmpty(); 1811 } 1812 } 1813 1814 @Override 1815 public void visit(List<Location> keys, List<Long> values) { 1816 for (Location l : keys) { 1817 int fileId = l.getDataFileId(); 1818 if( last != fileId ) { 1819 gcCandidateSet.remove(fileId); 1820 last = fileId; 1821 } 1822 } 1823 } 1824 }); 1825 1826 // Durable Subscription 1827 if (entry.getValue().subLocations != null) { 1828 Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx); 1829 while (iter.hasNext()) { 1830 Entry<String, Location> subscription = iter.next(); 1831 int dataFileId = subscription.getValue().getDataFileId(); 1832 1833 // Move subscription along if it has no 
outstanding messages that need ack'd 1834 // and its in the last log file in the journal. 1835 if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) { 1836 final StoredDestination destination = entry.getValue(); 1837 final String subscriptionKey = subscription.getKey(); 1838 SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey); 1839 1840 // When pending is size one that is the next message Id meaning there 1841 // are no pending messages currently. 1842 if (pendingAcks == null || pendingAcks.isEmpty() || 1843 (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) { 1844 1845 if (LOG.isTraceEnabled()) { 1846 LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId); 1847 } 1848 1849 final KahaSubscriptionCommand kahaSub = 1850 destination.subscriptions.get(tx, subscriptionKey); 1851 destination.subLocations.put( 1852 tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub)); 1853 1854 // Skips the remove from candidates if we rewrote the subscription 1855 // in order to prevent duplicate subscription commands on recover. 1856 // If another subscription is on the same file and isn't rewritten 1857 // than it will remove the file from the set. 
1858 continue; 1859 } 1860 } 1861 1862 gcCandidateSet.remove(dataFileId); 1863 } 1864 } 1865 1866 if (LOG.isTraceEnabled()) { 1867 LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet); 1868 } 1869 } 1870 1871 // check we are not deleting file with ack for in-use journal files 1872 if (LOG.isTraceEnabled()) { 1873 LOG.trace("gc candidates: " + gcCandidateSet); 1874 LOG.trace("ackMessageFileMap: " + metadata.ackMessageFileMap); 1875 } 1876 1877 boolean ackMessageFileMapMod = false; 1878 Iterator<Integer> candidates = gcCandidateSet.iterator(); 1879 while (candidates.hasNext()) { 1880 Integer candidate = candidates.next(); 1881 Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate); 1882 if (referencedFileIds != null) { 1883 for (Integer referencedFileId : referencedFileIds) { 1884 if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) { 1885 // active file that is not targeted for deletion is referenced so don't delete 1886 candidates.remove(); 1887 break; 1888 } 1889 } 1890 if (gcCandidateSet.contains(candidate)) { 1891 ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null); 1892 } else { 1893 if (LOG.isTraceEnabled()) { 1894 LOG.trace("not removing data file: " + candidate 1895 + " as contained ack(s) refer to referenced file: " + referencedFileIds); 1896 } 1897 } 1898 } 1899 } 1900 1901 if (!gcCandidateSet.isEmpty()) { 1902 LOG.debug("Cleanup removing the data files: {}", gcCandidateSet); 1903 for (Integer candidate : gcCandidateSet) { 1904 for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) { 1905 ackMessageFileMapMod |= ackFiles.remove(candidate); 1906 } 1907 } 1908 if (ackMessageFileMapMod) { 1909 checkpointUpdate(tx, false); 1910 } 1911 } else if (isEnableAckCompaction()) { 1912 if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) { 1913 // First check length of journal to make sure it makes sense to even try. 
1914 // 1915 // If there is only one journal file with Acks in it we don't need to move 1916 // it since it won't be chained to any later logs. 1917 // 1918 // If the logs haven't grown since the last time then we need to compact 1919 // otherwise there seems to still be room for growth and we don't need to incur 1920 // the overhead. Depending on configuration this check can be avoided and 1921 // Ack compaction will run any time the store has not GC'd a journal file in 1922 // the configured amount of cycles. 1923 if (metadata.ackMessageFileMap.size() > 1 && 1924 (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) { 1925 1926 LOG.trace("No files GC'd checking if threshold to ACK compaction has been met."); 1927 try { 1928 scheduler.execute(new AckCompactionRunner()); 1929 } catch (Exception ex) { 1930 LOG.warn("Error on queueing the Ack Compactor", ex); 1931 } 1932 } else { 1933 LOG.trace("Journal activity detected, no Ack compaction scheduled."); 1934 } 1935 1936 checkPointCyclesWithNoGC = 0; 1937 } else { 1938 LOG.trace("Not yet time to check for compaction: {} of {} cycles", 1939 checkPointCyclesWithNoGC, getCompactAcksAfterNoGC()); 1940 } 1941 1942 journalLogOnLastCompactionCheck = journal.getCurrentDataFileId(); 1943 } 1944 } 1945 MDC.remove("activemq.persistenceDir"); 1946 1947 LOG.debug("Checkpoint done."); 1948 return gcCandidateSet; 1949 } 1950 1951 private final class AckCompactionRunner implements Runnable { 1952 1953 @Override 1954 public void run() { 1955 1956 int journalToAdvance = -1; 1957 Set<Integer> journalLogsReferenced = new HashSet<>(); 1958 1959 //flag to know whether the ack forwarding completed without an exception 1960 boolean forwarded = false; 1961 1962 try { 1963 //acquire the checkpoint lock to prevent other threads from 1964 //running a checkpoint while this is running 1965 // 1966 //Normally this task runs on the same executor as the checkpoint task 1967 //so this ack compaction 
runner wouldn't run at the same time as the checkpoint task. 1968 // 1969 //However, there are two cases where this isn't always true. 1970 //First, the checkpoint() method is public and can be called through the 1971 //PersistenceAdapter interface by someone at the same time this is running. 1972 //Second, a checkpoint is called during shutdown without using the executor. 1973 // 1974 //In the future it might be better to just remove the checkpointLock entirely 1975 //and only use the executor but this would need to be examined for any unintended 1976 //consequences 1977 checkpointLock.readLock().lock(); 1978 1979 try { 1980 1981 // Lock index to capture the ackMessageFileMap data 1982 indexLock.writeLock().lock(); 1983 1984 // Map keys might not be sorted, find the earliest log file to forward acks 1985 // from and move only those, future cycles can chip away at more as needed. 1986 // We won't move files that are themselves rewritten on a previous compaction. 1987 List<Integer> journalFileIds = new ArrayList<>(metadata.ackMessageFileMap.keySet()); 1988 Collections.sort(journalFileIds); 1989 for (Integer journalFileId : journalFileIds) { 1990 DataFile current = journal.getDataFileById(journalFileId); 1991 if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) { 1992 journalToAdvance = journalFileId; 1993 break; 1994 } 1995 } 1996 1997 // Check if we found one, or if we only found the current file being written to. 
1998 if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) { 1999 return; 2000 } 2001 2002 journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance)); 2003 2004 } finally { 2005 indexLock.writeLock().unlock(); 2006 } 2007 2008 try { 2009 // Background rewrite of the old acks 2010 forwardAllAcks(journalToAdvance, journalLogsReferenced); 2011 forwarded = true; 2012 } catch (IOException ioe) { 2013 LOG.error("Forwarding of acks failed", ioe); 2014 brokerService.handleIOException(ioe); 2015 } catch (Throwable e) { 2016 LOG.error("Forwarding of acks failed", e); 2017 brokerService.handleIOException(IOExceptionSupport.create(e)); 2018 } 2019 } finally { 2020 checkpointLock.readLock().unlock(); 2021 } 2022 2023 try { 2024 if (forwarded) { 2025 // Checkpoint with changes from the ackMessageFileMap 2026 checkpointUpdate(false); 2027 } 2028 } catch (IOException ioe) { 2029 LOG.error("Checkpoint failed", ioe); 2030 brokerService.handleIOException(ioe); 2031 } catch (Throwable e) { 2032 LOG.error("Checkpoint failed", e); 2033 brokerService.handleIOException(IOExceptionSupport.create(e)); 2034 } 2035 } 2036 } 2037 2038 private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException { 2039 LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead); 2040 2041 DataFile forwardsFile = journal.reserveDataFile(); 2042 forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE); 2043 LOG.trace("Reserved file for forwarded acks: {}", forwardsFile); 2044 2045 Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<>(); 2046 2047 try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) { 2048 KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand(); 2049 compactionMarker.setSourceDataFileId(journalToRead); 2050 compactionMarker.setRewriteType(forwardsFile.getTypeCode()); 2051 2052 ByteSequence 
payload = toByteSequence(compactionMarker); 2053 appender.storeItem(payload, Journal.USER_RECORD_TYPE, false); 2054 LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead); 2055 2056 final Location limit = new Location(journalToRead + 1, 0); 2057 Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0), limit); 2058 while (nextLocation != null) { 2059 JournalCommand<?> command = null; 2060 try { 2061 command = load(nextLocation); 2062 } catch (IOException ex) { 2063 LOG.trace("Error loading command during ack forward: {}", nextLocation); 2064 } 2065 2066 if (command != null && command instanceof KahaRemoveMessageCommand) { 2067 payload = toByteSequence(command); 2068 Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, false); 2069 updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced); 2070 } 2071 2072 nextLocation = getNextLocationForAckForward(nextLocation, limit); 2073 } 2074 } 2075 2076 LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations); 2077 2078 // Lock index while we update the ackMessageFileMap. 2079 indexLock.writeLock().lock(); 2080 2081 // Update the ack map with the new locations of the acks 2082 for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) { 2083 Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey()); 2084 if (referenceFileIds == null) { 2085 referenceFileIds = new HashSet<>(); 2086 referenceFileIds.addAll(entry.getValue()); 2087 metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds); 2088 } else { 2089 referenceFileIds.addAll(entry.getValue()); 2090 } 2091 } 2092 2093 // remove the old location data from the ack map so that the old journal log file can 2094 // be removed on next GC. 
2095 metadata.ackMessageFileMap.remove(journalToRead); 2096 2097 indexLock.writeLock().unlock(); 2098 2099 LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap); 2100 } 2101 2102 private Location getNextLocationForAckForward(final Location nextLocation, final Location limit) { 2103 //getNextLocation() can throw an IOException, we should handle it and set 2104 //nextLocation to null and abort gracefully 2105 //Should not happen in the normal case 2106 Location location = null; 2107 try { 2108 location = journal.getNextLocation(nextLocation, limit); 2109 } catch (IOException e) { 2110 LOG.warn("Failed to load next journal location after: {}, reason: {}", nextLocation, e); 2111 if (LOG.isDebugEnabled()) { 2112 LOG.debug("Failed to load next journal location after: {}", nextLocation, e); 2113 } 2114 } 2115 return location; 2116 } 2117 2118 final Runnable nullCompletionCallback = new Runnable() { 2119 @Override 2120 public void run() { 2121 } 2122 }; 2123 2124 private Location checkpointProducerAudit() throws IOException { 2125 if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) { 2126 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 2127 ObjectOutputStream oout = new ObjectOutputStream(baos); 2128 oout.writeObject(metadata.producerSequenceIdTracker); 2129 oout.flush(); 2130 oout.close(); 2131 // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false 2132 Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback); 2133 try { 2134 location.getLatch().await(); 2135 if (location.getBatch().exception.get() != null) { 2136 throw location.getBatch().exception.get(); 2137 } 2138 } catch (InterruptedException e) { 2139 throw new InterruptedIOException(e.toString()); 2140 } 2141 return location; 2142 } 2143 return metadata.producerSequenceIdTrackerLocation; 2144 } 2145 2146 private Location 
checkpointAckMessageFileMap() throws IOException { 2147 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 2148 ObjectOutputStream oout = new ObjectOutputStream(baos); 2149 oout.writeObject(metadata.ackMessageFileMap); 2150 oout.flush(); 2151 oout.close(); 2152 // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false 2153 Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback); 2154 try { 2155 location.getLatch().await(); 2156 } catch (InterruptedException e) { 2157 throw new InterruptedIOException(e.toString()); 2158 } 2159 return location; 2160 } 2161 2162 private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException { 2163 2164 ByteSequence sequence = toByteSequence(subscription); 2165 Location location = journal.write(sequence, nullCompletionCallback) ; 2166 2167 try { 2168 location.getLatch().await(); 2169 } catch (InterruptedException e) { 2170 throw new InterruptedIOException(e.toString()); 2171 } 2172 return location; 2173 } 2174 2175 public HashSet<Integer> getJournalFilesBeingReplicated() { 2176 return journalFilesBeingReplicated; 2177 } 2178 2179 // ///////////////////////////////////////////////////////////////// 2180 // StoredDestination related implementation methods. 
2181 // ///////////////////////////////////////////////////////////////// 2182 2183 protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<>(); 2184 2185 static class MessageKeys { 2186 final String messageId; 2187 final Location location; 2188 2189 public MessageKeys(String messageId, Location location) { 2190 this.messageId=messageId; 2191 this.location=location; 2192 } 2193 2194 @Override 2195 public String toString() { 2196 return "["+messageId+","+location+"]"; 2197 } 2198 } 2199 2200 protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> { 2201 final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller(); 2202 2203 @Override 2204 public MessageKeys readPayload(DataInput dataIn) throws IOException { 2205 return new MessageKeys(dataIn.readUTF(), locationSizeMarshaller.readPayload(dataIn)); 2206 } 2207 2208 @Override 2209 public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException { 2210 dataOut.writeUTF(object.messageId); 2211 locationSizeMarshaller.writePayload(object.location, dataOut); 2212 } 2213 } 2214 2215 class LastAck { 2216 long lastAckedSequence; 2217 byte priority; 2218 2219 public LastAck(LastAck source) { 2220 this.lastAckedSequence = source.lastAckedSequence; 2221 this.priority = source.priority; 2222 } 2223 2224 public LastAck() { 2225 this.priority = MessageOrderIndex.HI; 2226 } 2227 2228 public LastAck(long ackLocation) { 2229 this.lastAckedSequence = ackLocation; 2230 this.priority = MessageOrderIndex.LO; 2231 } 2232 2233 public LastAck(long ackLocation, byte priority) { 2234 this.lastAckedSequence = ackLocation; 2235 this.priority = priority; 2236 } 2237 2238 @Override 2239 public String toString() { 2240 return "[" + lastAckedSequence + ":" + priority + "]"; 2241 } 2242 } 2243 2244 protected class LastAckMarshaller implements Marshaller<LastAck> { 2245 2246 @Override 2247 public void writePayload(LastAck object, DataOutput dataOut) throws 
IOException { 2248 dataOut.writeLong(object.lastAckedSequence); 2249 dataOut.writeByte(object.priority); 2250 } 2251 2252 @Override 2253 public LastAck readPayload(DataInput dataIn) throws IOException { 2254 LastAck lastAcked = new LastAck(); 2255 lastAcked.lastAckedSequence = dataIn.readLong(); 2256 if (metadata.version >= 3) { 2257 lastAcked.priority = dataIn.readByte(); 2258 } 2259 return lastAcked; 2260 } 2261 2262 @Override 2263 public int getFixedSize() { 2264 return 9; 2265 } 2266 2267 @Override 2268 public LastAck deepCopy(LastAck source) { 2269 return new LastAck(source); 2270 } 2271 2272 @Override 2273 public boolean isDeepCopySupported() { 2274 return true; 2275 } 2276 } 2277 2278 class StoredDestination { 2279 2280 MessageOrderIndex orderIndex = new MessageOrderIndex(); 2281 BTreeIndex<Location, Long> locationIndex; 2282 BTreeIndex<String, Long> messageIdIndex; 2283 2284 // These bits are only set for Topics 2285 BTreeIndex<String, KahaSubscriptionCommand> subscriptions; 2286 BTreeIndex<String, LastAck> subscriptionAcks; 2287 HashMap<String, MessageOrderCursor> subscriptionCursors; 2288 ListIndex<String, SequenceSet> ackPositions; 2289 ListIndex<String, Location> subLocations; 2290 2291 // Transient data used to track which Messages are no longer needed. 
2292 final TreeMap<Long, Long> messageReferences = new TreeMap<>(); 2293 final HashSet<String> subscriptionCache = new LinkedHashSet<>(); 2294 2295 public void trackPendingAdd(Long seq) { 2296 orderIndex.trackPendingAdd(seq); 2297 } 2298 2299 public void trackPendingAddComplete(Long seq) { 2300 orderIndex.trackPendingAddComplete(seq); 2301 } 2302 2303 @Override 2304 public String toString() { 2305 return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size(); 2306 } 2307 } 2308 2309 protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> { 2310 2311 final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller(); 2312 2313 @Override 2314 public StoredDestination readPayload(final DataInput dataIn) throws IOException { 2315 final StoredDestination value = new StoredDestination(); 2316 value.orderIndex.defaultPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong()); 2317 value.locationIndex = new BTreeIndex<>(pageFile, dataIn.readLong()); 2318 value.messageIdIndex = new BTreeIndex<>(pageFile, dataIn.readLong()); 2319 2320 if (dataIn.readBoolean()) { 2321 value.subscriptions = new BTreeIndex<>(pageFile, dataIn.readLong()); 2322 value.subscriptionAcks = new BTreeIndex<>(pageFile, dataIn.readLong()); 2323 if (metadata.version >= 4) { 2324 value.ackPositions = new ListIndex<>(pageFile, dataIn.readLong()); 2325 } else { 2326 // upgrade 2327 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2328 @Override 2329 public void execute(Transaction tx) throws IOException { 2330 LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<>(); 2331 2332 if (metadata.version >= 3) { 2333 // migrate 2334 BTreeIndex<Long, HashSet<String>> oldAckPositions = 2335 new BTreeIndex<>(pageFile, dataIn.readLong()); 2336 oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE); 2337 oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); 2338 
oldAckPositions.load(tx); 2339 2340 2341 // Do the initial build of the data in memory before writing into the store 2342 // based Ack Positions List to avoid a lot of disk thrashing. 2343 Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx); 2344 while (iterator.hasNext()) { 2345 Entry<Long, HashSet<String>> entry = iterator.next(); 2346 2347 for(String subKey : entry.getValue()) { 2348 SequenceSet pendingAcks = temp.get(subKey); 2349 if (pendingAcks == null) { 2350 pendingAcks = new SequenceSet(); 2351 temp.put(subKey, pendingAcks); 2352 } 2353 2354 pendingAcks.add(entry.getKey()); 2355 } 2356 } 2357 } 2358 // Now move the pending messages to ack data into the store backed 2359 // structure. 2360 value.ackPositions = new ListIndex<>(pageFile, tx.allocate()); 2361 value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); 2362 value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); 2363 value.ackPositions.load(tx); 2364 for(String subscriptionKey : temp.keySet()) { 2365 value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey)); 2366 } 2367 2368 } 2369 }); 2370 } 2371 2372 if (metadata.version >= 5) { 2373 value.subLocations = new ListIndex<>(pageFile, dataIn.readLong()); 2374 } else { 2375 // upgrade 2376 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2377 @Override 2378 public void execute(Transaction tx) throws IOException { 2379 value.subLocations = new ListIndex<>(pageFile, tx.allocate()); 2380 value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); 2381 value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); 2382 value.subLocations.load(tx); 2383 } 2384 }); 2385 } 2386 } 2387 if (metadata.version >= 2) { 2388 value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong()); 2389 value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong()); 2390 } else { 2391 // upgrade 2392 pageFile.tx().execute(new 
Transaction.Closure<IOException>() { 2393 @Override 2394 public void execute(Transaction tx) throws IOException { 2395 value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate()); 2396 value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2397 value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller); 2398 value.orderIndex.lowPriorityIndex.load(tx); 2399 2400 value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate()); 2401 value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2402 value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller); 2403 value.orderIndex.highPriorityIndex.load(tx); 2404 } 2405 }); 2406 } 2407 2408 return value; 2409 } 2410 2411 @Override 2412 public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException { 2413 dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId()); 2414 dataOut.writeLong(value.locationIndex.getPageId()); 2415 dataOut.writeLong(value.messageIdIndex.getPageId()); 2416 if (value.subscriptions != null) { 2417 dataOut.writeBoolean(true); 2418 dataOut.writeLong(value.subscriptions.getPageId()); 2419 dataOut.writeLong(value.subscriptionAcks.getPageId()); 2420 dataOut.writeLong(value.ackPositions.getHeadPageId()); 2421 dataOut.writeLong(value.subLocations.getHeadPageId()); 2422 } else { 2423 dataOut.writeBoolean(false); 2424 } 2425 dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId()); 2426 dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId()); 2427 } 2428 } 2429 2430 static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> { 2431 final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller(); 2432 2433 @Override 2434 public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException { 2435 KahaSubscriptionCommand rc = new KahaSubscriptionCommand(); 2436 
rc.mergeFramed((InputStream)dataIn); 2437 return rc; 2438 } 2439 2440 @Override 2441 public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException { 2442 object.writeFramed((OutputStream)dataOut); 2443 } 2444 } 2445 2446 protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 2447 String key = key(destination); 2448 StoredDestination rc = storedDestinations.get(key); 2449 if (rc == null) { 2450 boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC; 2451 rc = loadStoredDestination(tx, key, topic); 2452 // Cache it. We may want to remove/unload destinations from the 2453 // cache that are not used for a while 2454 // to reduce memory usage. 2455 storedDestinations.put(key, rc); 2456 } 2457 return rc; 2458 } 2459 2460 protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 2461 String key = key(destination); 2462 StoredDestination rc = storedDestinations.get(key); 2463 if (rc == null && metadata.destinations.containsKey(tx, key)) { 2464 rc = getStoredDestination(destination, tx); 2465 } 2466 return rc; 2467 } 2468 2469 /** 2470 * @param tx 2471 * @param key 2472 * @param topic 2473 * @return 2474 * @throws IOException 2475 */ 2476 private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException { 2477 // Try to load the existing indexes.. 2478 StoredDestination rc = metadata.destinations.get(tx, key); 2479 if (rc == null) { 2480 // Brand new destination.. allocate indexes for it. 
2481 rc = new StoredDestination(); 2482 rc.orderIndex.allocate(tx); 2483 rc.locationIndex = new BTreeIndex<>(pageFile, tx.allocate()); 2484 rc.messageIdIndex = new BTreeIndex<>(pageFile, tx.allocate()); 2485 2486 if (topic) { 2487 rc.subscriptions = new BTreeIndex<>(pageFile, tx.allocate()); 2488 rc.subscriptionAcks = new BTreeIndex<>(pageFile, tx.allocate()); 2489 rc.ackPositions = new ListIndex<>(pageFile, tx.allocate()); 2490 rc.subLocations = new ListIndex<>(pageFile, tx.allocate()); 2491 } 2492 metadata.destinations.put(tx, key, rc); 2493 } 2494 2495 // Configure the marshalers and load. 2496 rc.orderIndex.load(tx); 2497 2498 // Figure out the next key using the last entry in the destination. 2499 rc.orderIndex.configureLast(tx); 2500 2501 rc.locationIndex.setKeyMarshaller(new LocationSizeMarshaller()); 2502 rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE); 2503 rc.locationIndex.load(tx); 2504 2505 rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE); 2506 rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE); 2507 rc.messageIdIndex.load(tx); 2508 2509 //go through an upgrade old index if older than version 6 2510 if (metadata.version < 6) { 2511 for (Iterator<Entry<Location, Long>> iterator = rc.locationIndex.iterator(tx); iterator.hasNext(); ) { 2512 Entry<Location, Long> entry = iterator.next(); 2513 // modify so it is upgraded 2514 rc.locationIndex.put(tx, entry.getKey(), entry.getValue()); 2515 } 2516 //upgrade the order index 2517 for (Iterator<Entry<Long, MessageKeys>> iterator = rc.orderIndex.iterator(tx); iterator.hasNext(); ) { 2518 Entry<Long, MessageKeys> entry = iterator.next(); 2519 //call get so that the last priority is updated 2520 rc.orderIndex.get(tx, entry.getKey()); 2521 rc.orderIndex.put(tx, rc.orderIndex.lastGetPriority(), entry.getKey(), entry.getValue()); 2522 } 2523 } 2524 2525 // If it was a topic... 
if (topic) {

    // Topic destinations carry four extra indexes; each needs its marshallers
    // configured before it is loaded.
    rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
    rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
    rc.subscriptions.load(tx);

    rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
    rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
    rc.subscriptionAcks.load(tx);

    rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
    rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
    rc.ackPositions.load(tx);

    rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
    rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
    rc.subLocations.load(tx);

    rc.subscriptionCursors = new HashMap<>();

    if (metadata.version < 3) {

        // on upgrade need to fill ackLocation with available messages past last ack
        for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
            Entry<String, LastAck> entry = iterator.next();
            for (Iterator<Entry<Long, MessageKeys>> orderIterator =
                    rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
                Long sequence = orderIterator.next().getKey();
                addAckLocation(tx, rc, sequence, entry.getKey());
            }
            // modify so it is upgraded
            rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
        }
    }

    // Configure the message references index: count, per sequence id, how many
    // subscriptions still have it in their pending-ack set.
    Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
    while (subscriptions.hasNext()) {
        Entry<String, SequenceSet> subscription = subscriptions.next();
        SequenceSet pendingAcks = subscription.getValue();
        if (pendingAcks != null && !pendingAcks.isEmpty()) {
            Long lastPendingAck = pendingAcks.getTail().getLast();
            for (Long sequenceId : pendingAcks) {
                Long current = rc.messageReferences.get(sequenceId);
                if (current == null) {
                    current = new Long(0);
                }

                // We always add a trailing empty entry for the next position to start from
                // so we need to ensure we don't count that as a message reference on reload.
                if (!sequenceId.equals(lastPendingAck)) {
                    current = current.longValue() + 1;
                } else {
                    current = Long.valueOf(0L);
                }

                rc.messageReferences.put(sequenceId, current);
            }
        }
    }

    // Configure the subscription cache
    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
        Entry<String, LastAck> entry = iterator.next();
        rc.subscriptionCache.add(entry.getKey());
    }

    if (rc.orderIndex.nextMessageId == 0) {
        // check for existing durable sub all acked out - pull next seq from acks as messages are gone
        if (!rc.subscriptionAcks.isEmpty(tx)) {
            for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
                Entry<String, LastAck> entry = iterator.next();
                rc.orderIndex.nextMessageId =
                        Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
            }
        }
    } else {
        // update based on ackPositions for unmatched, last entry is always the next
        if (!rc.messageReferences.isEmpty()) {
            Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
            rc.orderIndex.nextMessageId =
                    Math.max(rc.orderIndex.nextMessageId, nextMessageId);
        }
    }
}

if (metadata.version < VERSION) {
    // store again after upgrade
    metadata.destinations.put(tx, key, rc);
}
return rc;
}

/**
 * Clear the counters for the destination, if they exist.
 * <p>
 * Resets both the message-store statistics and the per-subscription
 * statistics that {@link #getStoreStats(String)} and
 * {@link #getSubStats(String)} return for the destination's key; either
 * one is skipped when it is {@code null}.
 *
 * @param kahaDestination the destination whose statistics are reset
 */
protected void clearStoreStats(KahaDestination kahaDestination) {
    String key = key(kahaDestination);
    MessageStoreStatistics storeStats = getStoreStats(key);
    MessageStoreSubscriptionStatistics subStats = getSubStats(key);
    if (storeStats != null) {
        storeStats.reset();
    }
    if (subStats != null) {
        subStats.reset();
    }
}

/**
 * Update MessageStoreStatistics for one added message.
 * Delegates to {@link #incrementAndAddSizeToStoreStat(String, long)} using
 * the destination's key.
 *
 * @param kahaDestination the destination the message was added to
 * @param size the size to add; ignored by the delegate unless positive
 */
protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) {
    incrementAndAddSizeToStoreStat(key(kahaDestination), size);
}

/**
 * Increments the message count of the destination's store statistics and,
 * when {@code size > 0}, adds {@code size} to its message size. Does
 * nothing when no statistics are found for the key.
 *
 * @param kahaDestKey destination key as produced by {@link #key(KahaDestination)}
 * @param size the size to add; only applied when positive
 */
protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) {
    MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
    if (storeStats != null) {
        storeStats.getMessageCount().increment();
        if (size > 0) {
            storeStats.getMessageSize().addSize(size);
        }
    }
}

/**
 * Update MessageStoreStatistics for one removed message.
 * Delegates to {@link #decrementAndSubSizeToStoreStat(String, long)} using
 * the destination's key.
 *
 * @param kahaDestination the destination the message was removed from
 * @param size the size to subtract; ignored by the delegate unless positive
 */
protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) {
    decrementAndSubSizeToStoreStat(key(kahaDestination), size);
}

/**
 * Decrements the message count of the destination's store statistics and,
 * when {@code size > 0}, subtracts {@code size} from its message size.
 * Does nothing when no statistics are found for the key.
 *
 * @param kahaDestKey destination key as produced by {@link #key(KahaDestination)}
 * @param size the size to subtract; only applied when positive
 */
protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) {
    MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
    if (storeStats != null) {
        storeStats.getMessageCount().decrement();
        if (size > 0) {
            storeStats.getMessageSize().addSize(-size);
        }
    }
}

/**
 * Per-subscription variant; delegates to
 * {@link #incrementAndAddSizeToStoreStat(String, String, long)} using the
 * destination's key.
 *
 * @param kahaDestination the destination owning the subscription
 * @param subKey the subscription key
 * @param size the size to add; ignored by the delegate unless positive
 */
protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size) {
    incrementAndAddSizeToStoreStat(key(kahaDestination), subKey, size);
}

/**
 * Increments the message count of one subscription and, when
 * {@code size > 0}, adds {@code size} to that subscription's message size.
 * Only acts when {@code enableSubscriptionStatistics} is set, subscription
 * statistics exist for the destination, and {@code subKey} is not
 * {@code null}.
 *
 * @param kahaDestKey destination key as produced by {@link #key(KahaDestination)}
 * @param subKey the subscription key
 * @param size the size to add; only applied when positive
 */
protected void incrementAndAddSizeToStoreStat(String kahaDestKey, String subKey, long size) {
    if (enableSubscriptionStatistics) {
        MessageStoreSubscriptionStatistics subStats = getSubStats(kahaDestKey);
        if (subStats != null && subKey != null) {
            subStats.getMessageCount(subKey).increment();
            if (size > 0) {
                subStats.getMessageSize(subKey).addSize(size);
            }
        }
    }
}


/**
 * Decrements the message count of one subscription and, when
 * {@code size > 0}, subtracts {@code size} from that subscription's
 * message size. Only acts when {@code enableSubscriptionStatistics} is
 * set, subscription statistics exist for the destination, and
 * {@code subKey} is not {@code null}.
 *
 * @param kahaDestKey destination key as produced by {@link #key(KahaDestination)}
 * @param subKey the subscription key
 * @param size the size to subtract; only applied when positive
 */
protected void decrementAndSubSizeToStoreStat(String kahaDestKey, String subKey, long size) {
    if (enableSubscriptionStatistics) {
        MessageStoreSubscriptionStatistics subStats = getSubStats(kahaDestKey);
        if (subStats != null && subKey != null) {
            subStats.getMessageCount(subKey).decrement();
            if (size > 0) {
                subStats.getMessageSize(subKey).addSize(-size);
            }
        }
    }
}

/**
 * Per-subscription variant; delegates to
 * {@link #decrementAndSubSizeToStoreStat(String, String, long)} using the
 * destination's key.
 *
 * @param kahaDestination the destination owning the subscription
 * @param subKey the subscription key
 * @param size the size to subtract; ignored by the delegate unless positive
 */
protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size) {
    decrementAndSubSizeToStoreStat(key(kahaDestination), subKey, size);
}

/**
 * This is a map to cache MessageStores for a specific
 * KahaDestination key
 */
protected final ConcurrentMap<String, MessageStore> storeCache =
        new ConcurrentHashMap<>();

/**
 * Locate the store statistics for this KahaDestination key.
 *
 * @param kahaDestKey destination key as produced by {@link #key(KahaDestination)}
 * @return the statistics of the cached {@link MessageStore}, or
 *         {@code null} when no store is cached for the key or the lookup
 *         threw (the exception is logged, not propagated)
 */
protected MessageStoreStatistics getStoreStats(String kahaDestKey) {
    MessageStoreStatistics storeStats = null;
    try {
        MessageStore messageStore = storeCache.get(kahaDestKey);
        if (messageStore != null) {
            storeStats = messageStore.getMessageStoreStatistics();
        }
    } catch (Exception e1) {
        LOG.error("Getting size counter of destination failed", e1);
    }

    return storeStats;
}

/**
 * Locate the per-subscription statistics for this KahaDestination key.
 *
 * @param kahaDestKey destination key as produced by {@link #key(KahaDestination)}
 * @return the subscription statistics when the cached store is a
 *         {@link TopicMessageStore}; {@code null} when it is not, when no
 *         store is cached, or when the lookup threw (the exception is
 *         logged, not propagated)
 */
protected MessageStoreSubscriptionStatistics getSubStats(String kahaDestKey) {
    MessageStoreSubscriptionStatistics subStats = null;
    try {
        MessageStore messageStore = storeCache.get(kahaDestKey);
        if (messageStore instanceof TopicMessageStore) {
            subStats = ((TopicMessageStore)messageStore).getMessageStoreSubStatistics();
        }
    } catch (Exception e1) {
        LOG.error("Getting size counter of destination failed", e1);
    }

    return subStats;
}

/**
 * Determine whether this Destination matches the DestinationType
 *
 * @param destination the broker destination to test
 * @param type the KahaDB destination type to compare against
 * @return {@code true} when {@code destination} is a {@link Topic} and
 *         {@code type} is {@code TOPIC}, or it is a {@link Queue} and
 *         {@code type} is {@code QUEUE}; {@code false} otherwise
 */
protected boolean matchType(Destination destination,
        KahaDestination.DestinationType type) {
    if (destination instanceof Topic
            && type.equals(KahaDestination.DestinationType.TOPIC)) {
        return true;
    } else if (destination instanceof Queue
            && type.equals(KahaDestination.DestinationType.QUEUE)) {
        return true;
    }
    return false;
}

/**
 * Marshaller for {@link Location} values that also persists the location's
 * size. The size is always written, but it is only read back when
 * {@code metadata.version >= 6}.
 */
class LocationSizeMarshaller implements Marshaller<Location> {

    public LocationSizeMarshaller() {

    }

    /**
     * Reads data file id and offset, plus the size when the metadata
     * version is at least 6.
     */
    @Override
    public Location readPayload(DataInput dataIn) throws IOException {
        Location rc = new Location();
        rc.setDataFileId(dataIn.readInt());
        rc.setOffset(dataIn.readInt());
        if (metadata.version >= 6) {
            rc.setSize(dataIn.readInt());
        }
        return rc;
    }

    /** Writes data file id, offset and size as three ints. */
    @Override
    public void writePayload(Location object, DataOutput dataOut)
            throws IOException {
        dataOut.writeInt(object.getDataFileId());
        dataOut.writeInt(object.getOffset());
        dataOut.writeInt(object.getSize());
    }

    /** @return 12, matching the three ints emitted by {@link #writePayload}. */
    @Override
    public int getFixedSize() {
        return 12;
    }

    @Override
    public Location deepCopy(Location source) {
        return new Location(source);
    }

    @Override
    public boolean isDeepCopySupported() {
        return true;
    }
}

/**
 * Records that a subscription still has to ack a message: adds
 * {@code messageSequence} to the subscription's set in
 * {@code sd.ackPositions} (creating the set when absent) and increments
 * the message's count in {@code sd.messageReferences}.
 *
 * @param tx the index transaction
 * @param sd the stored destination being updated
 * @param messageSequence the sequence id of the message
 * @param subscriptionKey the subscription key
 * @throws IOException if an index operation fails
 */
private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
    SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
    if (sequences == null) {
        sequences = new SequenceSet();
        sequences.add(messageSequence);
        sd.ackPositions.add(tx, subscriptionKey, sequences);
    } else {
        sequences.add(messageSequence);
        sd.ackPositions.put(tx, subscriptionKey, sequences);
    }

    Long count = sd.messageReferences.get(messageSequence);
    if (count == null) {
        count = Long.valueOf(0L);
    }
    count = count.longValue() + 1;
    sd.messageReferences.put(messageSequence, count);
}

/**
 * new sub is interested in potentially all existing messages
 * <p>
 * Builds the union of every sequence id found in any subscription's
 * ack-position set, stores that union under {@code subscriptionKey}, and
 * increments the reference count of each of those ids that already has
 * an entry in {@code sd.messageReferences}.
 *
 * @param tx the index transaction
 * @param sd the stored destination being updated
 * @param subscriptionKey the key of the new subscription
 * @throws IOException if an index operation fails
 */
private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
    SequenceSet allOutstanding = new SequenceSet();
    Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
    while (iterator.hasNext()) {
        SequenceSet set = iterator.next().getValue();
        for (Long entry : set) {
            allOutstanding.add(entry);
        }
    }
    sd.ackPositions.put(tx, subscriptionKey, allOutstanding);

    for (Long ackPosition : allOutstanding) {
        Long count = sd.messageReferences.get(ackPosition);

        // There might not be a reference if the ackLocation was the last
        // one which is a placeholder for the next incoming message and
        // no value was added to the message references table.
        if (count != null) {
            count = count.longValue() + 1;
            sd.messageReferences.put(ackPosition, count);
        }
    }
}

/**
 * on a new message add, all existing subs are interested in this message
 * <p>
 * For every key in {@code sd.subscriptionCache}: adds the range
 * {@code [messageSequence, messageSequence + 1]} to the subscription's
 * ack positions, bumps that subscription's statistics by the stored
 * message's location size, increments the message's reference count, and
 * writes a zero-count entry for {@code messageSequence + 1}.
 *
 * @param tx the index transaction
 * @param kahaDest the destination the message was added to
 * @param sd the stored destination being updated
 * @param messageSequence the sequence id of the new message
 * @throws IOException if an index operation fails
 */
private void addAckLocationForNewMessage(Transaction tx, KahaDestination kahaDest,
        StoredDestination sd, Long messageSequence) throws IOException {
    for(String subscriptionKey : sd.subscriptionCache) {
        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
        if (sequences == null) {
            sequences = new SequenceSet();
            sequences.add(new Sequence(messageSequence, messageSequence + 1));
            sd.ackPositions.add(tx, subscriptionKey, sequences);
        } else {
            sequences.add(new Sequence(messageSequence, messageSequence + 1));
            sd.ackPositions.put(tx, subscriptionKey, sequences);
        }

        // NOTE(review): key is dereferenced without a null check, so this
        // assumes the message is already present in the order index - confirm.
        MessageKeys key = sd.orderIndex.get(tx, messageSequence);
        incrementAndAddSizeToStoreStat(kahaDest, subscriptionKey,
                key.location.getSize());

        Long count = sd.messageReferences.get(messageSequence);
        if (count == null) {
            count = Long.valueOf(0L);
        }
        count = count.longValue() + 1;
        sd.messageReferences.put(messageSequence, count);
        // placeholder entry for the next position; it carries no references
        sd.messageReferences.put(messageSequence + 1, Long.valueOf(0L));
    }
}

/**
 * Drops all ack positions of a subscription and deletes the messages that
 * no other subscription references any more.
 * <p>
 * Each sequence id in the removed set that has a reference count gets that
 * count decremented; ids whose count drops to zero or below are removed
 * from {@code sd.messageReferences} and their entries are deleted from the
 * location, message-id and order indexes, with the destination's store
 * statistics decremented per deleted entry.
 *
 * @param command the subscription command; supplies the destination for the statistics update
 * @param tx the index transaction
 * @param sd the stored destination being updated
 * @param subscriptionKey the key of the subscription being removed
 * @throws IOException if an index operation fails
 */
private void removeAckLocationsForSub(KahaSubscriptionCommand command,
        Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
    if (!sd.ackPositions.isEmpty(tx)) {
        SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
        if (sequences == null || sequences.isEmpty()) {
            return;
        }

        ArrayList<Long> unreferenced = new ArrayList<>();

        for(Long sequenceId : sequences) {
            Long references = sd.messageReferences.get(sequenceId);
            if (references != null) {
                references = references.longValue() - 1;

                if (references.longValue() > 0) {
                    sd.messageReferences.put(sequenceId, references);
                } else {
                    sd.messageReferences.remove(sequenceId);
                    unreferenced.add(sequenceId);
                }
            }
        }

        for(Long sequenceId : unreferenced) {
            // Find all the entries that need to get deleted.
            ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<>();
            sd.orderIndex.getDeleteList(tx, deletes, sequenceId);

            // Do the actual deletes.
            for (Entry<Long, MessageKeys> entry : deletes) {
                sd.locationIndex.remove(tx, entry.getValue().location);
                sd.messageIdIndex.remove(tx, entry.getValue().messageId);
                sd.orderIndex.remove(tx, entry.getKey());
                decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
            }
        }
    }
}

/**
 * Removes one message from a subscription's ack positions and deletes the
 * message from the indexes once no subscription references it.
 * <p>
 * Nothing happens when {@code messageSequence} is {@code null} or the
 * subscription has no (or an empty) ack-position set.
 *
 * @param command the remove command; supplies the destination for the statistics updates
 * @param tx the index transaction
 * @param sd the stored destination being updated
 * @param subscriptionKey the key of the acking subscription
 * @param messageSequence the sequence id of the acked message
 * @throws IOException if an index operation fails
 */
private void removeAckLocation(KahaRemoveMessageCommand command,
        Transaction tx, StoredDestination sd, String subscriptionKey,
        Long messageSequence) throws IOException {
    // Remove the sub from the previous location set..
    if (messageSequence != null) {
        SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
        if (range != null && !range.isEmpty()) {
            range.remove(messageSequence);
            if (!range.isEmpty()) {
                sd.ackPositions.put(tx, subscriptionKey, range);
            } else {
                sd.ackPositions.remove(tx, subscriptionKey);
            }

            // NOTE(review): key is dereferenced without a null check, so this
            // assumes the message is still present in the order index - confirm.
            MessageKeys key = sd.orderIndex.get(tx, messageSequence);
            decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey,
                    key.location.getSize());

            // Check if the message is referenced by any other subscription.
            Long count = sd.messageReferences.get(messageSequence);
            if (count != null) {
                long references = count.longValue() - 1;
                if (references > 0) {
                    sd.messageReferences.put(messageSequence, Long.valueOf(references));
                    return;
                } else {
                    sd.messageReferences.remove(messageSequence);
                }
            }

            // Find all the entries that need to get deleted.
            ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<>();
            sd.orderIndex.getDeleteList(tx, deletes, messageSequence);

            // Do the actual deletes.
            for (Entry<Long, MessageKeys> entry : deletes) {
                sd.locationIndex.remove(tx, entry.getValue().location);
                sd.messageIdIndex.remove(tx, entry.getValue().messageId);
                sd.orderIndex.remove(tx, entry.getKey());
                decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
            }
        }
    }
}

/**
 * Looks up the last-ack record of a subscription.
 *
 * @param tx the index transaction
 * @param sd the stored destination to query
 * @param subscriptionKey the subscription key
 * @return whatever {@code sd.subscriptionAcks} holds for the key
 * @throws IOException if the index lookup fails
 */
public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
    return sd.subscriptionAcks.get(tx, subscriptionKey);
}

/**
 * Counts the messages still pending for a subscription.
 *
 * @param tx the index transaction
 * @param sd the stored destination to query
 * @param subscriptionKey the subscription key
 * @return the range size of the subscription's ack-position set minus one
 *         (never negative); 0 when the destination has no ack positions
 *         or the subscription has no set
 * @throws IOException if the index lookup fails
 */
protected long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
    if (sd.ackPositions != null) {
        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
        if (messageSequences != null) {
            long result = messageSequences.rangeSize();
            // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
            return result > 0 ? result - 1 : 0;
        }
    }

    return 0;
}

/**
 * Adds up the location sizes of the order-index entries visited by an
 * iterator that starts at the first sequence of the subscription's
 * ack-position set.
 *
 * @param tx the index transaction
 * @param sd the stored destination to query
 * @param subscriptionKey the subscription key
 * @return the summed size; 0 when the destination has no ack positions,
 *         the subscription has no set, or the set has no head
 * @throws IOException if an index operation fails
 */
protected long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
    long locationSize = 0;

    if (sd.ackPositions != null) {
        //grab the messages attached to this subscription
        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);

        if (messageSequences != null) {
            Sequence head = messageSequences.getHead();
            if (head != null) {
                //get an iterator over the order index starting at the first unacked message
                //and go over each message to add up the size
                Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
                        new MessageOrderCursor(head.getFirst()));

                while (iterator.hasNext()) {
                    Entry<Long, MessageKeys> entry = iterator.next();
                    locationSize += entry.getValue().location.getSize();
                }
            }
        }
    }

    return locationSize;
}

/**
 * Builds the string key used for a destination.
 *
 * @param destination the destination
 * @return the destination type number and the destination name joined by {@code ':'}
 */
protected String key(KahaDestination destination) {
    return destination.getType().getNumber() + ":" + destination.getName();
}

// /////////////////////////////////////////////////////////////////
// Transaction related implementation methods.
// /////////////////////////////////////////////////////////////////
@SuppressWarnings("rawtypes")
private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<>();
@SuppressWarnings("rawtypes")
protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<>();
protected final Set<String> ackedAndPrepared = new HashSet<>();
protected final Set<String> rolledBackAcks = new HashSet<>();

// messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
// till then they are skipped by the store.
3024 // 'at most once' XA guarantee 3025 public void trackRecoveredAcks(ArrayList<MessageAck> acks) { 3026 this.indexLock.writeLock().lock(); 3027 try { 3028 for (MessageAck ack : acks) { 3029 ackedAndPrepared.add(ack.getLastMessageId().toProducerKey()); 3030 } 3031 } finally { 3032 this.indexLock.writeLock().unlock(); 3033 } 3034 } 3035 3036 public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException { 3037 if (acks != null) { 3038 this.indexLock.writeLock().lock(); 3039 try { 3040 for (MessageAck ack : acks) { 3041 final String id = ack.getLastMessageId().toProducerKey(); 3042 ackedAndPrepared.remove(id); 3043 if (rollback) { 3044 rolledBackAcks.add(id); 3045 } 3046 } 3047 } finally { 3048 this.indexLock.writeLock().unlock(); 3049 } 3050 } 3051 } 3052 3053 @SuppressWarnings("rawtypes") 3054 private List<Operation> getInflightTx(KahaTransactionInfo info) { 3055 TransactionId key = TransactionIdConversion.convert(info); 3056 List<Operation> tx; 3057 synchronized (inflightTransactions) { 3058 tx = inflightTransactions.get(key); 3059 if (tx == null) { 3060 tx = Collections.synchronizedList(new ArrayList<Operation>()); 3061 inflightTransactions.put(key, tx); 3062 } 3063 } 3064 return tx; 3065 } 3066 3067 @SuppressWarnings("unused") 3068 private TransactionId key(KahaTransactionInfo transactionInfo) { 3069 return TransactionIdConversion.convert(transactionInfo); 3070 } 3071 3072 abstract class Operation <T extends JournalCommand<T>> { 3073 final T command; 3074 final Location location; 3075 3076 public Operation(T command, Location location) { 3077 this.command = command; 3078 this.location = location; 3079 } 3080 3081 public Location getLocation() { 3082 return location; 3083 } 3084 3085 public T getCommand() { 3086 return command; 3087 } 3088 3089 abstract public void execute(Transaction tx) throws IOException; 3090 } 3091 3092 class AddOperation extends Operation<KahaAddMessageCommand> { 3093 final IndexAware runWithIndexLock; 
3094 public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) { 3095 super(command, location); 3096 this.runWithIndexLock = runWithIndexLock; 3097 } 3098 3099 @Override 3100 public void execute(Transaction tx) throws IOException { 3101 long seq = updateIndex(tx, command, location); 3102 if (runWithIndexLock != null) { 3103 runWithIndexLock.sequenceAssignedWithIndexLocked(seq); 3104 } 3105 } 3106 } 3107 3108 class RemoveOperation extends Operation<KahaRemoveMessageCommand> { 3109 3110 public RemoveOperation(KahaRemoveMessageCommand command, Location location) { 3111 super(command, location); 3112 } 3113 3114 @Override 3115 public void execute(Transaction tx) throws IOException { 3116 updateIndex(tx, command, location); 3117 } 3118 } 3119 3120 // ///////////////////////////////////////////////////////////////// 3121 // Initialization related implementation methods. 3122 // ///////////////////////////////////////////////////////////////// 3123 3124 private PageFile createPageFile() throws IOException { 3125 if (indexDirectory == null) { 3126 indexDirectory = directory; 3127 } 3128 IOHelper.mkdirs(indexDirectory); 3129 PageFile index = new PageFile(indexDirectory, "db"); 3130 index.setEnableWriteThread(isEnableIndexWriteAsync()); 3131 index.setWriteBatchSize(getIndexWriteBatchSize()); 3132 index.setPageCacheSize(indexCacheSize); 3133 index.setUseLFRUEviction(isUseIndexLFRUEviction()); 3134 index.setLFUEvictionFactor(getIndexLFUEvictionFactor()); 3135 index.setEnableDiskSyncs(isEnableIndexDiskSyncs()); 3136 index.setEnableRecoveryFile(isEnableIndexRecoveryFile()); 3137 index.setEnablePageCaching(isEnableIndexPageCaching()); 3138 return index; 3139 } 3140 3141 protected Journal createJournal() throws IOException { 3142 Journal manager = new Journal(); 3143 manager.setDirectory(directory); 3144 manager.setMaxFileLength(getJournalMaxFileLength()); 3145 manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles); 3146 
manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles); 3147 manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); 3148 manager.setArchiveDataLogs(isArchiveDataLogs()); 3149 manager.setSizeAccumulator(journalSize); 3150 manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs()); 3151 manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase())); 3152 manager.setPreallocationStrategy( 3153 Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase())); 3154 manager.setJournalDiskSyncStrategy(journalDiskSyncStrategy); 3155 if (getDirectoryArchive() != null) { 3156 IOHelper.mkdirs(getDirectoryArchive()); 3157 manager.setDirectoryArchive(getDirectoryArchive()); 3158 } 3159 return manager; 3160 } 3161 3162 private Metadata createMetadata() { 3163 Metadata md = new Metadata(); 3164 md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth()); 3165 md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack()); 3166 return md; 3167 } 3168 3169 protected abstract void configureMetadata(); 3170 3171 public int getJournalMaxWriteBatchSize() { 3172 return journalMaxWriteBatchSize; 3173 } 3174 3175 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 3176 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 3177 } 3178 3179 public File getDirectory() { 3180 return directory; 3181 } 3182 3183 public void setDirectory(File directory) { 3184 this.directory = directory; 3185 } 3186 3187 public boolean isDeleteAllMessages() { 3188 return deleteAllMessages; 3189 } 3190 3191 public void setDeleteAllMessages(boolean deleteAllMessages) { 3192 this.deleteAllMessages = deleteAllMessages; 3193 } 3194 3195 public void setIndexWriteBatchSize(int setIndexWriteBatchSize) { 3196 this.setIndexWriteBatchSize = setIndexWriteBatchSize; 3197 } 3198 3199 public int getIndexWriteBatchSize() { 3200 return setIndexWriteBatchSize; 3201 } 3202 
3203 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 3204 this.enableIndexWriteAsync = enableIndexWriteAsync; 3205 } 3206 3207 boolean isEnableIndexWriteAsync() { 3208 return enableIndexWriteAsync; 3209 } 3210 3211 /** 3212 * @deprecated use {@link #getJournalDiskSyncStrategyEnum} or {@link #getJournalDiskSyncStrategy} instead 3213 * @return 3214 */ 3215 @Deprecated 3216 public boolean isEnableJournalDiskSyncs() { 3217 return journalDiskSyncStrategy == JournalDiskSyncStrategy.ALWAYS; 3218 } 3219 3220 /** 3221 * @deprecated use {@link #setEnableJournalDiskSyncs} instead 3222 * @param syncWrites 3223 */ 3224 @Deprecated 3225 public void setEnableJournalDiskSyncs(boolean syncWrites) { 3226 if (syncWrites) { 3227 journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS; 3228 } else { 3229 journalDiskSyncStrategy = JournalDiskSyncStrategy.NEVER; 3230 } 3231 } 3232 3233 public JournalDiskSyncStrategy getJournalDiskSyncStrategyEnum() { 3234 return journalDiskSyncStrategy; 3235 } 3236 3237 public String getJournalDiskSyncStrategy() { 3238 return journalDiskSyncStrategy.name(); 3239 } 3240 3241 public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) { 3242 this.journalDiskSyncStrategy = JournalDiskSyncStrategy.valueOf(journalDiskSyncStrategy.trim().toUpperCase()); 3243 } 3244 3245 public long getJournalDiskSyncInterval() { 3246 return journalDiskSyncInterval; 3247 } 3248 3249 public void setJournalDiskSyncInterval(long journalDiskSyncInterval) { 3250 this.journalDiskSyncInterval = journalDiskSyncInterval; 3251 } 3252 3253 public long getCheckpointInterval() { 3254 return checkpointInterval; 3255 } 3256 3257 public void setCheckpointInterval(long checkpointInterval) { 3258 this.checkpointInterval = checkpointInterval; 3259 } 3260 3261 public long getCleanupInterval() { 3262 return cleanupInterval; 3263 } 3264 3265 public void setCleanupInterval(long cleanupInterval) { 3266 this.cleanupInterval = cleanupInterval; 3267 } 3268 3269 public 
void setJournalMaxFileLength(int journalMaxFileLength) { 3270 this.journalMaxFileLength = journalMaxFileLength; 3271 } 3272 3273 public int getJournalMaxFileLength() { 3274 return journalMaxFileLength; 3275 } 3276 3277 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { 3278 this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack); 3279 } 3280 3281 public int getMaxFailoverProducersToTrack() { 3282 return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack(); 3283 } 3284 3285 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { 3286 this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth); 3287 } 3288 3289 public int getFailoverProducersAuditDepth() { 3290 return this.metadata.producerSequenceIdTracker.getAuditDepth(); 3291 } 3292 3293 public PageFile getPageFile() throws IOException { 3294 if (pageFile == null) { 3295 pageFile = createPageFile(); 3296 } 3297 return pageFile; 3298 } 3299 3300 public Journal getJournal() throws IOException { 3301 if (journal == null) { 3302 journal = createJournal(); 3303 } 3304 return journal; 3305 } 3306 3307 protected Metadata getMetadata() { 3308 return metadata; 3309 } 3310 3311 public boolean isFailIfDatabaseIsLocked() { 3312 return failIfDatabaseIsLocked; 3313 } 3314 3315 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 3316 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 3317 } 3318 3319 public boolean isIgnoreMissingJournalfiles() { 3320 return ignoreMissingJournalfiles; 3321 } 3322 3323 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { 3324 this.ignoreMissingJournalfiles = ignoreMissingJournalfiles; 3325 } 3326 3327 public int getIndexCacheSize() { 3328 return indexCacheSize; 3329 } 3330 3331 public void setIndexCacheSize(int indexCacheSize) { 3332 this.indexCacheSize = indexCacheSize; 3333 } 3334 3335 public boolean 
isCheckForCorruptJournalFiles() { 3336 return checkForCorruptJournalFiles; 3337 } 3338 3339 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { 3340 this.checkForCorruptJournalFiles = checkForCorruptJournalFiles; 3341 } 3342 3343 public boolean isChecksumJournalFiles() { 3344 return checksumJournalFiles; 3345 } 3346 3347 public void setChecksumJournalFiles(boolean checksumJournalFiles) { 3348 this.checksumJournalFiles = checksumJournalFiles; 3349 } 3350 3351 @Override 3352 public void setBrokerService(BrokerService brokerService) { 3353 this.brokerService = brokerService; 3354 } 3355 3356 /** 3357 * @return the archiveDataLogs 3358 */ 3359 public boolean isArchiveDataLogs() { 3360 return this.archiveDataLogs; 3361 } 3362 3363 /** 3364 * @param archiveDataLogs the archiveDataLogs to set 3365 */ 3366 public void setArchiveDataLogs(boolean archiveDataLogs) { 3367 this.archiveDataLogs = archiveDataLogs; 3368 } 3369 3370 /** 3371 * @return the directoryArchive 3372 */ 3373 public File getDirectoryArchive() { 3374 return this.directoryArchive; 3375 } 3376 3377 /** 3378 * @param directoryArchive the directoryArchive to set 3379 */ 3380 public void setDirectoryArchive(File directoryArchive) { 3381 this.directoryArchive = directoryArchive; 3382 } 3383 3384 public boolean isArchiveCorruptedIndex() { 3385 return archiveCorruptedIndex; 3386 } 3387 3388 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { 3389 this.archiveCorruptedIndex = archiveCorruptedIndex; 3390 } 3391 3392 public float getIndexLFUEvictionFactor() { 3393 return indexLFUEvictionFactor; 3394 } 3395 3396 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) { 3397 this.indexLFUEvictionFactor = indexLFUEvictionFactor; 3398 } 3399 3400 public boolean isUseIndexLFRUEviction() { 3401 return useIndexLFRUEviction; 3402 } 3403 3404 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) { 3405 this.useIndexLFRUEviction = useIndexLFRUEviction; 3406 
} 3407 3408 public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) { 3409 this.enableIndexDiskSyncs = enableIndexDiskSyncs; 3410 } 3411 3412 public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) { 3413 this.enableIndexRecoveryFile = enableIndexRecoveryFile; 3414 } 3415 3416 public void setEnableIndexPageCaching(boolean enableIndexPageCaching) { 3417 this.enableIndexPageCaching = enableIndexPageCaching; 3418 } 3419 3420 public boolean isEnableIndexDiskSyncs() { 3421 return enableIndexDiskSyncs; 3422 } 3423 3424 public boolean isEnableIndexRecoveryFile() { 3425 return enableIndexRecoveryFile; 3426 } 3427 3428 public boolean isEnableIndexPageCaching() { 3429 return enableIndexPageCaching; 3430 } 3431 3432 // ///////////////////////////////////////////////////////////////// 3433 // Internal conversion methods. 3434 // ///////////////////////////////////////////////////////////////// 3435 3436 class MessageOrderCursor{ 3437 long defaultCursorPosition; 3438 long lowPriorityCursorPosition; 3439 long highPriorityCursorPosition; 3440 MessageOrderCursor(){ 3441 } 3442 3443 MessageOrderCursor(long position){ 3444 this.defaultCursorPosition=position; 3445 this.lowPriorityCursorPosition=position; 3446 this.highPriorityCursorPosition=position; 3447 } 3448 3449 MessageOrderCursor(MessageOrderCursor other){ 3450 this.defaultCursorPosition=other.defaultCursorPosition; 3451 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; 3452 this.highPriorityCursorPosition=other.highPriorityCursorPosition; 3453 } 3454 3455 MessageOrderCursor copy() { 3456 return new MessageOrderCursor(this); 3457 } 3458 3459 void reset() { 3460 this.defaultCursorPosition=0; 3461 this.highPriorityCursorPosition=0; 3462 this.lowPriorityCursorPosition=0; 3463 } 3464 3465 void increment() { 3466 if (defaultCursorPosition!=0) { 3467 defaultCursorPosition++; 3468 } 3469 if (highPriorityCursorPosition!=0) { 3470 highPriorityCursorPosition++; 3471 } 3472 if 
(lowPriorityCursorPosition!=0) { 3473 lowPriorityCursorPosition++; 3474 } 3475 } 3476 3477 @Override 3478 public String toString() { 3479 return "MessageOrderCursor:[def:" + defaultCursorPosition 3480 + ", low:" + lowPriorityCursorPosition 3481 + ", high:" + highPriorityCursorPosition + "]"; 3482 } 3483 3484 public void sync(MessageOrderCursor other) { 3485 this.defaultCursorPosition=other.defaultCursorPosition; 3486 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; 3487 this.highPriorityCursorPosition=other.highPriorityCursorPosition; 3488 } 3489 } 3490 3491 class MessageOrderIndex { 3492 static final byte HI = 9; 3493 static final byte LO = 0; 3494 static final byte DEF = 4; 3495 3496 long nextMessageId; 3497 BTreeIndex<Long, MessageKeys> defaultPriorityIndex; 3498 BTreeIndex<Long, MessageKeys> lowPriorityIndex; 3499 BTreeIndex<Long, MessageKeys> highPriorityIndex; 3500 final MessageOrderCursor cursor = new MessageOrderCursor(); 3501 Long lastDefaultKey; 3502 Long lastHighKey; 3503 Long lastLowKey; 3504 byte lastGetPriority; 3505 final List<Long> pendingAdditions = new LinkedList<>(); 3506 final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller(); 3507 3508 MessageKeys remove(Transaction tx, Long key) throws IOException { 3509 MessageKeys result = defaultPriorityIndex.remove(tx, key); 3510 if (result == null && highPriorityIndex!=null) { 3511 result = highPriorityIndex.remove(tx, key); 3512 if (result ==null && lowPriorityIndex!=null) { 3513 result = lowPriorityIndex.remove(tx, key); 3514 } 3515 } 3516 return result; 3517 } 3518 3519 void load(Transaction tx) throws IOException { 3520 defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 3521 defaultPriorityIndex.setValueMarshaller(messageKeysMarshaller); 3522 defaultPriorityIndex.load(tx); 3523 lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 3524 lowPriorityIndex.setValueMarshaller(messageKeysMarshaller); 3525 lowPriorityIndex.load(tx); 3526 
highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 3527 highPriorityIndex.setValueMarshaller(messageKeysMarshaller); 3528 highPriorityIndex.load(tx); 3529 } 3530 3531 void allocate(Transaction tx) throws IOException { 3532 defaultPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate()); 3533 if (metadata.version >= 2) { 3534 lowPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate()); 3535 highPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate()); 3536 } 3537 } 3538 3539 void configureLast(Transaction tx) throws IOException { 3540 // Figure out the next key using the last entry in the destination. 3541 TreeSet<Long> orderedSet = new TreeSet<>(); 3542 3543 addLast(orderedSet, highPriorityIndex, tx); 3544 addLast(orderedSet, defaultPriorityIndex, tx); 3545 addLast(orderedSet, lowPriorityIndex, tx); 3546 3547 if (!orderedSet.isEmpty()) { 3548 nextMessageId = orderedSet.last() + 1; 3549 } 3550 } 3551 3552 private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException { 3553 if (index != null) { 3554 Entry<Long, MessageKeys> lastEntry = index.getLast(tx); 3555 if (lastEntry != null) { 3556 orderedSet.add(lastEntry.getKey()); 3557 } 3558 } 3559 } 3560 3561 void clear(Transaction tx) throws IOException { 3562 this.remove(tx); 3563 this.resetCursorPosition(); 3564 this.allocate(tx); 3565 this.load(tx); 3566 this.configureLast(tx); 3567 } 3568 3569 void remove(Transaction tx) throws IOException { 3570 defaultPriorityIndex.clear(tx); 3571 defaultPriorityIndex.unload(tx); 3572 tx.free(defaultPriorityIndex.getPageId()); 3573 if (lowPriorityIndex != null) { 3574 lowPriorityIndex.clear(tx); 3575 lowPriorityIndex.unload(tx); 3576 3577 tx.free(lowPriorityIndex.getPageId()); 3578 } 3579 if (highPriorityIndex != null) { 3580 highPriorityIndex.clear(tx); 3581 highPriorityIndex.unload(tx); 3582 tx.free(highPriorityIndex.getPageId()); 3583 } 3584 } 3585 3586 void resetCursorPosition() { 3587 this.cursor.reset(); 
3588 lastDefaultKey = null; 3589 lastHighKey = null; 3590 lastLowKey = null; 3591 } 3592 3593 void setBatch(Transaction tx, Long sequence) throws IOException { 3594 if (sequence != null) { 3595 Long nextPosition = new Long(sequence.longValue() + 1); 3596 lastDefaultKey = sequence; 3597 cursor.defaultCursorPosition = nextPosition.longValue(); 3598 lastHighKey = sequence; 3599 cursor.highPriorityCursorPosition = nextPosition.longValue(); 3600 lastLowKey = sequence; 3601 cursor.lowPriorityCursorPosition = nextPosition.longValue(); 3602 } 3603 } 3604 3605 void setBatch(Transaction tx, LastAck last) throws IOException { 3606 setBatch(tx, last.lastAckedSequence); 3607 if (cursor.defaultCursorPosition == 0 3608 && cursor.highPriorityCursorPosition == 0 3609 && cursor.lowPriorityCursorPosition == 0) { 3610 long next = last.lastAckedSequence + 1; 3611 switch (last.priority) { 3612 case DEF: 3613 cursor.defaultCursorPosition = next; 3614 cursor.highPriorityCursorPosition = next; 3615 break; 3616 case HI: 3617 cursor.highPriorityCursorPosition = next; 3618 break; 3619 case LO: 3620 cursor.lowPriorityCursorPosition = next; 3621 cursor.defaultCursorPosition = next; 3622 cursor.highPriorityCursorPosition = next; 3623 break; 3624 } 3625 } 3626 } 3627 3628 void stoppedIterating() { 3629 if (lastDefaultKey!=null) { 3630 cursor.defaultCursorPosition=lastDefaultKey.longValue()+1; 3631 } 3632 if (lastHighKey!=null) { 3633 cursor.highPriorityCursorPosition=lastHighKey.longValue()+1; 3634 } 3635 if (lastLowKey!=null) { 3636 cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1; 3637 } 3638 lastDefaultKey = null; 3639 lastHighKey = null; 3640 lastLowKey = null; 3641 } 3642 3643 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId) 3644 throws IOException { 3645 if (defaultPriorityIndex.containsKey(tx, sequenceId)) { 3646 getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId); 3647 } else if (highPriorityIndex != null && 
highPriorityIndex.containsKey(tx, sequenceId)) { 3648 getDeleteList(tx, deletes, highPriorityIndex, sequenceId); 3649 } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) { 3650 getDeleteList(tx, deletes, lowPriorityIndex, sequenceId); 3651 } 3652 } 3653 3654 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, 3655 BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException { 3656 3657 Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null); 3658 deletes.add(iterator.next()); 3659 } 3660 3661 long getNextMessageId() { 3662 return nextMessageId++; 3663 } 3664 3665 void revertNextMessageId() { 3666 nextMessageId--; 3667 } 3668 3669 MessageKeys get(Transaction tx, Long key) throws IOException { 3670 MessageKeys result = defaultPriorityIndex.get(tx, key); 3671 if (result == null) { 3672 result = highPriorityIndex.get(tx, key); 3673 if (result == null) { 3674 result = lowPriorityIndex.get(tx, key); 3675 lastGetPriority = LO; 3676 } else { 3677 lastGetPriority = HI; 3678 } 3679 } else { 3680 lastGetPriority = DEF; 3681 } 3682 return result; 3683 } 3684 3685 MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException { 3686 if (priority == javax.jms.Message.DEFAULT_PRIORITY) { 3687 return defaultPriorityIndex.put(tx, key, value); 3688 } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) { 3689 return highPriorityIndex.put(tx, key, value); 3690 } else { 3691 return lowPriorityIndex.put(tx, key, value); 3692 } 3693 } 3694 3695 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{ 3696 return new MessageOrderIterator(tx,cursor,this); 3697 } 3698 3699 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{ 3700 return new MessageOrderIterator(tx,m,this); 3701 } 3702 3703 public byte lastGetPriority() { 3704 return lastGetPriority; 3705 } 3706 3707 public boolean 
alreadyDispatched(Long sequence) { 3708 return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) || 3709 (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) || 3710 (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence); 3711 } 3712 3713 public void trackPendingAdd(Long seq) { 3714 synchronized (pendingAdditions) { 3715 pendingAdditions.add(seq); 3716 } 3717 } 3718 3719 public void trackPendingAddComplete(Long seq) { 3720 synchronized (pendingAdditions) { 3721 pendingAdditions.remove(seq); 3722 } 3723 } 3724 3725 public Long minPendingAdd() { 3726 synchronized (pendingAdditions) { 3727 if (!pendingAdditions.isEmpty()) { 3728 return pendingAdditions.get(0); 3729 } else { 3730 return null; 3731 } 3732 } 3733 } 3734 3735 class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{ 3736 Iterator<Entry<Long, MessageKeys>>currentIterator; 3737 final Iterator<Entry<Long, MessageKeys>>highIterator; 3738 final Iterator<Entry<Long, MessageKeys>>defaultIterator; 3739 final Iterator<Entry<Long, MessageKeys>>lowIterator; 3740 3741 MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException { 3742 Long pendingAddLimiter = messageOrderIndex.minPendingAdd(); 3743 this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter); 3744 if (highPriorityIndex != null) { 3745 this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter); 3746 } else { 3747 this.highIterator = null; 3748 } 3749 if (lowPriorityIndex != null) { 3750 this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter); 3751 } else { 3752 this.lowIterator = null; 3753 } 3754 } 3755 3756 @Override 3757 public boolean hasNext() { 3758 if (currentIterator == null) { 3759 if (highIterator != null) { 3760 if (highIterator.hasNext()) { 3761 
currentIterator = highIterator; 3762 return currentIterator.hasNext(); 3763 } 3764 if (defaultIterator.hasNext()) { 3765 currentIterator = defaultIterator; 3766 return currentIterator.hasNext(); 3767 } 3768 if (lowIterator.hasNext()) { 3769 currentIterator = lowIterator; 3770 return currentIterator.hasNext(); 3771 } 3772 return false; 3773 } else { 3774 currentIterator = defaultIterator; 3775 return currentIterator.hasNext(); 3776 } 3777 } 3778 if (highIterator != null) { 3779 if (currentIterator.hasNext()) { 3780 return true; 3781 } 3782 if (currentIterator == highIterator) { 3783 if (defaultIterator.hasNext()) { 3784 currentIterator = defaultIterator; 3785 return currentIterator.hasNext(); 3786 } 3787 if (lowIterator.hasNext()) { 3788 currentIterator = lowIterator; 3789 return currentIterator.hasNext(); 3790 } 3791 return false; 3792 } 3793 3794 if (currentIterator == defaultIterator) { 3795 if (lowIterator.hasNext()) { 3796 currentIterator = lowIterator; 3797 return currentIterator.hasNext(); 3798 } 3799 return false; 3800 } 3801 } 3802 return currentIterator.hasNext(); 3803 } 3804 3805 @Override 3806 public Entry<Long, MessageKeys> next() { 3807 Entry<Long, MessageKeys> result = currentIterator.next(); 3808 if (result != null) { 3809 Long key = result.getKey(); 3810 if (highIterator != null) { 3811 if (currentIterator == defaultIterator) { 3812 lastDefaultKey = key; 3813 } else if (currentIterator == highIterator) { 3814 lastHighKey = key; 3815 } else { 3816 lastLowKey = key; 3817 } 3818 } else { 3819 lastDefaultKey = key; 3820 } 3821 } 3822 return result; 3823 } 3824 3825 @Override 3826 public void remove() { 3827 throw new UnsupportedOperationException(); 3828 } 3829 } 3830 } 3831 3832 private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> { 3833 final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller(); 3834 3835 @Override 3836 public void writePayload(HashSet<String> object, DataOutput dataOut) throws 
IOException { 3837 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 3838 ObjectOutputStream oout = new ObjectOutputStream(baos); 3839 oout.writeObject(object); 3840 oout.flush(); 3841 oout.close(); 3842 byte[] data = baos.toByteArray(); 3843 dataOut.writeInt(data.length); 3844 dataOut.write(data); 3845 } 3846 3847 @Override 3848 @SuppressWarnings("unchecked") 3849 public HashSet<String> readPayload(DataInput dataIn) throws IOException { 3850 int dataLen = dataIn.readInt(); 3851 byte[] data = new byte[dataLen]; 3852 dataIn.readFully(data); 3853 ByteArrayInputStream bais = new ByteArrayInputStream(data); 3854 ObjectInputStream oin = new ObjectInputStream(bais); 3855 try { 3856 return (HashSet<String>) oin.readObject(); 3857 } catch (ClassNotFoundException cfe) { 3858 IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe); 3859 ioe.initCause(cfe); 3860 throw ioe; 3861 } 3862 } 3863 } 3864 3865 public File getIndexDirectory() { 3866 return indexDirectory; 3867 } 3868 3869 public void setIndexDirectory(File indexDirectory) { 3870 this.indexDirectory = indexDirectory; 3871 } 3872 3873 interface IndexAware { 3874 public void sequenceAssignedWithIndexLocked(long index); 3875 } 3876 3877 public String getPreallocationScope() { 3878 return preallocationScope; 3879 } 3880 3881 public void setPreallocationScope(String preallocationScope) { 3882 this.preallocationScope = preallocationScope; 3883 } 3884 3885 public String getPreallocationStrategy() { 3886 return preallocationStrategy; 3887 } 3888 3889 public void setPreallocationStrategy(String preallocationStrategy) { 3890 this.preallocationStrategy = preallocationStrategy; 3891 } 3892 3893 public int getCompactAcksAfterNoGC() { 3894 return compactAcksAfterNoGC; 3895 } 3896 3897 /** 3898 * Sets the number of GC cycles where no journal logs were removed before an attempt to 3899 * move forward all the acks in the last log that contains them and is otherwise unreferenced. 
3900 * <p> 3901 * A value of -1 will disable this feature. 3902 * 3903 * @param compactAcksAfterNoGC 3904 * Number of empty GC cycles before we rewrite old ACKS. 3905 */ 3906 public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) { 3907 this.compactAcksAfterNoGC = compactAcksAfterNoGC; 3908 } 3909 3910 /** 3911 * Returns whether Ack compaction will ignore that the store is still growing 3912 * and run more often. 3913 * 3914 * @return the compactAcksIgnoresStoreGrowth current value. 3915 */ 3916 public boolean isCompactAcksIgnoresStoreGrowth() { 3917 return compactAcksIgnoresStoreGrowth; 3918 } 3919 3920 /** 3921 * Configure if Ack compaction will occur regardless of continued growth of the 3922 * journal logs meaning that the store has not run out of space yet. Because the 3923 * compaction operation can be costly this value is defaulted to off and the Ack 3924 * compaction is only done when it seems that the store cannot grow and larger. 3925 * 3926 * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set 3927 */ 3928 public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) { 3929 this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth; 3930 } 3931 3932 /** 3933 * Returns whether Ack compaction is enabled 3934 * 3935 * @return enableAckCompaction 3936 */ 3937 public boolean isEnableAckCompaction() { 3938 return enableAckCompaction; 3939 } 3940 3941 /** 3942 * Configure if the Ack compaction task should be enabled to run 3943 * 3944 * @param enableAckCompaction 3945 */ 3946 public void setEnableAckCompaction(boolean enableAckCompaction) { 3947 this.enableAckCompaction = enableAckCompaction; 3948 } 3949 3950 /** 3951 * @return 3952 */ 3953 public boolean isEnableSubscriptionStatistics() { 3954 return enableSubscriptionStatistics; 3955 } 3956 3957 /** 3958 * Enable caching statistics for each subscription to allow non-blocking 3959 * retrieval of metrics. 
This could incur some overhead to compute if there are a lot 3960 * of subscriptions. 3961 * 3962 * @param enableSubscriptionStatistics 3963 */ 3964 public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics) { 3965 this.enableSubscriptionStatistics = enableSubscriptionStatistics; 3966 } 3967}