001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.nio;
018
019import java.io.IOException;
020import java.nio.channels.SelectionKey;
021import java.nio.channels.Selector;
022import java.util.Iterator;
023import java.util.Set;
024import java.util.concurrent.ConcurrentLinkedQueue;
025import java.util.concurrent.atomic.AtomicInteger;
026
027public class SelectorWorker implements Runnable {
028
029    private static final AtomicInteger NEXT_ID = new AtomicInteger();
030
031    final SelectorManager manager;
032    final Selector selector;
033    final int id = NEXT_ID.getAndIncrement();
034    private final int maxChannelsPerWorker;
035
036    final AtomicInteger retainCounter = new AtomicInteger(1);
037    private final ConcurrentLinkedQueue<Runnable> ioTasks = new ConcurrentLinkedQueue<Runnable>();
038
039    public SelectorWorker(SelectorManager manager) throws IOException {
040        this.manager = manager;
041        selector = Selector.open();
042        maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
043        manager.getSelectorExecutor().execute(this);
044    }
045
046    void retain() {
047        if (retainCounter.incrementAndGet() == maxChannelsPerWorker) {
048            manager.onWorkerFullEvent(this);
049        }
050    }
051
052    void release() {
053        int use = retainCounter.decrementAndGet();
054        if (use == 0) {
055            manager.onWorkerEmptyEvent(this);
056        } else if (use == maxChannelsPerWorker - 1) {
057            manager.onWorkerNotFullEvent(this);
058        }
059    }
060
061    boolean isReleased() {
062        return retainCounter.get() == 0;
063    }
064
065    public void addIoTask(Runnable work) {
066        ioTasks.add(work);
067        selector.wakeup();
068    }
069
070    private void processIoTasks() {
071        Runnable task;
072        while ((task = ioTasks.poll()) != null) {
073            try {
074                task.run();
075            } catch (Throwable e) {
076                e.printStackTrace();
077            }
078        }
079    }
080
081    @Override
082    public void run() {
083
084        String origName = Thread.currentThread().getName();
085        try {
086            Thread.currentThread().setName("Selector Worker: " + id);
087            while (!isReleased()) {
088
089                processIoTasks();
090
091                int count = selector.select(10);
092
093                if (count == 0) {
094                    continue;
095                }
096
097                // Get a java.util.Set containing the SelectionKey objects
098                // for all channels that are ready for I/O.
099                Set<SelectionKey> keys = selector.selectedKeys();
100
101                for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext();) {
102                    final SelectionKey key = i.next();
103                    i.remove();
104
105                    final SelectorSelection s = (SelectorSelection) key.attachment();
106                    try {
107                        if (key.isValid()) {
108                            key.interestOps(0);
109                        }
110
111                        // Kick off another thread to find newly selected keys
112                        // while we process the
113                        // currently selected keys
114                        manager.getChannelExecutor().execute(new Runnable() {
115                            @Override
116                            public void run() {
117                                try {
118                                    s.onSelect();
119                                    s.enable();
120                                } catch (Throwable e) {
121                                    s.onError(e);
122                                }
123                            }
124                        });
125
126                    } catch (Throwable e) {
127                        s.onError(e);
128                    }
129                }
130            }
131        } catch (Throwable e) {
132            e.printStackTrace();
133            // Notify all the selections that the error occurred.
134            Set<SelectionKey> keys = selector.keys();
135            for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext();) {
136                SelectionKey key = i.next();
137                SelectorSelection s = (SelectorSelection) key.attachment();
138                s.onError(e);
139            }
140        } finally {
141            try {
142                manager.onWorkerEmptyEvent(this);
143                selector.close();
144            } catch (IOException ignore) {
145                ignore.printStackTrace();
146            }
147            Thread.currentThread().setName(origName);
148        }
149    }
150}