001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.vm;
018
019import java.io.IOException;
020import java.io.InterruptedIOException;
021import java.net.URI;
022import java.security.cert.X509Certificate;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicLong;
028
029import org.apache.activemq.command.ShutdownInfo;
030import org.apache.activemq.thread.Task;
031import org.apache.activemq.thread.TaskRunner;
032import org.apache.activemq.thread.TaskRunnerFactory;
033import org.apache.activemq.transport.FutureResponse;
034import org.apache.activemq.transport.ResponseCallback;
035import org.apache.activemq.transport.Transport;
036import org.apache.activemq.transport.TransportDisposedIOException;
037import org.apache.activemq.transport.TransportListener;
038import org.apache.activemq.util.IOExceptionSupport;
039import org.apache.activemq.wireformat.WireFormat;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * A Transport implementation that uses direct method invocations.
045 */
046public class VMTransport implements Transport, Task {
047    protected static final Logger LOG = LoggerFactory.getLogger(VMTransport.class);
048
049    private static final AtomicLong NEXT_ID = new AtomicLong(0);
050
051    // Transport Configuration
052    protected VMTransport peer;
053    protected TransportListener transportListener;
054    protected boolean marshal;
055    protected boolean async = true;
056    protected int asyncQueueDepth = 2000;
057    protected final URI location;
058    protected final long id;
059
060    // Implementation
061    private volatile LinkedBlockingQueue<Object> messageQueue;
062    private volatile TaskRunnerFactory taskRunnerFactory;
063    private volatile TaskRunner taskRunner;
064
065    // Transport State
066    protected final AtomicBoolean started = new AtomicBoolean();
067    protected final AtomicBoolean disposed = new AtomicBoolean();
068
069    private volatile int receiveCounter;
070
071    public VMTransport(URI location) {
072        this.location = location;
073        this.id = NEXT_ID.getAndIncrement();
074    }
075
076    public void setPeer(VMTransport peer) {
077        this.peer = peer;
078    }
079
080    @Override
081    public void oneway(Object command) throws IOException {
082
083        if (disposed.get()) {
084            throw new TransportDisposedIOException("Transport disposed.");
085        }
086
087        if (peer == null) {
088            throw new IOException("Peer not connected.");
089        }
090
091        try {
092
093            if (peer.disposed.get()) {
094                throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
095            }
096
097            if (peer.async) {
098                peer.getMessageQueue().put(command);
099                peer.wakeup();
100                return;
101            }
102
103            if (!peer.started.get()) {
104                LinkedBlockingQueue<Object> pending = peer.getMessageQueue();
105                int sleepTimeMillis;
106                boolean accepted = false;
107                do {
108                    sleepTimeMillis = 0;
109                    // the pending queue is drained on start so we need to ensure we add before
110                    // the drain commences, otherwise we never get the command dispatched!
111                    synchronized (peer.started) {
112                        if (!peer.started.get()) {
113                            accepted = pending.offer(command);
114                            if (!accepted) {
115                                sleepTimeMillis = 500;
116                            }
117                        }
118                    }
119                    // give start thread a chance if we will loop
120                    TimeUnit.MILLISECONDS.sleep(sleepTimeMillis);
121
122                } while (!accepted && !peer.started.get());
123                if (accepted) {
124                    return;
125                }
126            }
127        } catch (InterruptedException e) {
128            Thread.currentThread().interrupt();
129            InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
130            iioe.initCause(e);
131            throw iioe;
132        }
133
134        dispatch(peer, peer.messageQueue, command);
135    }
136
137    public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) {
138        TransportListener transportListener = transport.getTransportListener();
139        if (transportListener != null) {
140            // Lock here on the target transport's started since we want to wait for its start()
141            // method to finish dispatching out of the queue before we do our own.
142            synchronized (transport.started) {
143
144                // Ensure that no additional commands entered the queue in the small time window
145                // before the start method locks the dispatch lock and the oneway method was in
146                // an put operation.
147                while(pending != null && !pending.isEmpty() && !transport.isDisposed()) {
148                    doDispatch(transport, transportListener, pending.poll());
149                }
150
151                // We are now in sync mode and won't enqueue any more commands to the target
152                // transport so lets clean up its resources.
153                transport.messageQueue = null;
154
155                // Don't dispatch if either end was disposed already.
156                if (command != null && !this.disposed.get() && !transport.isDisposed()) {
157                    doDispatch(transport, transportListener, command);
158                }
159            }
160        }
161    }
162
163    public void doDispatch(VMTransport transport, TransportListener transportListener, Object command) {
164        transport.receiveCounter++;
165        transportListener.onCommand(command);
166    }
167
168    @Override
169    public void start() throws Exception {
170
171        if (transportListener == null) {
172            throw new IOException("TransportListener not set.");
173        }
174
175        // If we are not in async mode we lock the dispatch lock here and then start to
176        // prevent any sync dispatches from occurring until we dispatch the pending messages
177        // to maintain delivery order.  When async this happens automatically so just set
178        // started and wakeup the task runner.
179        if (!async) {
180            synchronized (started) {
181                if (started.compareAndSet(false, true)) {
182                    LinkedBlockingQueue<Object> mq = getMessageQueue();
183                    Object command;
184                    while ((command = mq.poll()) != null && !disposed.get() ) {
185                        receiveCounter++;
186                        doDispatch(this, transportListener, command);
187                    }
188                }
189            }
190        } else {
191            if (started.compareAndSet(false, true)) {
192                wakeup();
193            }
194        }
195    }
196
197    @Override
198    public void stop() throws Exception {
199        // Only need to do this once, all future oneway calls will now
200        // fail as will any asnyc jobs in the task runner.
201        if (disposed.compareAndSet(false, true)) {
202
203            TaskRunner tr = taskRunner;
204            LinkedBlockingQueue<Object> mq = this.messageQueue;
205
206            taskRunner = null;
207            messageQueue = null;
208
209            if (mq != null) {
210                mq.clear();
211            }
212
213            // don't wait for completion
214            if (tr != null) {
215                try {
216                    tr.shutdown(1);
217                } catch(Exception e) {
218                }
219                tr = null;
220            }
221
222            if (peer.transportListener != null) {
223                // let the peer know that we are disconnecting after attempting
224                // to cleanly shutdown the async tasks so that this is the last
225                // command it see's.
226                try {
227                    peer.transportListener.onCommand(new ShutdownInfo());
228                } catch (Exception ignore) {
229                }
230
231                // let any requests pending a response see an exception
232                try {
233                    peer.transportListener.onException(new TransportDisposedIOException("peer (" + this + ") stopped."));
234                } catch (Exception ignore) {
235                }
236            }
237
238            // shutdown task runner factory
239            if (taskRunnerFactory != null) {
240                taskRunnerFactory.shutdownNow();
241                taskRunnerFactory = null;
242            }
243        }
244    }
245
246    protected void wakeup() {
247        if (async && started.get()) {
248            try {
249                getTaskRunner().wakeup();
250            } catch (InterruptedException e) {
251                Thread.currentThread().interrupt();
252            } catch (TransportDisposedIOException e) {
253            }
254        }
255    }
256
257    /**
258     * @see org.apache.activemq.thread.Task#iterate()
259     */
260    @Override
261    public boolean iterate() {
262
263        final TransportListener tl = transportListener;
264
265        LinkedBlockingQueue<Object> mq;
266        try {
267            mq = getMessageQueue();
268        } catch (TransportDisposedIOException e) {
269            return false;
270        }
271
272        Object command = mq.poll();
273        if (command != null && !disposed.get()) {
274            try {
275                tl.onCommand(command);
276            } catch (Exception e) {
277                try {
278                    peer.transportListener.onException(IOExceptionSupport.create(e));
279                } catch (Exception ignore) {
280                }
281            }
282            return !mq.isEmpty() && !disposed.get();
283        } else {
284            if(disposed.get()) {
285                mq.clear();
286            }
287            return false;
288        }
289    }
290
291    @Override
292    public void setTransportListener(TransportListener commandListener) {
293        this.transportListener = commandListener;
294    }
295
296    public LinkedBlockingQueue<Object> getMessageQueue() throws TransportDisposedIOException {
297        LinkedBlockingQueue<Object> result = messageQueue;
298        if (result == null) {
299            synchronized (this) {
300                result = messageQueue;
301                if (result == null) {
302                    if (disposed.get()) {
303                        throw new TransportDisposedIOException("The Transport has been disposed");
304                    }
305
306                    messageQueue = result = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
307                }
308            }
309        }
310        return result;
311    }
312
313    protected TaskRunner getTaskRunner() throws TransportDisposedIOException {
314        TaskRunner result = taskRunner;
315        if (result == null) {
316            synchronized (this) {
317                result = taskRunner;
318                if (result == null) {
319                    if (disposed.get()) {
320                        throw new TransportDisposedIOException("The Transport has been disposed");
321                    }
322
323                    String name = "ActiveMQ VMTransport: " + toString();
324                    if (taskRunnerFactory == null) {
325                        taskRunnerFactory = new TaskRunnerFactory(name);
326                        taskRunnerFactory.init();
327                    }
328                    taskRunner = result = taskRunnerFactory.createTaskRunner(this, name);
329                }
330            }
331        }
332        return result;
333    }
334
335    @Override
336    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
337        throw new AssertionError("Unsupported Method");
338    }
339
340    @Override
341    public Object request(Object command) throws IOException {
342        throw new AssertionError("Unsupported Method");
343    }
344
345    @Override
346    public Object request(Object command, int timeout) throws IOException {
347        throw new AssertionError("Unsupported Method");
348    }
349
350    @Override
351    public TransportListener getTransportListener() {
352        return transportListener;
353    }
354
355    @Override
356    public <T> T narrow(Class<T> target) {
357        if (target.isAssignableFrom(getClass())) {
358            return target.cast(this);
359        }
360        return null;
361    }
362
363    public boolean isMarshal() {
364        return marshal;
365    }
366
367    public void setMarshal(boolean marshal) {
368        this.marshal = marshal;
369    }
370
371    @Override
372    public String toString() {
373        return location + "#" + id;
374    }
375
376    @Override
377    public String getRemoteAddress() {
378        if (peer != null) {
379            return peer.toString();
380        }
381        return null;
382    }
383
384    /**
385     * @return the async
386     */
387    public boolean isAsync() {
388        return async;
389    }
390
391    /**
392     * @param async the async to set
393     */
394    public void setAsync(boolean async) {
395        this.async = async;
396    }
397
398    /**
399     * @return the asyncQueueDepth
400     */
401    public int getAsyncQueueDepth() {
402        return asyncQueueDepth;
403    }
404
405    /**
406     * @param asyncQueueDepth the asyncQueueDepth to set
407     */
408    public void setAsyncQueueDepth(int asyncQueueDepth) {
409        this.asyncQueueDepth = asyncQueueDepth;
410    }
411
412    @Override
413    public boolean isFaultTolerant() {
414        return false;
415    }
416
417    @Override
418    public boolean isDisposed() {
419        return disposed.get();
420    }
421
422    @Override
423    public boolean isConnected() {
424        return !disposed.get();
425    }
426
427    @Override
428    public void reconnect(URI uri) throws IOException {
429        throw new IOException("Transport reconnect is not supported");
430    }
431
432    @Override
433    public boolean isReconnectSupported() {
434        return false;
435    }
436
437    @Override
438    public boolean isUpdateURIsSupported() {
439        return false;
440    }
441
442    @Override
443    public void updateURIs(boolean reblance,URI[] uris) throws IOException {
444        throw new IOException("URI update feature not supported");
445    }
446
447    @Override
448    public int getReceiveCounter() {
449        return receiveCounter;
450    }
451
452    @Override
453    public X509Certificate[] getPeerCertificates() {
454        return null;
455    }
456
457    @Override
458    public void setPeerCertificates(X509Certificate[] certificates) {
459
460    }
461
462    @Override
463    public WireFormat getWireFormat() {
464        return null;
465    }
466}