/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.thread;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages the thread pool for long running tasks. Long running tasks are not
 * always active but when they are active, they may need a few iterations of
 * processing for them to become idle. The manager ensures that each task is
 * processed but that no one task overtakes the system. This is somewhat like
 * cooperative multitasking.
 * <p>
 * The factory works in one of two modes, chosen lazily by {@link #init()}:
 * <ul>
 * <li>pooled: work is handed to a shared {@link ExecutorService};</li>
 * <li>dedicated: no executor is kept, and each task runner / runnable gets
 * its own thread.</li>
 * </ul>
 *
 * @org.apache.xbean.XBean
 */
public class TaskRunnerFactory implements Executor {

    private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerFactory.class);

    /** The shared executor; {@code null} means "not initialized" or "dedicated mode". */
    private final AtomicReference<ExecutorService> executorRef = new AtomicReference<>();
    private int maxIterationsPerRun;
    private String name;
    private int priority;
    private boolean daemon;
    /** Sequence used to give every created thread a unique name suffix. */
    private final AtomicLong id = new AtomicLong(0);
    private boolean dedicatedTaskRunner;
    /** Passed to {@code ThreadPoolUtils.shutdownGraceful}; presumably milliseconds - confirm against ThreadPoolUtils. */
    private long shutdownAwaitTermination = 30000;
    /** Set once {@link #init()} has decided on the mode; reset by the shutdown methods. */
    private final AtomicBoolean initDone = new AtomicBoolean(false);
    private int maxThreadPoolSize = getDefaultMaximumPoolSize();
    private RejectedExecutionHandler rejectedTaskHandler = null;
    private ClassLoader threadClassLoader;

    /**
     * Creates a factory named {@code "ActiveMQ Task"} with the defaults of
     * {@link #TaskRunnerFactory(String)}.
     */
    public TaskRunnerFactory() {
        this("ActiveMQ Task");
    }

    /**
     * Creates a factory producing daemon threads of normal priority, with at
     * most 1000 iterations per run.
     *
     * @param name prefix used for the names of created threads
     */
    public TaskRunnerFactory(String name) {
        this(name, Thread.NORM_PRIORITY, true, 1000);
    }

    private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) {
        this(name, priority, daemon, maxIterationsPerRun, false);
    }

    /**
     * Creates a factory using the default maximum pool size.
     *
     * @param name prefix used for the names of created threads
     * @param priority priority applied to created threads
     * @param daemon whether created threads are daemon threads
     * @param maxIterationsPerRun value handed to each {@link PooledTaskRunner}
     * @param dedicatedTaskRunner {@code true} to bypass the pool and use one thread per task
     */
    public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) {
        this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner, getDefaultMaximumPoolSize());
    }

    /**
     * Creates a fully configured factory. No threads or executors are created
     * until {@link #init()} runs (directly, or through the first use).
     *
     * @param name prefix used for the names of created threads
     * @param priority priority applied to created threads
     * @param daemon whether created threads are daemon threads
     * @param maxIterationsPerRun value handed to each {@link PooledTaskRunner}
     * @param dedicatedTaskRunner {@code true} to bypass the pool and use one thread per task
     * @param maxThreadPoolSize maximum size of the default thread pool
     */
    public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize) {
        this.name = name;
        this.priority = priority;
        this.daemon = daemon;
        this.maxIterationsPerRun = maxIterationsPerRun;
        this.dedicatedTaskRunner = dedicatedTaskRunner;
        this.maxThreadPoolSize = maxThreadPoolSize;
    }

    /**
     * Decides, once, whether this factory runs in pooled or dedicated mode and
     * installs the default executor when pooled mode is selected and no
     * executor has been supplied through {@link #setExecutor(ExecutorService)}.
     * <p>
     * Dedicated mode is selected when {@code dedicatedTaskRunner} is set or the
     * system property {@code org.apache.activemq.UseDedicatedTaskRunner} equals
     * {@code "true"} (case-insensitive); any executor currently held is then
     * dropped from the reference. Calling this method again after it has
     * completed is a no-op until one of the shutdown methods resets the state.
     */
    public void init() {
        if (!initDone.get()) {
            // If your OS/JVM combination has a good thread model, you may want to
            // avoid using a thread pool to run tasks and use a DedicatedTaskRunner instead.
            //AMQ-6602 - lock instead of using compareAndSet to prevent threads from seeing a null value
            //for executorRef inside createTaskRunner() on contention and creating a DedicatedTaskRunner
            synchronized (this) {
                //need to recheck if initDone is true under the lock
                if (!initDone.get()) {
                    if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) {
                        executorRef.set(null);
                    } else if (executorRef.get() == null) {
                        // Only build the default executor when none was injected, so that an
                        // executor supplied via setExecutor() does not cause a throw-away pool
                        // to be constructed. The CAS is kept because setExecutor() does not
                        // take this lock: an executor injected concurrently still wins.
                        executorRef.compareAndSet(null, createDefaultExecutor());
                    }
                    LOG.debug("Initialized TaskRunnerFactory[{}] using ExecutorService: {}", name, executorRef.get());
                    initDone.set(true);
                }
            }
        }
    }

    /**
     * Performs a shutdown only, by which the thread pool is shut down neither
     * gracefully nor aggressively. Afterwards the factory is reset so that the
     * next use re-initializes it.
     *
     * @see ThreadPoolUtils#shutdown(java.util.concurrent.ExecutorService)
     */
    public void shutdown() {
        ExecutorService executor = executorRef.get();
        if (executor != null) {
            ThreadPoolUtils.shutdown(executor);
        }
        clearExecutor();
    }

    /**
     * Performs a shutdown now (aggressively) on the thread pool. Afterwards the
     * factory is reset so that the next use re-initializes it.
     *
     * @see ThreadPoolUtils#shutdownNow(java.util.concurrent.ExecutorService)
     */
    public void shutdownNow() {
        ExecutorService executor = executorRef.get();
        if (executor != null) {
            ThreadPoolUtils.shutdownNow(executor);
        }
        clearExecutor();
    }

    /**
     * Performs a graceful shutdown, passing the configured
     * {@link #getShutdownAwaitTermination() await-termination} value along.
     * Afterwards the factory is reset so that the next use re-initializes it.
     *
     * @see ThreadPoolUtils#shutdownGraceful(java.util.concurrent.ExecutorService)
     */
    public void shutdownGraceful() {
        ExecutorService executor = executorRef.get();
        if (executor != null) {
            ThreadPoolUtils.shutdownGraceful(executor, shutdownAwaitTermination);
        }
        clearExecutor();
    }

    /**
     * Drops the executor reference and marks the factory as uninitialized.
     */
    private void clearExecutor() {
        //clear under a lock to prevent threads from seeing initDone == true
        //but then getting null from executorRef
        synchronized (this) {
            executorRef.set(null);
            initDone.set(false);
        }
    }

    /**
     * Creates a runner for the given task, initializing the factory first if
     * needed.
     *
     * @param task the task to run
     * @param name name handed to the {@link DedicatedTaskRunner}; not used in pooled mode
     * @return a {@link PooledTaskRunner} when an executor is available,
     *         otherwise a {@link DedicatedTaskRunner}
     */
    public TaskRunner createTaskRunner(Task task, String name) {
        init();
        ExecutorService executor = executorRef.get();
        if (executor != null) {
            return new PooledTaskRunner(executor, task, maxIterationsPerRun);
        } else {
            return new DedicatedTaskRunner(task, name, priority, daemon);
        }
    }

    /**
     * Runs the given runnable using this factory's own name for the thread
     * name prefix.
     *
     * @param runnable the work to execute
     */
    @Override
    public void execute(Runnable runnable) {
        execute(runnable, name);
    }

    /**
     * Runs the given runnable on the shared executor when one is available,
     * otherwise on a freshly started thread.
     *
     * @param runnable the work to execute
     * @param name thread name prefix, used only when a new thread is started
     */
    public void execute(Runnable runnable, String name) {
        init();
        LOG.trace("Execute[{}] runnable: {}", name, runnable);
        ExecutorService executor = executorRef.get();
        if (executor != null) {
            executor.execute(runnable);
        } else {
            doExecuteNewThread(runnable, name);
        }
    }

    /**
     * Starts a new thread named {@code name-<sequence>} for the runnable,
     * applying the configured daemon flag and priority.
     */
    private void doExecuteNewThread(Runnable runnable, String name) {
        String threadName = name + "-" + id.incrementAndGet();
        Thread thread = new Thread(runnable, threadName);
        thread.setDaemon(daemon);
        // Honor the configured priority here as well: pool threads and
        // DedicatedTaskRunner already receive it, so without this the
        // thread-per-task path would silently ignore the setting.
        thread.setPriority(priority);

        LOG.trace("Created and running thread[{}]: {}", threadName, thread);
        thread.start();
    }

    /**
     * Builds the executor used in pooled mode: a {@link ThreadPoolExecutor}
     * backed by a {@link SynchronousQueue}, whose core size and keep-alive
     * time (in seconds) come from system properties and whose maximum size is
     * {@link #getMaxThreadPoolSize()}.
     * <p>
     * Threads are named {@code <name>-<sequence>}, carry the configured daemon
     * flag, priority and (if set) context class loader, and log uncaught
     * exceptions. If no rejected-task handler has been configured,
     * {@link ThreadPoolExecutor.CallerRunsPolicy} is used.
     *
     * @return a new, not yet used executor
     */
    protected ExecutorService createDefaultExecutor() {
        ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultCorePoolSize(), getMaxThreadPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable runnable) {
                String threadName = name + "-" + id.incrementAndGet();
                Thread thread = new Thread(runnable, threadName);
                thread.setDaemon(daemon);
                thread.setPriority(priority);
                if (threadClassLoader != null) {
                    thread.setContextClassLoader(threadClassLoader);
                }
                thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                    @Override
                    public void uncaughtException(final Thread t, final Throwable e) {
                        LOG.error("Error in thread '{}'", t.getName(), e);
                    }
                });

                LOG.trace("Created thread[{}]: {}", threadName, thread);
                return thread;
            }
        });

        if (rejectedTaskHandler != null) {
            rc.setRejectedExecutionHandler(rejectedTaskHandler);
        } else {
            rc.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        }

        return rc;
    }

    /**
     * @return the executor currently held, or {@code null} if there is none
     */
    public ExecutorService getExecutor() {
        return executorRef.get();
    }

    /**
     * Replaces the held executor. The initialization state is not touched.
     *
     * @param executor the executor to use
     */
    public void setExecutor(ExecutorService executor) {
        this.executorRef.set(executor);
    }

    public int getMaxIterationsPerRun() {
        return maxIterationsPerRun;
    }

    public void setMaxIterationsPerRun(int maxIterationsPerRun) {
        this.maxIterationsPerRun = maxIterationsPerRun;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getPriority() {
        return priority;
    }

    public void setPriority(int priority) {
        this.priority = priority;
    }

    public boolean isDaemon() {
        return daemon;
    }

    public void setDaemon(boolean daemon) {
        this.daemon = daemon;
    }

    public boolean isDedicatedTaskRunner() {
        return dedicatedTaskRunner;
    }

    public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
        this.dedicatedTaskRunner = dedicatedTaskRunner;
    }

    public int getMaxThreadPoolSize() {
        return maxThreadPoolSize;
    }

    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
        this.maxThreadPoolSize = maxThreadPoolSize;
    }

    /**
     * Sets the context class loader applied to threads created for the
     * default executor; {@code null} leaves the thread's class loader alone.
     */
    public void setThreadClassLoader(ClassLoader threadClassLoader) {
        this.threadClassLoader = threadClassLoader;
    }

    public RejectedExecutionHandler getRejectedTaskHandler() {
        return rejectedTaskHandler;
    }

    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
        this.rejectedTaskHandler = rejectedTaskHandler;
    }

    public long getShutdownAwaitTermination() {
        return shutdownAwaitTermination;
    }

    public void setShutdownAwaitTermination(long shutdownAwaitTermination) {
        this.shutdownAwaitTermination = shutdownAwaitTermination;
    }

    /** Core pool size from the {@code ...TaskRunnerFactory.corePoolSize} system property, default 0. */
    private static int getDefaultCorePoolSize() {
        return Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.corePoolSize", 0);
    }

    /** Maximum pool size from the {@code ...TaskRunnerFactory.maximumPoolSize} system property, default {@link Integer#MAX_VALUE}. */
    private static int getDefaultMaximumPoolSize() {
        return Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.maximumPoolSize", Integer.MAX_VALUE);
    }

    /** Keep-alive time in seconds from the {@code ...TaskRunnerFactory.keepAliveTime} system property, default 30. */
    private static int getDefaultKeepAliveTime() {
        return Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.keepAliveTime", 30);
    }
}