001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName;
020
021import java.io.File;
022import java.io.IOException;
023import java.util.Set;
024import java.util.concurrent.Callable;
025
026import javax.management.ObjectName;
027
028import org.apache.activemq.broker.BrokerService;
029import org.apache.activemq.broker.ConnectionContext;
030import org.apache.activemq.broker.LockableServiceSupport;
031import org.apache.activemq.broker.Locker;
032import org.apache.activemq.broker.jmx.AnnotatedMBean;
033import org.apache.activemq.broker.jmx.PersistenceAdapterView;
034import org.apache.activemq.broker.scheduler.JobSchedulerStore;
035import org.apache.activemq.command.ActiveMQDestination;
036import org.apache.activemq.command.ActiveMQQueue;
037import org.apache.activemq.command.ActiveMQTopic;
038import org.apache.activemq.command.LocalTransactionId;
039import org.apache.activemq.command.ProducerId;
040import org.apache.activemq.command.TransactionId;
041import org.apache.activemq.command.XATransactionId;
042import org.apache.activemq.protobuf.Buffer;
043import org.apache.activemq.store.JournaledStore;
044import org.apache.activemq.store.MessageStore;
045import org.apache.activemq.store.NoLocalSubscriptionAware;
046import org.apache.activemq.store.PersistenceAdapter;
047import org.apache.activemq.store.SharedFileLocker;
048import org.apache.activemq.store.TopicMessageStore;
049import org.apache.activemq.store.TransactionIdTransformer;
050import org.apache.activemq.store.TransactionIdTransformerAware;
051import org.apache.activemq.store.TransactionStore;
052import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
053import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
054import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
055import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
056import org.apache.activemq.usage.SystemUsage;
057import org.apache.activemq.util.ServiceStopper;
058
059/**
060 * An implementation of {@link PersistenceAdapter} designed for use with
061 * KahaDB - Embedded Lightweight Non-Relational Database
062 *
063 * @org.apache.xbean.XBean element="kahaDB"
064 *
065 */
066public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter,
067    JournaledStore, TransactionIdTransformerAware, NoLocalSubscriptionAware {
068
069    private final KahaDBStore letter = new KahaDBStore();
070
071    /**
072     * @param context
073     * @throws IOException
074     * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
075     */
076    @Override
077    public void beginTransaction(ConnectionContext context) throws IOException {
078        this.letter.beginTransaction(context);
079    }
080
081    /**
082     * @param sync
083     * @throws IOException
084     * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
085     */
086    @Override
087    public void checkpoint(boolean sync) throws IOException {
088        this.letter.checkpoint(sync);
089    }
090
091    /**
092     * @param context
093     * @throws IOException
094     * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
095     */
096    @Override
097    public void commitTransaction(ConnectionContext context) throws IOException {
098        this.letter.commitTransaction(context);
099    }
100
101    /**
102     * @param destination
103     * @return MessageStore
104     * @throws IOException
105     * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
106     */
107    @Override
108    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
109        return this.letter.createQueueMessageStore(destination);
110    }
111
112    /**
113     * @param destination
114     * @return TopicMessageStore
115     * @throws IOException
116     * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
117     */
118    @Override
119    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
120        return this.letter.createTopicMessageStore(destination);
121    }
122
123    /**
124     * @return TransactionStore
125     * @throws IOException
126     * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
127     */
128    @Override
129    public TransactionStore createTransactionStore() throws IOException {
130        return this.letter.createTransactionStore();
131    }
132
133    /**
134     * @throws IOException
135     * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
136     */
137    @Override
138    public void deleteAllMessages() throws IOException {
139        this.letter.deleteAllMessages();
140    }
141
142    /**
143     * @return destinations
144     * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
145     */
146    @Override
147    public Set<ActiveMQDestination> getDestinations() {
148        return this.letter.getDestinations();
149    }
150
151    /**
152     * @return lastMessageBrokerSequenceId
153     * @throws IOException
154     * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
155     */
156    @Override
157    public long getLastMessageBrokerSequenceId() throws IOException {
158        return this.letter.getLastMessageBrokerSequenceId();
159    }
160
161    @Override
162    public long getLastProducerSequenceId(ProducerId id) throws IOException {
163        return this.letter.getLastProducerSequenceId(id);
164    }
165
166    @Override
167    public void allowIOResumption() {
168        this.letter.allowIOResumption();
169    }
170
171    /**
172     * @param destination
173     * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
174     */
175    @Override
176    public void removeQueueMessageStore(ActiveMQQueue destination) {
177        this.letter.removeQueueMessageStore(destination);
178    }
179
180    /**
181     * @param destination
182     * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
183     */
184    @Override
185    public void removeTopicMessageStore(ActiveMQTopic destination) {
186        this.letter.removeTopicMessageStore(destination);
187    }
188
189    /**
190     * @param context
191     * @throws IOException
192     * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
193     */
194    @Override
195    public void rollbackTransaction(ConnectionContext context) throws IOException {
196        this.letter.rollbackTransaction(context);
197    }
198
199    /**
200     * @param brokerName
201     * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
202     */
203    @Override
204    public void setBrokerName(String brokerName) {
205        this.letter.setBrokerName(brokerName);
206    }
207
208    /**
209     * @param usageManager
210     * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
211     */
212    @Override
213    public void setUsageManager(SystemUsage usageManager) {
214        this.letter.setUsageManager(usageManager);
215    }
216
217    /**
218     * @return the size of the store
219     * @see org.apache.activemq.store.PersistenceAdapter#size()
220     */
221    @Override
222    public long size() {
223        return this.letter.isStarted() ? this.letter.size() : 0l;
224    }
225
226    /**
227     * @throws Exception
228     * @see org.apache.activemq.Service#start()
229     */
230    @Override
231    public void doStart() throws Exception {
232        this.letter.start();
233
234        if (brokerService != null && brokerService.isUseJmx()) {
235            PersistenceAdapterView view = new PersistenceAdapterView(this);
236            view.setInflightTransactionViewCallable(new Callable<String>() {
237                @Override
238                public String call() throws Exception {
239                    return letter.getTransactions();
240                }
241            });
242            view.setDataViewCallable(new Callable<String>() {
243                @Override
244                public String call() throws Exception {
245                    return letter.getJournal().getFileMap().keySet().toString();
246                }
247            });
248            AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view,
249                    createPersistenceAdapterName(brokerService.getBrokerObjectName().toString(), toString()));
250        }
251    }
252
253    /**
254     * @throws Exception
255     * @see org.apache.activemq.Service#stop()
256     */
257    @Override
258    public void doStop(ServiceStopper stopper) throws Exception {
259        this.letter.stop();
260
261        if (brokerService != null && brokerService.isUseJmx()) {
262            ObjectName brokerObjectName = brokerService.getBrokerObjectName();
263            brokerService.getManagementContext().unregisterMBean(createPersistenceAdapterName(brokerObjectName.toString(), toString()));
264        }
265    }
266
267    /**
268     * Get the journalMaxFileLength
269     *
270     * @return the journalMaxFileLength
271     */
272    @Override
273    public int getJournalMaxFileLength() {
274        return this.letter.getJournalMaxFileLength();
275    }
276
277    /**
278     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
279     * be used
280     *
281     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
282     */
283    public void setJournalMaxFileLength(int journalMaxFileLength) {
284        this.letter.setJournalMaxFileLength(journalMaxFileLength);
285    }
286
287    /**
288     * Set the max number of producers (LRU cache) to track for duplicate sends
289     */
290    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
291        this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
292    }
293
294    public int getMaxFailoverProducersToTrack() {
295        return this.letter.getMaxFailoverProducersToTrack();
296    }
297
298    /**
299     * set the audit window depth for duplicate suppression (should exceed the max transaction
300     * batch)
301     */
302    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
303        this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
304    }
305
306    public int getFailoverProducersAuditDepth() {
307        return this.letter.getFailoverProducersAuditDepth();
308    }
309
310    /**
311     * Get the checkpointInterval
312     *
313     * @return the checkpointInterval
314     */
315    public long getCheckpointInterval() {
316        return this.letter.getCheckpointInterval();
317    }
318
319    /**
320     * Set the checkpointInterval
321     *
322     * @param checkpointInterval
323     *            the checkpointInterval to set
324     */
325    public void setCheckpointInterval(long checkpointInterval) {
326        this.letter.setCheckpointInterval(checkpointInterval);
327    }
328
329    /**
330     * Get the cleanupInterval
331     *
332     * @return the cleanupInterval
333     */
334    public long getCleanupInterval() {
335        return this.letter.getCleanupInterval();
336    }
337
338    /**
339     * Set the cleanupInterval
340     *
341     * @param cleanupInterval
342     *            the cleanupInterval to set
343     */
344    public void setCleanupInterval(long cleanupInterval) {
345        this.letter.setCleanupInterval(cleanupInterval);
346    }
347
348    /**
349     * Get the indexWriteBatchSize
350     *
351     * @return the indexWriteBatchSize
352     */
353    public int getIndexWriteBatchSize() {
354        return this.letter.getIndexWriteBatchSize();
355    }
356
357    /**
358     * Set the indexWriteBatchSize
359     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
360     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
361     * @param indexWriteBatchSize
362     *            the indexWriteBatchSize to set
363     */
364    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
365        this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
366    }
367
368    /**
369     * Get the journalMaxWriteBatchSize
370     *
371     * @return the journalMaxWriteBatchSize
372     */
373    public int getJournalMaxWriteBatchSize() {
374        return this.letter.getJournalMaxWriteBatchSize();
375    }
376
377    /**
378     * Set the journalMaxWriteBatchSize
379     *  * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
380     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
381     * @param journalMaxWriteBatchSize
382     *            the journalMaxWriteBatchSize to set
383     */
384    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
385        this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
386    }
387
388    /**
389     * Get the enableIndexWriteAsync
390     *
391     * @return the enableIndexWriteAsync
392     */
393    public boolean isEnableIndexWriteAsync() {
394        return this.letter.isEnableIndexWriteAsync();
395    }
396
397    /**
398     * Set the enableIndexWriteAsync
399     *
400     * @param enableIndexWriteAsync
401     *            the enableIndexWriteAsync to set
402     */
403    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
404        this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
405    }
406
407    /**
408     * Get the directory
409     *
410     * @return the directory
411     */
412    @Override
413    public File getDirectory() {
414        return this.letter.getDirectory();
415    }
416
417    /**
418     * @param dir
419     * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
420     */
421    @Override
422    public void setDirectory(File dir) {
423        this.letter.setDirectory(dir);
424    }
425
426    /**
427     * @return the currently configured location of the KahaDB index files.
428     */
429    public File getIndexDirectory() {
430        return this.letter.getIndexDirectory();
431    }
432
433    /**
434     * Sets the directory where KahaDB index files should be written.
435     *
436     * @param indexDirectory
437     *        the directory where the KahaDB store index files should be written.
438     */
439    public void setIndexDirectory(File indexDirectory) {
440        this.letter.setIndexDirectory(indexDirectory);
441    }
442
443    /**
444     * Get the enableJournalDiskSyncs
445     * @deprecated use {@link #getJournalDiskSyncStrategy} instead
446     * @return the enableJournalDiskSyncs
447     */
448    public boolean isEnableJournalDiskSyncs() {
449        return this.letter.isEnableJournalDiskSyncs();
450    }
451
452    /**
453     * Set the enableJournalDiskSyncs
454     *
455     * @deprecated use {@link #setJournalDiskSyncStrategy} instead
456     * @param enableJournalDiskSyncs
457     *            the enableJournalDiskSyncs to set
458     */
459    public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
460        this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
461    }
462
463    /**
464     * @return
465     */
466    public String getJournalDiskSyncStrategy() {
467        return letter.getJournalDiskSyncStrategy();
468    }
469
470    public JournalDiskSyncStrategy getJournalDiskSyncStrategyEnum() {
471        return letter.getJournalDiskSyncStrategyEnum();
472    }
473
474    /**
475     * @param journalDiskSyncStrategy
476     */
477    public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) {
478        letter.setJournalDiskSyncStrategy(journalDiskSyncStrategy);
479    }
480
481    /**
482     * @return
483     */
484    public long getJournalDiskSyncInterval() {
485        return letter.getJournalDiskSyncInterval();
486    }
487
488    /**
489     * @param journalDiskSyncInterval
490     */
491    public void setJournalDiskSyncInterval(long journalDiskSyncInterval) {
492        letter.setJournalDiskSyncInterval(journalDiskSyncInterval);
493    }
494
495    /**
496     * Get the indexCacheSize
497     *
498     * @return the indexCacheSize
499     */
500    public int getIndexCacheSize() {
501        return this.letter.getIndexCacheSize();
502    }
503
504    /**
505     * Set the indexCacheSize
506     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
507     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
508     * @param indexCacheSize
509     *            the indexCacheSize to set
510     */
511    public void setIndexCacheSize(int indexCacheSize) {
512        this.letter.setIndexCacheSize(indexCacheSize);
513    }
514
515    /**
516     * Get the ignoreMissingJournalfiles
517     *
518     * @return the ignoreMissingJournalfiles
519     */
520    public boolean isIgnoreMissingJournalfiles() {
521        return this.letter.isIgnoreMissingJournalfiles();
522    }
523
524    /**
525     * Set the ignoreMissingJournalfiles
526     *
527     * @param ignoreMissingJournalfiles
528     *            the ignoreMissingJournalfiles to set
529     */
530    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
531        this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
532    }
533
534    public boolean isChecksumJournalFiles() {
535        return letter.isChecksumJournalFiles();
536    }
537
538    public boolean isCheckForCorruptJournalFiles() {
539        return letter.isCheckForCorruptJournalFiles();
540    }
541
542    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
543        letter.setChecksumJournalFiles(checksumJournalFiles);
544    }
545
546    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
547        letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
548    }
549
550    @Override
551    public void setBrokerService(BrokerService brokerService) {
552        super.setBrokerService(brokerService);
553        letter.setBrokerService(brokerService);
554    }
555
556    public String getPreallocationScope() {
557        return letter.getPreallocationScope();
558    }
559
560    public void setPreallocationScope(String preallocationScope) {
561        this.letter.setPreallocationScope(preallocationScope);
562    }
563
564    public String getPreallocationStrategy() {
565        return letter.getPreallocationStrategy();
566    }
567
568    public void setPreallocationStrategy(String preallocationStrategy) {
569        this.letter.setPreallocationStrategy(preallocationStrategy);
570    }
571
572    public boolean isArchiveDataLogs() {
573        return letter.isArchiveDataLogs();
574    }
575
576    public void setArchiveDataLogs(boolean archiveDataLogs) {
577        letter.setArchiveDataLogs(archiveDataLogs);
578    }
579
580    public File getDirectoryArchive() {
581        return letter.getDirectoryArchive();
582    }
583
584    public void setDirectoryArchive(File directoryArchive) {
585        letter.setDirectoryArchive(directoryArchive);
586    }
587
588    public boolean isConcurrentStoreAndDispatchQueues() {
589        return letter.isConcurrentStoreAndDispatchQueues();
590    }
591
592    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
593        letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
594    }
595
596    public boolean isConcurrentStoreAndDispatchTopics() {
597        return letter.isConcurrentStoreAndDispatchTopics();
598    }
599
600    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
601        letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
602    }
603
604    public int getMaxAsyncJobs() {
605        return letter.getMaxAsyncJobs();
606    }
607    /**
608     * @param maxAsyncJobs
609     *            the maxAsyncJobs to set
610     */
611    public void setMaxAsyncJobs(int maxAsyncJobs) {
612        letter.setMaxAsyncJobs(maxAsyncJobs);
613    }
614
615    /**
616     * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
617     *
618     * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
619     */
620    @Deprecated
621    public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException {
622       getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay);
623    }
624
625    public boolean getForceRecoverIndex() {
626        return letter.getForceRecoverIndex();
627    }
628
629    public void setForceRecoverIndex(boolean forceRecoverIndex) {
630        letter.setForceRecoverIndex(forceRecoverIndex);
631    }
632
633    public boolean isArchiveCorruptedIndex() {
634        return letter.isArchiveCorruptedIndex();
635    }
636
637    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
638        letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
639    }
640
641    public float getIndexLFUEvictionFactor() {
642        return letter.getIndexLFUEvictionFactor();
643    }
644
645    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
646        letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor);
647    }
648
649    public boolean isUseIndexLFRUEviction() {
650        return letter.isUseIndexLFRUEviction();
651    }
652
653    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
654        letter.setUseIndexLFRUEviction(useIndexLFRUEviction);
655    }
656
657    public void setEnableIndexDiskSyncs(boolean diskSyncs) {
658        letter.setEnableIndexDiskSyncs(diskSyncs);
659    }
660
661    public boolean isEnableIndexDiskSyncs() {
662        return letter.isEnableIndexDiskSyncs();
663    }
664
665    public void setEnableIndexRecoveryFile(boolean enable) {
666        letter.setEnableIndexRecoveryFile(enable);
667    }
668
669    public boolean  isEnableIndexRecoveryFile() {
670        return letter.isEnableIndexRecoveryFile();
671    }
672
673    public void setEnableIndexPageCaching(boolean enable) {
674        letter.setEnableIndexPageCaching(enable);
675    }
676
677    public boolean isEnableIndexPageCaching() {
678        return letter.isEnableIndexPageCaching();
679    }
680
681    public int getCompactAcksAfterNoGC() {
682        return letter.getCompactAcksAfterNoGC();
683    }
684
685    /**
686     * Sets the number of GC cycles where no journal logs were removed before an attempt to
687     * move forward all the acks in the last log that contains them and is otherwise unreferenced.
688     * <p>
689     * A value of -1 will disable this feature.
690     *
691     * @param compactAcksAfterNoGC
692     *      Number of empty GC cycles before we rewrite old ACKS.
693     */
694    public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
695        this.letter.setCompactAcksAfterNoGC(compactAcksAfterNoGC);
696    }
697
698    public boolean isCompactAcksIgnoresStoreGrowth() {
699        return this.letter.isCompactAcksIgnoresStoreGrowth();
700    }
701
702    /**
703     * Configure if Ack compaction will occur regardless of continued growth of the
704     * journal logs meaning that the store has not run out of space yet.  Because the
705     * compaction operation can be costly this value is defaulted to off and the Ack
706     * compaction is only done when it seems that the store cannot grow and larger.
707     *
708     * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
709     */
710    public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
711        this.letter.setCompactAcksIgnoresStoreGrowth(compactAcksIgnoresStoreGrowth);
712    }
713
714    /**
715     * Returns whether Ack compaction is enabled
716     *
717     * @return enableAckCompaction
718     */
719    public boolean isEnableAckCompaction() {
720        return letter.isEnableAckCompaction();
721    }
722
723    /**
724     * Configure if the Ack compaction task should be enabled to run
725     *
726     * @param enableAckCompaction
727     */
728    public void setEnableAckCompaction(boolean enableAckCompaction) {
729        letter.setEnableAckCompaction(enableAckCompaction);
730    }
731
732    /**
733     * Whether non-blocking subscription statistics have been enabled
734     *
735     * @return
736     */
737    public boolean isEnableSubscriptionStatistics() {
738        return letter.isEnableSubscriptionStatistics();
739    }
740
741    /**
742     * Enable caching statistics for each subscription to allow non-blocking
743     * retrieval of metrics.  This could incur some overhead to compute if there are a lot
744     * of subscriptions.
745     *
746     * @param enableSubscriptionStatistics
747     */
748    public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics) {
749        letter.setEnableSubscriptionStatistics(enableSubscriptionStatistics);
750    }
751
752    public KahaDBStore getStore() {
753        return letter;
754    }
755
756    public KahaTransactionInfo createTransactionInfo(TransactionId txid) {
757        if (txid == null) {
758            return null;
759        }
760        KahaTransactionInfo rc = new KahaTransactionInfo();
761
762        if (txid.isLocalTransaction()) {
763            LocalTransactionId t = (LocalTransactionId) txid;
764            KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
765            kahaTxId.setConnectionId(t.getConnectionId().getValue());
766            kahaTxId.setTransactionId(t.getValue());
767            rc.setLocalTransactionId(kahaTxId);
768        } else {
769            XATransactionId t = (XATransactionId) txid;
770            KahaXATransactionId kahaTxId = new KahaXATransactionId();
771            kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
772            kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
773            kahaTxId.setFormatId(t.getFormatId());
774            rc.setXaTransactionId(kahaTxId);
775        }
776        return rc;
777    }
778
779    @Override
780    public Locker createDefaultLocker() throws IOException {
781        SharedFileLocker locker = new SharedFileLocker();
782        locker.configure(this);
783        return locker;
784    }
785
786    @Override
787    public void init() throws Exception {}
788
789    @Override
790    public String toString() {
791        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
792        return "KahaDBPersistenceAdapter[" + path + (getIndexDirectory() != null ? ",Index:" + getIndexDirectory().getAbsolutePath() : "") +  "]";
793    }
794
795    @Override
796    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
797        getStore().setTransactionIdTransformer(transactionIdTransformer);
798    }
799
800    @Override
801    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
802        return this.letter.createJobSchedulerStore();
803    }
804
805    /* (non-Javadoc)
806     * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal()
807     */
808    @Override
809    public boolean isPersistNoLocal() {
810        return this.letter.isPersistNoLocal();
811    }
812}