001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Iterator;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.CancellationException;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.Future;
030
031import org.apache.activemq.broker.ConnectionContext;
032import org.apache.activemq.command.Message;
033import org.apache.activemq.command.MessageAck;
034import org.apache.activemq.command.MessageId;
035import org.apache.activemq.command.TransactionId;
036import org.apache.activemq.command.XATransactionId;
037import org.apache.activemq.protobuf.Buffer;
038import org.apache.activemq.store.AbstractMessageStore;
039import org.apache.activemq.store.ListenableFuture;
040import org.apache.activemq.store.MessageStore;
041import org.apache.activemq.store.ProxyMessageStore;
042import org.apache.activemq.store.ProxyTopicMessageStore;
043import org.apache.activemq.store.TopicMessageStore;
044import org.apache.activemq.store.TransactionRecoveryListener;
045import org.apache.activemq.store.TransactionStore;
046import org.apache.activemq.store.kahadb.MessageDatabase.Operation;
047import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
048import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
049import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
050import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
051import org.apache.activemq.wireformat.WireFormat;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
/**
 * Provides a TransactionStore implementation that can create transaction-aware
 * MessageStore objects from non-transaction-aware MessageStore objects.
 */
061public class KahaDBTransactionStore implements TransactionStore {
062    static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
063    ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
064    private final KahaDBStore theStore;
065
    /**
     * Creates a transaction store that operates on the given KahaDB store.
     *
     * @param theStore the store this transaction store reads its settings from
     *                 and sends its transaction commands to
     */
    public KahaDBTransactionStore(KahaDBStore theStore) {
        this.theStore = theStore;
    }
069
070    private WireFormat wireFormat(){
071      return this.theStore.wireFormat;
072    }
073
074    public class Tx {
075        private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
076
077        private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
078
079        public void add(AddMessageCommand msg) {
080            messages.add(msg);
081        }
082
083        public void add(RemoveMessageCommand ack) {
084            acks.add(ack);
085        }
086
087        public Message[] getMessages() {
088            Message rc[] = new Message[messages.size()];
089            int count = 0;
090            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
091                AddMessageCommand cmd = iter.next();
092                rc[count++] = cmd.getMessage();
093            }
094            return rc;
095        }
096
097        public MessageAck[] getAcks() {
098            MessageAck rc[] = new MessageAck[acks.size()];
099            int count = 0;
100            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
101                RemoveMessageCommand cmd = iter.next();
102                rc[count++] = cmd.getMessageAck();
103            }
104            return rc;
105        }
106
107        /**
108         * @return true if something to commit
109         * @throws IOException
110         */
111        public List<Future<Object>> commit() throws IOException {
112            List<Future<Object>> results = new ArrayList<Future<Object>>();
113            // Do all the message adds.
114            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
115                AddMessageCommand cmd = iter.next();
116                results.add(cmd.run());
117
118            }
119            // And removes..
120            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
121                RemoveMessageCommand cmd = iter.next();
122                cmd.run();
123                results.add(cmd.run());
124            }
125
126            return results;
127        }
128    }
129
130    public abstract class AddMessageCommand {
131        private final ConnectionContext ctx;
132        AddMessageCommand(ConnectionContext ctx) {
133            this.ctx = ctx;
134        }
135        abstract Message getMessage();
136        Future<Object> run() throws IOException {
137            return run(this.ctx);
138        }
139        abstract Future<Object> run(ConnectionContext ctx) throws IOException;
140    }
141
142    public abstract class RemoveMessageCommand {
143
144        private final ConnectionContext ctx;
145        RemoveMessageCommand(ConnectionContext ctx) {
146            this.ctx = ctx;
147        }
148        abstract MessageAck getMessageAck();
149        Future<Object> run() throws IOException {
150            return run(this.ctx);
151        }
152        abstract Future<Object> run(ConnectionContext context) throws IOException;
153    }
154
155    public MessageStore proxy(MessageStore messageStore) {
156        return new ProxyMessageStore(messageStore) {
157            @Override
158            public void addMessage(ConnectionContext context, final Message send) throws IOException {
159                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
160            }
161
162            @Override
163            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
164                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
165            }
166
167            @Override
168            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
169                return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
170            }
171
172            @Override
173            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
174                return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
175            }
176
177            @Override
178            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
179                KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
180            }
181
182            @Override
183            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
184                KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
185            }
186        };
187    }
188
189    public TopicMessageStore proxy(TopicMessageStore messageStore) {
190        return new ProxyTopicMessageStore(messageStore) {
191            @Override
192            public void addMessage(ConnectionContext context, final Message send) throws IOException {
193                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
194            }
195
196            @Override
197            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
198                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
199            }
200
201            @Override
202            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
203                return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
204            }
205
206            @Override
207            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
208                return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
209            }
210
211            @Override
212            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
213                KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
214            }
215
216            @Override
217            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
218                KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
219            }
220
221            @Override
222            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
223                            MessageId messageId, MessageAck ack) throws IOException {
224                KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId,
225                        subscriptionName, messageId, ack);
226            }
227
228        };
229    }
230
231    /**
232     * @throws IOException
233     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
234     */
235    @Override
236    public void prepare(TransactionId txid) throws IOException {
237        KahaTransactionInfo info = getTransactionInfo(txid);
238        if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
239            theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
240        } else {
241            Tx tx = inflightTransactions.remove(txid);
242            if (tx != null) {
243               theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
244            }
245        }
246    }
247
248    public Tx getTx(Object txid) {
249        Tx tx = inflightTransactions.get(txid);
250        if (tx == null) {
251            tx = new Tx();
252            inflightTransactions.put(txid, tx);
253        }
254        return tx;
255    }
256
257    @Override
258    public void commit(TransactionId txid, boolean wasPrepared, final Runnable preCommit, Runnable postCommit)
259            throws IOException {
260        if (txid != null) {
261            if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) {
262                if (preCommit != null) {
263                    preCommit.run();
264                }
265                Tx tx = inflightTransactions.remove(txid);
266                if (tx != null) {
267                    List<Future<Object>> results = tx.commit();
268                    boolean doneSomething = false;
269                    for (Future<Object> result : results) {
270                        try {
271                            result.get();
272                        } catch (InterruptedException e) {
273                            theStore.brokerService.handleIOException(new IOException(e.getMessage()));
274                        } catch (ExecutionException e) {
275                            theStore.brokerService.handleIOException(new IOException(e.getMessage()));
276                        }catch(CancellationException e) {
277                        }
278                        if (!result.isCancelled()) {
279                            doneSomething = true;
280                        }
281                    }
282                    if (postCommit != null) {
283                        postCommit.run();
284                    }
285                    if (doneSomething) {
286                        KahaTransactionInfo info = getTransactionInfo(txid);
287                        theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null);
288                    }
289                }else {
290                    //The Tx will be null for failed over clients - lets run their post commits
291                    if (postCommit != null) {
292                        postCommit.run();
293                    }
294                }
295
296            } else {
297                KahaTransactionInfo info = getTransactionInfo(txid);
298                if (preCommit != null) {
299                    preCommit.run();
300                }
301                theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, postCommit);
302                forgetRecoveredAcks(txid, false);
303            }
304        }else {
305           LOG.error("Null transaction passed on commit");
306        }
307    }
308
309    /**
310     * @throws IOException
311     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
312     */
313    @Override
314    public void rollback(TransactionId txid) throws IOException {
315        if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
316            KahaTransactionInfo info = getTransactionInfo(txid);
317            theStore.store(new KahaRollbackCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null);
318            forgetRecoveredAcks(txid, true);
319        } else {
320            inflightTransactions.remove(txid);
321        }
322    }
323
324    protected void forgetRecoveredAcks(TransactionId txid, boolean isRollback) throws IOException {
325        if (txid.isXATransaction()) {
326            XATransactionId xaTid = ((XATransactionId) txid);
327            theStore.forgetRecoveredAcks(xaTid.getPreparedAcks(), isRollback);
328        }
329    }
330
    /**
     * No-op: this implementation has nothing to do on start.
     */
    @Override
    public void start() throws Exception {
    }
