2 // ========================================================================
3 // Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
20 package org.eclipse.jetty.util.thread;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.RejectedExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.concurrent.atomic.AtomicLong;
32 import org.eclipse.jetty.util.BlockingArrayQueue;
33 import org.eclipse.jetty.util.ConcurrentHashSet;
34 import org.eclipse.jetty.util.annotation.ManagedAttribute;
35 import org.eclipse.jetty.util.annotation.ManagedObject;
36 import org.eclipse.jetty.util.annotation.ManagedOperation;
37 import org.eclipse.jetty.util.annotation.Name;
38 import org.eclipse.jetty.util.component.AbstractLifeCycle;
39 import org.eclipse.jetty.util.component.ContainerLifeCycle;
40 import org.eclipse.jetty.util.component.Dumpable;
41 import org.eclipse.jetty.util.component.LifeCycle;
42 import org.eclipse.jetty.util.log.Log;
43 import org.eclipse.jetty.util.log.Logger;
44 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
46 @ManagedObject("A thread pool with no max bound by default")
47 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
49 private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
51 private final AtomicInteger _threadsStarted = new AtomicInteger();
52 private final AtomicInteger _threadsIdle = new AtomicInteger();
53 private final AtomicLong _lastShrink = new AtomicLong();
54 private final ConcurrentHashSet<Thread> _threads=new ConcurrentHashSet<Thread>();
55 private final Object _joinLock = new Object();
56 private final BlockingQueue<Runnable> _jobs;
57 private String _name = "qtp" + hashCode();
58 private int _idleTimeout;
59 private int _maxThreads;
60 private int _minThreads;
61 private int _priority = Thread.NORM_PRIORITY;
62 private boolean _daemon = false;
63 private boolean _detailedDump = false;
65 public QueuedThreadPool()
70 public QueuedThreadPool(@Name("maxThreads") int maxThreads)
75 public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads)
77 this(maxThreads, minThreads, 60000);
80 public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout")int idleTimeout)
82 this(maxThreads, minThreads, idleTimeout, null);
85 public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
87 setMinThreads(minThreads);
88 setMaxThreads(maxThreads);
89 setIdleTimeout(idleTimeout);
94 int capacity=Math.max(_minThreads, 8);
95 queue=new BlockingArrayQueue<>(capacity, capacity);
101 protected void doStart() throws Exception
104 _threadsStarted.set(0);
106 startThreads(_minThreads);
110 protected void doStop() throws Exception
114 long timeout = getStopTimeout();
115 BlockingQueue<Runnable> jobs = getQueue();
117 // If no stop timeout, clear job queue
121 // Fill job Q with noop jobs to wakeup idle
122 Runnable noop = new Runnable()
129 for (int i = _threadsStarted.get(); i-- > 0; )
132 // try to jobs complete naturally for half our stop time
133 long stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
134 for (Thread thread : _threads)
136 long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
138 thread.join(canwait);
141 // If we still have threads running, get a bit more aggressive
143 // interrupt remaining threads
144 if (_threadsStarted.get() > 0)
145 for (Thread thread : _threads)
148 // wait again for the other half of our stop time
149 stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
150 for (Thread thread : _threads)
152 long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
154 thread.join(canwait);
158 int size = _threads.size();
163 if (LOG.isDebugEnabled())
165 for (Thread unstopped : _threads)
167 StringBuilder dmp = new StringBuilder();
168 for (StackTraceElement element : unstopped.getStackTrace())
170 dmp.append(System.lineSeparator()).append("\tat ").append(element);
172 LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
177 for (Thread unstopped : _threads)
178 LOG.warn("{} Couldn't stop {}",this,unstopped);
182 synchronized (_joinLock)
184 _joinLock.notifyAll();
189 * Delegated to the named or anonymous Pool.
191 public void setDaemon(boolean daemon)
197 * Set the maximum thread idle time.
198 * Threads that are idle for longer than this period may be
200 * Delegated to the named or anonymous Pool.
202 * @param idleTimeout Max idle time in ms.
203 * @see #getIdleTimeout
205 public void setIdleTimeout(int idleTimeout)
207 _idleTimeout = idleTimeout;
211 * Set the maximum number of threads.
212 * Delegated to the named or anonymous Pool.
214 * @param maxThreads maximum number of threads.
215 * @see #getMaxThreads
218 public void setMaxThreads(int maxThreads)
220 _maxThreads = maxThreads;
221 if (_minThreads > _maxThreads)
222 _minThreads = _maxThreads;
226 * Set the minimum number of threads.
227 * Delegated to the named or anonymous Pool.
229 * @param minThreads minimum number of threads
230 * @see #getMinThreads
233 public void setMinThreads(int minThreads)
235 _minThreads = minThreads;
237 if (_minThreads > _maxThreads)
238 _maxThreads = _minThreads;
240 int threads = _threadsStarted.get();
241 if (isStarted() && threads < _minThreads)
242 startThreads(_minThreads - threads);
246 * @param name Name of this thread pool to use when naming threads.
248 public void setName(String name)
251 throw new IllegalStateException("started");
256 * Set the priority of the pool threads.
258 * @param priority the new thread priority.
260 public void setThreadsPriority(int priority)
262 _priority = priority;
266 * Get the maximum thread idle time.
267 * Delegated to the named or anonymous Pool.
269 * @return Max idle time in ms.
270 * @see #setIdleTimeout
272 @ManagedAttribute("maximum time a thread may be idle in ms")
273 public int getIdleTimeout()
279 * Set the maximum number of threads.
280 * Delegated to the named or anonymous Pool.
282 * @return maximum number of threads.
283 * @see #setMaxThreads
286 @ManagedAttribute("maximum number of threads in the pool")
287 public int getMaxThreads()
293 * Get the minimum number of threads.
294 * Delegated to the named or anonymous Pool.
296 * @return minimum number of threads.
297 * @see #setMinThreads
300 @ManagedAttribute("minimum number of threads in the pool")
301 public int getMinThreads()
307 * @return The name of the this thread pool
309 @ManagedAttribute("name of the thread pool")
310 public String getName()
316 * Get the priority of the pool threads.
318 * @return the priority of the pool threads.
320 @ManagedAttribute("priority of threads in the pool")
321 public int getThreadsPriority()
327 * Get the size of the job queue.
329 * @return Number of jobs queued waiting for a thread
331 @ManagedAttribute("Size of the job queue")
332 public int getQueueSize()
338 * Delegated to the named or anonymous Pool.
340 @ManagedAttribute("thead pool using a daemon thread")
341 public boolean isDaemon()
346 public boolean isDetailedDump()
348 return _detailedDump;
351 public void setDetailedDump(boolean detailedDump)
353 _detailedDump = detailedDump;
357 public void execute(Runnable job)
359 if (!isRunning() || !_jobs.offer(job))
361 LOG.warn("{} rejected {}", this, job);
362 throw new RejectedExecutionException(job.toString());
366 // Make sure there is at least one thread executing the job.
367 if (getThreads() == 0)
373 * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
376 public void join() throws InterruptedException
378 synchronized (_joinLock)
389 * @return The total number of threads currently in the pool
392 @ManagedAttribute("total number of threads currently in the pool")
393 public int getThreads()
395 return _threadsStarted.get();
399 * @return The number of idle threads in the pool
402 @ManagedAttribute("total number of idle threads in the pool")
403 public int getIdleThreads()
405 return _threadsIdle.get();
409 * @return The number of busy threads in the pool
411 @ManagedAttribute("total number of busy threads in the pool")
412 public int getBusyThreads()
414 return getThreads() - getIdleThreads();
418 * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
421 @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs")
422 public boolean isLowOnThreads()
424 return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get();
427 private boolean startThreads(int threadsToStart)
429 while (threadsToStart > 0 && isRunning())
431 int threads = _threadsStarted.get();
432 if (threads >= _maxThreads)
435 if (!_threadsStarted.compareAndSet(threads, threads + 1))
438 boolean started = false;
441 Thread thread = newThread(_runnable);
442 thread.setDaemon(isDaemon());
443 thread.setPriority(getThreadsPriority());
444 thread.setName(_name + "-" + thread.getId());
445 _threads.add(thread);
454 _threadsStarted.decrementAndGet();
460 protected Thread newThread(Runnable runnable)
462 return new Thread(runnable);
466 @ManagedOperation("dump thread state")
469 return ContainerLifeCycle.dump(this);
473 public void dump(Appendable out, String indent) throws IOException
475 List<Object> dump = new ArrayList<>(getMaxThreads());
476 for (final Thread thread : _threads)
478 final StackTraceElement[] trace = thread.getStackTrace();
479 boolean inIdleJobPoll = false;
480 for (StackTraceElement t : trace)
482 if ("idleJobPoll".equals(t.getMethodName()))
484 inIdleJobPoll = true;
488 final boolean idle = inIdleJobPoll;
490 if (isDetailedDump())
492 dump.add(new Dumpable()
495 public void dump(Appendable out, String indent) throws IOException
497 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "");
498 if (thread.getPriority()!=Thread.NORM_PRIORITY)
499 out.append(" prio=").append(String.valueOf(thread.getPriority()));
500 out.append(System.lineSeparator());
502 ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
514 int p=thread.getPriority();
515 dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : "")+ (p==Thread.NORM_PRIORITY?"":(" prio="+p)));
519 ContainerLifeCycle.dumpObject(out, this);
520 ContainerLifeCycle.dump(out, indent, dump);
524 public String toString()
526 return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
529 private Runnable idleJobPoll() throws InterruptedException
531 return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
534 private Runnable _runnable = new Runnable()
539 boolean shrink = false;
540 boolean ignore = false;
543 Runnable job = _jobs.poll();
545 if (job != null && _threadsIdle.get() == 0)
550 loop: while (isRunning())
553 while (job != null && isRunning())
556 if (Thread.interrupted())
567 _threadsIdle.incrementAndGet();
569 while (isRunning() && job == null)
571 if (_idleTimeout <= 0)
575 // maybe we should shrink?
576 final int size = _threadsStarted.get();
577 if (size > _minThreads)
579 long last = _lastShrink.get();
580 long now = System.nanoTime();
581 if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
583 if (_lastShrink.compareAndSet(last, now) && _threadsStarted.compareAndSet(size, size - 1))
596 if (_threadsIdle.decrementAndGet() == 0)
603 catch (InterruptedException e)
614 if (!shrink && isRunning())
617 LOG.warn("Unexpected thread death: {} in {}",this,QueuedThreadPool.this);
618 // This is an unexpected thread death!
619 if (_threadsStarted.decrementAndGet()<getMaxThreads())
622 _threads.remove(Thread.currentThread());
628 * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
629 * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
631 * @param job the job to run
633 protected void runJob(Runnable job)
639 * @return the job queue
641 protected BlockingQueue<Runnable> getQueue()
647 * @param queue the job queue
649 public void setQueue(BlockingQueue<Runnable> queue)
651 throw new UnsupportedOperationException("Use constructor injection");
655 * @param id The thread ID to interrupt.
656 * @return true if the thread was found and interrupted.
658 @ManagedOperation("interrupt a pool thread")
659 public boolean interruptThread(@Name("id") long id)
661 for (Thread thread : _threads)
663 if (thread.getId() == id)
673 * @param id The thread ID to interrupt.
674 * @return true if the thread was found and interrupted.
676 @ManagedOperation("dump a pool thread stack")
677 public String dumpThread(@Name("id") long id)
679 for (Thread thread : _threads)
681 if (thread.getId() == id)
683 StringBuilder buf = new StringBuilder();
684 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ");
685 buf.append(thread.getState()).append(":").append(System.lineSeparator());
686 for (StackTraceElement element : thread.getStackTrace())
687 buf.append(" at ").append(element.toString()).append(System.lineSeparator());
688 return buf.toString();