]> WPIA git - gigi.git/blob - lib/jetty/org/eclipse/jetty/util/thread/QueuedThreadPool.java
updating jetty to jetty-9.2.16.v2016040
[gigi.git] / lib / jetty / org / eclipse / jetty / util / thread / QueuedThreadPool.java
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18
19
20 package org.eclipse.jetty.util.thread;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.RejectedExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.concurrent.atomic.AtomicLong;
31
32 import org.eclipse.jetty.util.BlockingArrayQueue;
33 import org.eclipse.jetty.util.ConcurrentHashSet;
34 import org.eclipse.jetty.util.annotation.ManagedAttribute;
35 import org.eclipse.jetty.util.annotation.ManagedObject;
36 import org.eclipse.jetty.util.annotation.ManagedOperation;
37 import org.eclipse.jetty.util.annotation.Name;
38 import org.eclipse.jetty.util.component.AbstractLifeCycle;
39 import org.eclipse.jetty.util.component.ContainerLifeCycle;
40 import org.eclipse.jetty.util.component.Dumpable;
41 import org.eclipse.jetty.util.component.LifeCycle;
42 import org.eclipse.jetty.util.log.Log;
43 import org.eclipse.jetty.util.log.Logger;
44 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
45
46 @ManagedObject("A thread pool with no max bound by default")
47 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
48 {
49     private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
50
51     private final AtomicInteger _threadsStarted = new AtomicInteger();
52     private final AtomicInteger _threadsIdle = new AtomicInteger();
53     private final AtomicLong _lastShrink = new AtomicLong();
54     private final ConcurrentHashSet<Thread> _threads=new ConcurrentHashSet<Thread>();
55     private final Object _joinLock = new Object();
56     private final BlockingQueue<Runnable> _jobs;
57     private String _name = "qtp" + hashCode();
58     private int _idleTimeout;
59     private int _maxThreads;
60     private int _minThreads;
61     private int _priority = Thread.NORM_PRIORITY;
62     private boolean _daemon = false;
63     private boolean _detailedDump = false;
64
65     public QueuedThreadPool()
66     {
67         this(200);
68     }
69
70     public QueuedThreadPool(@Name("maxThreads") int maxThreads)
71     {
72         this(maxThreads, 8);
73     }
74
75     public QueuedThreadPool(@Name("maxThreads") int maxThreads,  @Name("minThreads") int minThreads)
76     {
77         this(maxThreads, minThreads, 60000);
78     }
79
80     public QueuedThreadPool(@Name("maxThreads") int maxThreads,  @Name("minThreads") int minThreads, @Name("idleTimeout")int idleTimeout)
81     {
82         this(maxThreads, minThreads, idleTimeout, null);
83     }
84
85     public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
86     {
87         setMinThreads(minThreads);
88         setMaxThreads(maxThreads);
89         setIdleTimeout(idleTimeout);
90         setStopTimeout(5000);
91
92         if (queue==null)
93         {
94             int capacity=Math.max(_minThreads, 8);
95             queue=new BlockingArrayQueue<>(capacity, capacity);
96         }
97         _jobs=queue;
98     }
99
100     @Override
101     protected void doStart() throws Exception
102     {
103         super.doStart();
104         _threadsStarted.set(0);
105
106         startThreads(_minThreads);
107     }
108
109     @Override
110     protected void doStop() throws Exception
111     {
112         super.doStop();
113
114         long timeout = getStopTimeout();
115         BlockingQueue<Runnable> jobs = getQueue();
116
117         // If no stop timeout, clear job queue
118         if (timeout <= 0)
119             jobs.clear();
120
121         // Fill job Q with noop jobs to wakeup idle
122         Runnable noop = new Runnable()
123         {
124             @Override
125             public void run()
126             {
127             }
128         };
129         for (int i = _threadsStarted.get(); i-- > 0; )
130             jobs.offer(noop);
131
132         // try to jobs complete naturally for half our stop time
133         long stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
134         for (Thread thread : _threads)
135         {
136             long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
137             if (canwait > 0)
138                 thread.join(canwait);
139         }
140
141         // If we still have threads running, get a bit more aggressive
142
143         // interrupt remaining threads
144         if (_threadsStarted.get() > 0)
145             for (Thread thread : _threads)
146                 thread.interrupt();
147
148         // wait again for the other half of our stop time
149         stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
150         for (Thread thread : _threads)
151         {
152             long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
153             if (canwait > 0)
154                 thread.join(canwait);
155         }
156
157         Thread.yield();
158         int size = _threads.size();
159         if (size > 0)
160         {
161             Thread.yield();
162             
163             if (LOG.isDebugEnabled())
164             {
165                 for (Thread unstopped : _threads)
166                 {
167                     StringBuilder dmp = new StringBuilder();
168                     for (StackTraceElement element : unstopped.getStackTrace())
169                     {
170                         dmp.append(System.lineSeparator()).append("\tat ").append(element);
171                     }
172                     LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
173                 }
174             }
175             else
176             {
177                 for (Thread unstopped : _threads)
178                     LOG.warn("{} Couldn't stop {}",this,unstopped);
179             }
180         }
181
182         synchronized (_joinLock)
183         {
184             _joinLock.notifyAll();
185         }
186     }
187
188     /**
189      * Delegated to the named or anonymous Pool.
190      */
191     public void setDaemon(boolean daemon)
192     {
193         _daemon = daemon;
194     }
195
196     /**
197      * Set the maximum thread idle time.
198      * Threads that are idle for longer than this period may be
199      * stopped.
200      * Delegated to the named or anonymous Pool.
201      *
202      * @param idleTimeout Max idle time in ms.
203      * @see #getIdleTimeout
204      */
205     public void setIdleTimeout(int idleTimeout)
206     {
207         _idleTimeout = idleTimeout;
208     }
209
210     /**
211      * Set the maximum number of threads.
212      * Delegated to the named or anonymous Pool.
213      *
214      * @param maxThreads maximum number of threads.
215      * @see #getMaxThreads
216      */
217     @Override
218     public void setMaxThreads(int maxThreads)
219     {
220         _maxThreads = maxThreads;
221         if (_minThreads > _maxThreads)
222             _minThreads = _maxThreads;
223     }
224
225     /**
226      * Set the minimum number of threads.
227      * Delegated to the named or anonymous Pool.
228      *
229      * @param minThreads minimum number of threads
230      * @see #getMinThreads
231      */
232     @Override
233     public void setMinThreads(int minThreads)
234     {
235         _minThreads = minThreads;
236
237         if (_minThreads > _maxThreads)
238             _maxThreads = _minThreads;
239
240         int threads = _threadsStarted.get();
241         if (isStarted() && threads < _minThreads)
242             startThreads(_minThreads - threads);
243     }
244
245     /**
246      * @param name Name of this thread pool to use when naming threads.
247      */
248     public void setName(String name)
249     {
250         if (isRunning())
251             throw new IllegalStateException("started");
252         _name = name;
253     }
254
255     /**
256      * Set the priority of the pool threads.
257      *
258      * @param priority the new thread priority.
259      */
260     public void setThreadsPriority(int priority)
261     {
262         _priority = priority;
263     }
264
265     /**
266      * Get the maximum thread idle time.
267      * Delegated to the named or anonymous Pool.
268      *
269      * @return Max idle time in ms.
270      * @see #setIdleTimeout
271      */
272     @ManagedAttribute("maximum time a thread may be idle in ms")
273     public int getIdleTimeout()
274     {
275         return _idleTimeout;
276     }
277
278     /**
279      * Set the maximum number of threads.
280      * Delegated to the named or anonymous Pool.
281      *
282      * @return maximum number of threads.
283      * @see #setMaxThreads
284      */
285     @Override
286     @ManagedAttribute("maximum number of threads in the pool")
287     public int getMaxThreads()
288     {
289         return _maxThreads;
290     }
291
292     /**
293      * Get the minimum number of threads.
294      * Delegated to the named or anonymous Pool.
295      *
296      * @return minimum number of threads.
297      * @see #setMinThreads
298      */
299     @Override
300     @ManagedAttribute("minimum number of threads in the pool")
301     public int getMinThreads()
302     {
303         return _minThreads;
304     }
305
306     /**
307      * @return The name of the this thread pool
308      */
309     @ManagedAttribute("name of the thread pool")
310     public String getName()
311     {
312         return _name;
313     }
314
315     /**
316      * Get the priority of the pool threads.
317      *
318      * @return the priority of the pool threads.
319      */
320     @ManagedAttribute("priority of threads in the pool")
321     public int getThreadsPriority()
322     {
323         return _priority;
324     }
325     
326     /**
327      * Get the size of the job queue.
328      * 
329      * @return Number of jobs queued waiting for a thread
330      */
331     @ManagedAttribute("Size of the job queue")
332     public int getQueueSize()
333     {
334         return _jobs.size();
335     }
336
337     /**
338      * Delegated to the named or anonymous Pool.
339      */
340     @ManagedAttribute("thead pool using a daemon thread")
341     public boolean isDaemon()
342     {
343         return _daemon;
344     }
345
346     public boolean isDetailedDump()
347     {
348         return _detailedDump;
349     }
350
351     public void setDetailedDump(boolean detailedDump)
352     {
353         _detailedDump = detailedDump;
354     }
355     
356     @Override
357     public void execute(Runnable job)
358     {
359         if (!isRunning() || !_jobs.offer(job))
360         {
361             LOG.warn("{} rejected {}", this, job);
362             throw new RejectedExecutionException(job.toString());
363         }
364         else
365         {
366             // Make sure there is at least one thread executing the job.
367             if (getThreads() == 0)
368                 startThreads(1);
369         }
370     }
371
372     /**
373      * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
374      */
375     @Override
376     public void join() throws InterruptedException
377     {
378         synchronized (_joinLock)
379         {
380             while (isRunning())
381                 _joinLock.wait();
382         }
383
384         while (isStopping())
385             Thread.sleep(1);
386     }
387
388     /**
389      * @return The total number of threads currently in the pool
390      */
391     @Override
392     @ManagedAttribute("total number of threads currently in the pool")
393     public int getThreads()
394     {
395         return _threadsStarted.get();
396     }
397
398     /**
399      * @return The number of idle threads in the pool
400      */
401     @Override
402     @ManagedAttribute("total number of idle threads in the pool")
403     public int getIdleThreads()
404     {
405         return _threadsIdle.get();
406     }
407
408     /**
409      * @return The number of busy threads in the pool
410      */
411     @ManagedAttribute("total number of busy threads in the pool")
412     public int getBusyThreads()
413     {
414         return getThreads() - getIdleThreads();
415     }
416     
417     /**
418      * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
419      */
420     @Override
421     @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs")
422     public boolean isLowOnThreads()
423     {
424         return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get();
425     }
426
427     private boolean startThreads(int threadsToStart)
428     {
429         while (threadsToStart > 0 && isRunning())
430         {
431             int threads = _threadsStarted.get();
432             if (threads >= _maxThreads)
433                 return false;
434
435             if (!_threadsStarted.compareAndSet(threads, threads + 1))
436                 continue;
437
438             boolean started = false;
439             try
440             {
441                 Thread thread = newThread(_runnable);
442                 thread.setDaemon(isDaemon());
443                 thread.setPriority(getThreadsPriority());
444                 thread.setName(_name + "-" + thread.getId());
445                 _threads.add(thread);
446
447                 thread.start();
448                 started = true;
449                 --threadsToStart;
450             }
451             finally
452             {
453                 if (!started)
454                     _threadsStarted.decrementAndGet();
455             }
456         }
457         return true;
458     }
459
460     protected Thread newThread(Runnable runnable)
461     {
462         return new Thread(runnable);
463     }
464
465     @Override
466     @ManagedOperation("dump thread state")
467     public String dump()
468     {
469         return ContainerLifeCycle.dump(this);
470     }
471
472     @Override
473     public void dump(Appendable out, String indent) throws IOException
474     {
475         List<Object> dump = new ArrayList<>(getMaxThreads());
476         for (final Thread thread : _threads)
477         {
478             final StackTraceElement[] trace = thread.getStackTrace();
479             boolean inIdleJobPoll = false;
480             for (StackTraceElement t : trace)
481             {
482                 if ("idleJobPoll".equals(t.getMethodName()))
483                 {
484                     inIdleJobPoll = true;
485                     break;
486                 }
487             }
488             final boolean idle = inIdleJobPoll;
489
490             if (isDetailedDump())
491             {
492                 dump.add(new Dumpable()
493                 {
494                     @Override
495                     public void dump(Appendable out, String indent) throws IOException
496                     {
497                         out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "");
498                         if (thread.getPriority()!=Thread.NORM_PRIORITY)
499                             out.append(" prio=").append(String.valueOf(thread.getPriority()));
500                         out.append(System.lineSeparator());
501                         if (!idle)
502                             ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
503                     }
504
505                     @Override
506                     public String dump()
507                     {
508                         return null;
509                     }
510                 });
511             }
512             else
513             {
514                 int p=thread.getPriority();
515                 dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : "")+ (p==Thread.NORM_PRIORITY?"":(" prio="+p)));
516             }
517         }
518
519         ContainerLifeCycle.dumpObject(out, this);
520         ContainerLifeCycle.dump(out, indent, dump);
521     }
522
523     @Override
524     public String toString()
525     {
526         return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
527     }
528
529     private Runnable idleJobPoll() throws InterruptedException
530     {
531         return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
532     }
533
534     private Runnable _runnable = new Runnable()
535     {
536         @Override
537         public void run()
538         {
539             boolean shrink = false;
540             boolean ignore = false;
541             try
542             {
543                 Runnable job = _jobs.poll();
544
545                 if (job != null && _threadsIdle.get() == 0)
546                 {
547                     startThreads(1);
548                 }
549
550                 loop: while (isRunning())
551                 {
552                     // Job loop
553                     while (job != null && isRunning())
554                     {
555                         runJob(job);
556                         if (Thread.interrupted())
557                         {
558                             ignore=true;
559                             break loop;
560                         }
561                         job = _jobs.poll();
562                     }
563
564                     // Idle loop
565                     try
566                     {
567                         _threadsIdle.incrementAndGet();
568
569                         while (isRunning() && job == null)
570                         {
571                             if (_idleTimeout <= 0)
572                                 job = _jobs.take();
573                             else
574                             {
575                                 // maybe we should shrink?
576                                 final int size = _threadsStarted.get();
577                                 if (size > _minThreads)
578                                 {
579                                     long last = _lastShrink.get();
580                                     long now = System.nanoTime();
581                                     if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
582                                     {
583                                         if (_lastShrink.compareAndSet(last, now) && _threadsStarted.compareAndSet(size, size - 1))
584                                         {
585                                             shrink=true;
586                                             break loop;
587                                         }
588                                     }
589                                 }
590                                 job = idleJobPoll();
591                             }
592                         }
593                     }
594                     finally
595                     {
596                         if (_threadsIdle.decrementAndGet() == 0)
597                         {
598                             startThreads(1);
599                         }
600                     }
601                 }
602             }
603             catch (InterruptedException e)
604             {
605                 ignore=true;
606                 LOG.ignore(e);
607             }
608             catch (Throwable e)
609             {
610                 LOG.warn(e);
611             }
612             finally
613             {
614                 if (!shrink && isRunning())
615                 {
616                     if (!ignore)
617                         LOG.warn("Unexpected thread death: {} in {}",this,QueuedThreadPool.this);
618                     // This is an unexpected thread death!
619                     if (_threadsStarted.decrementAndGet()<getMaxThreads())
620                         startThreads(1);
621                 }
622                 _threads.remove(Thread.currentThread());
623             }
624         }
625     };
626
627     /**
628      * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
629      * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
630      *
631      * @param job the job to run
632      */
633     protected void runJob(Runnable job)
634     {
635         job.run();
636     }
637
638     /**
639      * @return the job queue
640      */
641     protected BlockingQueue<Runnable> getQueue()
642     {
643         return _jobs;
644     }
645
646     /**
647      * @param queue the job queue
648      */
649     public void setQueue(BlockingQueue<Runnable> queue)
650     {
651         throw new UnsupportedOperationException("Use constructor injection");
652     }
653
654     /**
655      * @param id The thread ID to interrupt.
656      * @return true if the thread was found and interrupted.
657      */
658     @ManagedOperation("interrupt a pool thread")
659     public boolean interruptThread(@Name("id") long id)
660     {
661         for (Thread thread : _threads)
662         {
663             if (thread.getId() == id)
664             {
665                 thread.interrupt();
666                 return true;
667             }
668         }
669         return false;
670     }
671
672     /**
673      * @param id The thread ID to interrupt.
674      * @return true if the thread was found and interrupted.
675      */
676     @ManagedOperation("dump a pool thread stack")
677     public String dumpThread(@Name("id") long id)
678     {
679         for (Thread thread : _threads)
680         {
681             if (thread.getId() == id)
682             {
683                 StringBuilder buf = new StringBuilder();
684                 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ");
685                 buf.append(thread.getState()).append(":").append(System.lineSeparator());
686                 for (StackTraceElement element : thread.getStackTrace())
687                     buf.append("  at ").append(element.toString()).append(System.lineSeparator());
688                 return buf.toString();
689             }
690         }
691         return null;
692     }
693 }