334
    /**
     * No-op: this implementation has nothing to do on stop.
     */
    @Override
    public void stop() throws Exception {
    }
338
339    @Override
340    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
341        for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
342            XATransactionId xid = (XATransactionId) entry.getKey();
343            ArrayList<Message> messageList = new ArrayList<Message>();
344            ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
345
346            for (Operation op : entry.getValue()) {
347                if (op.getClass() == MessageDatabase.AddOperation.class) {
348                    MessageDatabase.AddOperation addOp = (MessageDatabase.AddOperation) op;
349                    Message msg = (Message) wireFormat().unmarshal(new DataInputStream(addOp.getCommand().getMessage()
350                            .newInput()));
351                    messageList.add(msg);
352                } else {
353                    MessageDatabase.RemoveOperation rmOp = (MessageDatabase.RemoveOperation) op;
354                    Buffer ackb = rmOp.getCommand().getAck();
355                    MessageAck ack = (MessageAck) wireFormat().unmarshal(new DataInputStream(ackb.newInput()));
356                    ackList.add(ack);
357                }
358            }
359
360            Message[] addedMessages = new Message[messageList.size()];
361            MessageAck[] acks = new MessageAck[ackList.size()];
362            messageList.toArray(addedMessages);
363            ackList.toArray(acks);
364            xid.setPreparedAcks(ackList);
365            theStore.trackRecoveredAcks(ackList);
366            listener.recover(xid, addedMessages, acks);
367        }
368    }
369
370    /**
371     * @param message
372     * @throws IOException
373     */
374    void addMessage(ConnectionContext context, final MessageStore destination, final Message message)
375            throws IOException {
376
377        if (message.getTransactionId() != null) {
378            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
379                destination.addMessage(context, message);
380            } else {
381                Tx tx = getTx(message.getTransactionId());
382                tx.add(new AddMessageCommand(context) {
383                    @Override
384                    public Message getMessage() {
385                        return message;
386                    }
387                    @Override
388                    public Future<Object> run(ConnectionContext ctx) throws IOException {
389                        destination.addMessage(ctx, message);
390                        return AbstractMessageStore.FUTURE;
391                    }
392
393                });
394            }
395        } else {
396            destination.addMessage(context, message);
397        }
398    }
399
400    ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
401            throws IOException {
402
403        if (message.getTransactionId() != null) {
404            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
405                destination.addMessage(context, message);
406                return AbstractMessageStore.FUTURE;
407            } else {
408                Tx tx = getTx(message.getTransactionId());
409                tx.add(new AddMessageCommand(context) {
410                    @Override
411                    public Message getMessage() {
412                        return message;
413                    }
414                    @Override
415                    public Future<Object> run(ConnectionContext ctx) throws IOException {
416                        return destination.asyncAddQueueMessage(ctx, message);
417                    }
418
419                });
420                return AbstractMessageStore.FUTURE;
421            }
422        } else {
423            return destination.asyncAddQueueMessage(context, message);
424        }
425    }
426
427    ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
428            throws IOException {
429
430        if (message.getTransactionId() != null) {
431            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
432                destination.addMessage(context, message);
433                return AbstractMessageStore.FUTURE;
434            } else {
435                Tx tx = getTx(message.getTransactionId());
436                tx.add(new AddMessageCommand(context) {
437                    @Override
438                    public Message getMessage() {
439                        return message;
440                    }
441                    @Override
442                    public Future<Object> run(ConnectionContext ctx) throws IOException {
443                        return destination.asyncAddTopicMessage(ctx, message);
444                    }
445
446                });
447                return AbstractMessageStore.FUTURE;
448            }
449        } else {
450            return destination.asyncAddTopicMessage(context, message);
451        }
452    }
453
454    /**
455     * @param ack
456     * @throws IOException
457     */
458    final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
459            throws IOException {
460
461        if (ack.isInTransaction()) {
462            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
463                destination.removeMessage(context, ack);
464            } else {
465                Tx tx = getTx(ack.getTransactionId());
466                tx.add(new RemoveMessageCommand(context) {
467                    @Override
468                    public MessageAck getMessageAck() {
469                        return ack;
470                    }
471
472                    @Override
473                    public Future<Object> run(ConnectionContext ctx) throws IOException {
474                        destination.removeMessage(ctx, ack);
475                        return AbstractMessageStore.FUTURE;
476                    }
477                });
478            }
479        } else {
480            destination.removeMessage(context, ack);
481        }
482    }
483
484    final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
485            throws IOException {
486
487        if (ack.isInTransaction()) {
488            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
489                destination.removeAsyncMessage(context, ack);
490            } else {
491                Tx tx = getTx(ack.getTransactionId());
492                tx.add(new RemoveMessageCommand(context) {
493                    @Override
494                    public MessageAck getMessageAck() {
495                        return ack;
496                    }
497
498                    @Override
499                    public Future<Object> run(ConnectionContext ctx) throws IOException {
500                        destination.removeMessage(ctx, ack);
501                        return AbstractMessageStore.FUTURE;
502                    }
503                });
504            }
505        } else {
506            destination.removeAsyncMessage(context, ack);
507        }
508    }
509
510    final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName,
511                           final MessageId messageId, final MessageAck ack) throws IOException {
512
513        if (ack.isInTransaction()) {
514            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
515                destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
516            } else {
517                Tx tx = getTx(ack.getTransactionId());
518                tx.add(new RemoveMessageCommand(context) {
519                    @Override
520                    public MessageAck getMessageAck() {
521                        return ack;
522                    }
523
524                    @Override
525                    public Future<Object> run(ConnectionContext ctx) throws IOException {
526                        destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
527                        return AbstractMessageStore.FUTURE;
528                    }
529                });
530            }
531        } else {
532            destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
533        }
534    }
535
536
537    private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
538        return TransactionIdConversion.convert(theStore.getTransactionIdTransformer().transform(txid));
539    }
540}