2 // ========================================================================
3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
20 package org.eclipse.jetty.util.thread;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.concurrent.atomic.AtomicLong;
33 import org.eclipse.jetty.util.BlockingArrayQueue;
34 import org.eclipse.jetty.util.StringUtil;
35 import org.eclipse.jetty.util.annotation.ManagedAttribute;
36 import org.eclipse.jetty.util.annotation.ManagedObject;
37 import org.eclipse.jetty.util.annotation.ManagedOperation;
38 import org.eclipse.jetty.util.annotation.Name;
39 import org.eclipse.jetty.util.component.AbstractLifeCycle;
40 import org.eclipse.jetty.util.component.ContainerLifeCycle;
41 import org.eclipse.jetty.util.component.Dumpable;
42 import org.eclipse.jetty.util.component.LifeCycle;
43 import org.eclipse.jetty.util.log.Log;
44 import org.eclipse.jetty.util.log.Logger;
45 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
// Thread pool that runs submitted Runnable jobs from a BlockingQueue on threads it creates itself.
47 @ManagedObject("A thread pool with no max bound by default")
48 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
50 private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
// Number of pool threads. startThreads() increments it by CAS *before* the thread is created;
// it is decremented again in startThreads() and by the pool thread's own exit/shrink logic.
52 private final AtomicInteger _threadsStarted = new AtomicInteger();
// Number of pool threads that currently hold no job; maintained inside _runnable.
53 private final AtomicInteger _threadsIdle = new AtomicInteger();
// System.nanoTime() value recorded by the last shrink attempt; 0 is treated as "never shrunk".
54 private final AtomicLong _lastShrink = new AtomicLong();
// The pool's Thread objects: added in startThreads(), removed by each thread at the end of _runnable.
55 private final ConcurrentLinkedQueue<Thread> _threads = new ConcurrentLinkedQueue<>();
// Monitor that join() synchronizes on and doStop() calls notifyAll() on.
56 private final Object _joinLock = new Object();
// Job queue: execute() offers to it, pool threads poll from it.
57 private final BlockingQueue<Runnable> _jobs;
// Pool name; startThreads() uses it as the prefix of each thread name.
58 private String _name = "qtp" + hashCode();
// Idle timeout in milliseconds (see setIdleTimeout); used for the timed poll and the shrink interval.
59 private int _idleTimeout;
60 private int _maxThreads;
61 private int _minThreads;
62 private int _priority = Thread.NORM_PRIORITY;
63 private boolean _daemon = false;
// Selects the per-thread full stack trace form in dump(Appendable, String).
64 private boolean _detailedDump = false;
66 public QueuedThreadPool()
71 public QueuedThreadPool(@Name("maxThreads") int maxThreads)
// Convenience constructor: delegates with an idle timeout of 60000 ms.
76 public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads)
78 this(maxThreads, minThreads, 60000);
// Convenience constructor: delegates with a null queue. The four-argument constructor assigns a
// BlockingArrayQueue to its queue parameter (presumably only when it is null - confirm).
81 public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout")int idleTimeout)
83 this(maxThreads, minThreads, idleTimeout, null);
// Primary constructor: applies the sizing parameters through the setters, then settles the job queue.
86 public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
// Order matters: setMinThreads() may raise _maxThreads, then setMaxThreads() may lower _minThreads,
// so when minThreads > maxThreads both fields end up equal to maxThreads.
88 setMinThreads(minThreads);
89 setMaxThreads(maxThreads);
90 setIdleTimeout(idleTimeout);
// Default queue: a BlockingArrayQueue constructed with _minThreads for both of its arguments.
// NOTE(review): the guard around this assignment (presumably a null check on queue) and the
// assignment of the final field _jobs are not visible in this excerpt - confirm.
94 queue=new BlockingArrayQueue<>(_minThreads, _minThreads);
// Lifecycle start: resets the thread counter and starts the initial _minThreads threads.
100 protected void doStart() throws Exception
103 _threadsStarted.set(0);
// startThreads() itself stops early if _maxThreads is reached.
105 startThreads(_minThreads);
// Lifecycle stop. Visible phases: prepare no-op jobs to wake idle threads, join the pool threads for
// up to half of the stop timeout, escalate if threads remain, join again for the other half, log
// every thread still present in _threads, and finally wake callers blocked in join().
// NOTE(review): several short statements (queue clearing, the noop offer, the interrupt call, the
// guards around join) are not visible in this excerpt; the comments below only describe what is shown.
109 protected void doStop() throws Exception
113 long timeout = getStopTimeout();
114 BlockingQueue<Runnable> jobs = getQueue();
116 // If no stop timeout, clear job queue
120 // Fill job Q with noop jobs to wakeup idle
121 Runnable noop = new Runnable()
// One iteration per currently counted thread.
128 for (int i = _threadsStarted.get(); i-- > 0; )
131 // try to let jobs complete naturally for half our stop time
// Deadline on the System.nanoTime() clock: now + timeout/2 (timeout is converted from ms to ns).
132 long stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
133 for (Thread thread : _threads)
// Remaining budget in ms; it shrinks as earlier joins consume time, so the deadline is shared by all threads.
135 long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
// NOTE(review): Thread.join(0) waits indefinitely and a negative value is rejected, so this call
// needs canwait > 0; the guard is not visible in this excerpt - confirm.
137 thread.join(canwait);
140 // If we still have threads running, get a bit more aggressive
142 // interrupt remaining threads
143 if (_threadsStarted.get() > 0)
144 for (Thread thread : _threads)
147 // wait again for the other half of our stop time
148 stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
149 for (Thread thread : _threads)
151 long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
153 thread.join(canwait);
// Threads remove themselves from _threads when they exit, so anything still listed did not stop.
157 int size = _threads.size();
// With debug logging enabled, each unstopped thread is reported together with its stack trace.
162 if (LOG.isDebugEnabled())
164 for (Thread unstopped : _threads)
166 StringBuilder dmp = new StringBuilder();
167 for (StackTraceElement element : unstopped.getStackTrace())
169 dmp.append(StringUtil.__LINE_SEPARATOR).append("\tat ").append(element);
171 LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
// Shorter form of the same warning: pool and thread only, no stack trace.
176 for (Thread unstopped : _threads)
177 LOG.warn("{} Couldn't stop {}",this,unstopped);
// Wake any callers of join(), which synchronizes on the same monitor.
181 synchronized (_joinLock)
183 _joinLock.notifyAll();
188 * Delegated to the named or anonymous Pool.
190 public void setDaemon(boolean daemon)
196 * Set the maximum thread idle time.
197 * Threads that are idle for longer than this period may be stopped.
199 * Delegated to the named or anonymous Pool.
201 * @param idleTimeout Max idle time in ms.
202 * @see #getIdleTimeout
// Stores the idle timeout (ms) without validation. The pool thread loop has a separate branch
// for values <= 0 and uses the value as the minimum interval between shrink attempts.
204 public void setIdleTimeout(int idleTimeout)
206 _idleTimeout = idleTimeout;
210 * Set the maximum number of threads.
211 * Delegated to the named or anonymous Pool.
213 * @param maxThreads maximum number of threads.
214 * @see #getMaxThreads
// Sets the upper bound and keeps min <= max by lowering _minThreads when necessary.
// Nothing visible here stops threads that are already running above the new maximum.
217 public void setMaxThreads(int maxThreads)
219 _maxThreads = maxThreads;
220 if (_minThreads > _maxThreads)
221 _minThreads = _maxThreads;
225 * Set the minimum number of threads.
226 * Delegated to the named or anonymous Pool.
228 * @param minThreads minimum number of threads
229 * @see #getMinThreads
// Sets the lower bound, raising _maxThreads when necessary to keep min <= max.
232 public void setMinThreads(int minThreads)
234 _minThreads = minThreads;
236 if (_minThreads > _maxThreads)
237 _maxThreads = _minThreads;
// If the pool is already started with fewer threads than the new minimum, start the shortfall now.
239 int threads = _threadsStarted.get();
240 if (isStarted() && threads < _minThreads)
241 startThreads(_minThreads - threads);
245 * @param name Name of this thread pool to use when naming threads.
// Sets the pool name that startThreads() uses as the thread-name prefix.
247 public void setName(String name)
// Renaming is refused on this path. NOTE(review): the guarding condition and the assignment to
// _name are not visible in this excerpt; the message suggests a running/started check - confirm.
250 throw new IllegalStateException("started");
255 * Set the priority of the pool threads.
257 * @param priority the new thread priority.
// Stores the priority only; nothing here touches existing threads. startThreads() applies
// getThreadsPriority() to each thread it creates.
259 public void setThreadsPriority(int priority)
261 _priority = priority;
265 * Get the maximum thread idle time.
266 * Delegated to the named or anonymous Pool.
268 * @return Max idle time in ms.
269 * @see #setIdleTimeout
271 @ManagedAttribute("maximum time a thread may be idle in ms")
272 public int getIdleTimeout()
278 * Get the maximum number of threads.
279 * Delegated to the named or anonymous Pool.
281 * @return maximum number of threads.
282 * @see #setMaxThreads
285 @ManagedAttribute("maximum number of threads in the pool")
286 public int getMaxThreads()
292 * Get the minimum number of threads.
293 * Delegated to the named or anonymous Pool.
295 * @return minimum number of threads.
296 * @see #setMinThreads
299 @ManagedAttribute("minimum number of threads in the pool")
300 public int getMinThreads()
306 * @return The name of this thread pool
308 @ManagedAttribute("name of the thread pool")
309 public String getName()
315 * Get the priority of the pool threads.
317 * @return the priority of the pool threads.
319 @ManagedAttribute("priority of threads in the pool")
320 public int getThreadsPriority()
326 * Get the size of the job queue.
328 * @return Number of jobs queued waiting for a thread
330 @ManagedAttribute("Size of the job queue")
331 public int getQueueSize()
337 * Delegated to the named or anonymous Pool.
339 @ManagedAttribute("thead pool using a daemon thread")
340 public boolean isDaemon()
// Whether dump(Appendable, String) prints a full stack trace per thread instead of a one-line summary.
345 public boolean isDetailedDump()
347 return _detailedDump;
// Switches dump(Appendable, String) between the one-line-per-thread form and the full stack trace form.
350 public void setDetailedDump(boolean detailedDump)
352 _detailedDump = detailedDump;
// Submits a job to the queue for execution by a pool thread.
356 public void execute(Runnable job)
// Rejected when the pool is not running, or when the queue declines the job (offer() returned false).
358 if (!isRunning() || !_jobs.offer(job))
360 LOG.warn("{} rejected {}", this, job);
// job.toString() builds the exception message, so if this line is reached with a null job it
// throws NullPointerException instead of RejectedExecutionException.
361 throw new RejectedExecutionException(job.toString());
366 * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
369 public void join() throws InterruptedException
// Same monitor that doStop() calls notifyAll() on; the wait loop itself is not visible in this excerpt.
371 synchronized (_joinLock)
382 * @return The total number of threads currently in the pool
385 @ManagedAttribute("total number of threads currently in the pool")
386 public int getThreads()
// Reads the counter, not _threads.size(); startThreads() increments the counter before the
// Thread object is created, so the value can briefly lead the actual number of threads.
388 return _threadsStarted.get();
392 * @return The number of idle threads in the pool
395 @ManagedAttribute("total number of idle threads in the pool")
396 public int getIdleThreads()
// Counter maintained by the pool thread loop in _runnable (incremented when a thread has no job).
398 return _threadsIdle.get();
402 * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
405 @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs")
406 public boolean isLowOnThreads()
408 return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get();
// Tries to start up to threadsToStart new pool threads without exceeding _maxThreads.
// NOTE(review): the return statements, thread.start() and the try/finally structure are not
// visible in this excerpt, so the meaning of the boolean result is not documented here.
411 private boolean startThreads(int threadsToStart)
413 while (threadsToStart > 0)
415 int threads = _threadsStarted.get();
// Already at (or above) the configured maximum: no further thread may be added.
416 if (threads >= _maxThreads)
// Reserve a slot atomically before creating the thread; a failed CAS means another thread
// changed the counter in between.
419 if (!_threadsStarted.compareAndSet(threads, threads + 1))
422 boolean started = false;
// Thread creation goes through the overridable newThread() hook; every pool thread runs _runnable.
425 Thread thread = newThread(_runnable);
426 thread.setDaemon(isDaemon());
427 thread.setPriority(getThreadsPriority());
// Thread name is "<pool name>-<thread id>".
428 thread.setName(_name + "-" + thread.getId());
429 _threads.add(thread);
// Gives the reserved slot back (presumably only when the thread failed to start - confirm).
437 _threadsStarted.decrementAndGet();
// Thread factory hook called by startThreads(); protected, so subclasses can supply their own Thread.
// Daemon flag, priority and name are set by startThreads() after this returns.
445 protected Thread newThread(Runnable runnable)
447 return new Thread(runnable);
// String form of the pool dump (method signature line is not visible in this excerpt).
452 @ManagedOperation("dump thread state")
455 return ContainerLifeCycle.dump(this);
// Writes this pool followed by one entry per pool thread to the given Appendable.
459 public void dump(Appendable out, String indent) throws IOException
// Presized to the configured maximum number of threads.
461 List<Object> dump = new ArrayList<>(getMaxThreads());
462 for (final Thread thread : _threads)
464 final StackTraceElement[] trace = thread.getStackTrace();
// A thread is reported as idle when any frame of its stack is in a method named "idleJobPoll".
// The match is by name only, so it relies on the private idleJobPoll() method keeping that name.
465 boolean inIdleJobPoll = false;
466 for (StackTraceElement t : trace)
468 if ("idleJobPoll".equals(t.getMethodName()))
470 inIdleJobPoll = true;
// Final copy so the anonymous Dumpable below can capture it.
474 final boolean idle = inIdleJobPoll;
// Detailed form: a nested Dumpable that prints "id name state[ IDLE]" and then the whole stack trace.
476 if (isDetailedDump())
478 dump.add(new Dumpable()
481 public void dump(Appendable out, String indent) throws IOException
483 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "").append('\n');
485 ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
// Summary form: a single string with the top stack frame, or "???" when the trace is empty.
497 dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : ""));
// The pool itself first, then the collected per-thread entries.
501 ContainerLifeCycle.dumpObject(out, this);
502 ContainerLifeCycle.dump(out, indent, dump);
// Format: name{state,min<=threads<=max,i=idleThreads,q=queuedJobs}; q is -1 while _jobs is null.
506 public String toString()
508 return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
// Waits up to _idleTimeout ms for a job; BlockingQueue.poll(timeout, unit) returns null on timeout.
// Kept as its own method: dump(Appendable, String) recognises idle threads by finding this
// method name in their stack traces.
511 private Runnable idleJobPoll() throws InterruptedException
513 return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
// The body executed by every pool thread (passed to newThread() in startThreads()).
// NOTE(review): many short lines (run(), try/finally, runJob, break/continue, the polls inside the
// loops) are not visible in this excerpt; comments below describe only the lines that are shown.
516 private Runnable _runnable = new Runnable()
// Becomes true when this thread has claimed the right to retire (see the two CAS operations below).
521 boolean shrink = false;
// Non-blocking poll for a first job.
524 Runnable job = _jobs.poll();
// A job was obtained while no thread is counted as idle (the action taken is not visible here;
// presumably another thread is started - confirm).
526 if (job != null && _threadsIdle.get() == 0)
531 loop: while (isRunning())
// Job phase: continues while there is a job in hand and the pool is running.
534 while (job != null && isRunning())
// Thread.interrupted() tests and clears this thread's interrupt flag.
537 if (Thread.interrupted())
// Idle phase: this thread is counted as idle from here until the matching decrement below.
545 _threadsIdle.incrementAndGet();
547 while (isRunning() && job == null)
// Separate branch for a non-positive idle timeout, ahead of the shrink logic.
549 if (_idleTimeout <= 0)
553 // maybe we should shrink?
554 final int size = _threadsStarted.get();
// Only threads above the configured minimum are candidates for retirement.
555 if (size > _minThreads)
557 long last = _lastShrink.get();
558 long now = System.nanoTime();
// Retirement is rate limited: allowed if there was no shrink yet (0) or the last one was more than
// one idle timeout ago.
559 if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
// First CAS claims this shrink interval, second CAS removes this thread from the count.
// Because of the short-circuit &&, _lastShrink stays updated even when the second CAS fails.
561 shrink = _lastShrink.compareAndSet(last, now) &&
562 _threadsStarted.compareAndSet(size, size - 1);
// Leaving the idle phase; the branch is taken when this was the last idle thread.
575 if (_threadsIdle.decrementAndGet() == 0)
582 catch (InterruptedException e)
// Exit bookkeeping. NOTE(review): a shrinking thread already decremented the counter via CAS above,
// so this decrement is presumably guarded by !shrink on a line not visible here - confirm.
593 _threadsStarted.decrementAndGet();
// The thread deregisters itself; doStop() treats anything left in _threads as not stopped.
594 _threads.remove(Thread.currentThread());
600 * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
601 * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
603 * @param job the job to run
605 protected void runJob(Runnable job)
611 * @return the job queue
613 protected BlockingQueue<Runnable> getQueue()
619 * @param queue the job queue
// Always throws: the queue field is final and can only be supplied through the constructor.
621 public void setQueue(BlockingQueue<Runnable> queue)
623 throw new UnsupportedOperationException("Use constructor injection");
627 * @param id The thread ID to interrupt.
628 * @return true if the thread was found and interrupted.
630 @ManagedOperation("interrupt a pool thread")
631 public boolean interruptThread(@Name("id") long id)
// Linear scan of the pool's threads for the one whose Thread.getId() equals the given id
// (the interrupt call and the return statements are not visible in this excerpt).
633 for (Thread thread : _threads)
635 if (thread.getId() == id)
645 * @param id The thread ID to dump.
646 * @return the thread's id, name, state and stack frames as text.
648 @ManagedOperation("dump a pool thread stack")
649 public String dumpThread(@Name("id") long id)
// Linear scan of the pool's threads for the matching Thread.getId(); the result for an unknown id
// is not visible in this excerpt.
651 for (Thread thread : _threads)
653 if (thread.getId() == id)
655 StringBuilder buf = new StringBuilder();
// Header line: "<id> <name> <state>:" followed by one " at <frame>" line per stack frame.
656 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
657 for (StackTraceElement element : thread.getStackTrace())
658 buf.append(" at ").append(element.toString()).append('\n');
659 return buf.toString();