WPIA git - gigi.git/blob - lib/jetty/org/eclipse/jetty/util/thread/QueuedThreadPool.java
Importing upstream Jetty jetty-9.2.1.v20140609
[gigi.git] / lib / jetty / org / eclipse / jetty / util / thread / QueuedThreadPool.java
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18
19
20 package org.eclipse.jetty.util.thread;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.List;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.eclipse.jetty.util.BlockingArrayQueue;
34 import org.eclipse.jetty.util.StringUtil;
35 import org.eclipse.jetty.util.annotation.ManagedAttribute;
36 import org.eclipse.jetty.util.annotation.ManagedObject;
37 import org.eclipse.jetty.util.annotation.ManagedOperation;
38 import org.eclipse.jetty.util.annotation.Name;
39 import org.eclipse.jetty.util.component.AbstractLifeCycle;
40 import org.eclipse.jetty.util.component.ContainerLifeCycle;
41 import org.eclipse.jetty.util.component.Dumpable;
42 import org.eclipse.jetty.util.component.LifeCycle;
43 import org.eclipse.jetty.util.log.Log;
44 import org.eclipse.jetty.util.log.Logger;
45 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
46
47 @ManagedObject("A thread pool with no max bound by default")
48 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
49 {
50     private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
51
52     private final AtomicInteger _threadsStarted = new AtomicInteger();
53     private final AtomicInteger _threadsIdle = new AtomicInteger();
54     private final AtomicLong _lastShrink = new AtomicLong();
55     private final ConcurrentLinkedQueue<Thread> _threads = new ConcurrentLinkedQueue<>();
56     private final Object _joinLock = new Object();
57     private final BlockingQueue<Runnable> _jobs;
58     private String _name = "qtp" + hashCode();
59     private int _idleTimeout;
60     private int _maxThreads;
61     private int _minThreads;
62     private int _priority = Thread.NORM_PRIORITY;
63     private boolean _daemon = false;
64     private boolean _detailedDump = false;
65
66     public QueuedThreadPool()
67     {
68         this(200);
69     }
70
71     public QueuedThreadPool(@Name("maxThreads") int maxThreads)
72     {
73         this(maxThreads, 8);
74     }
75
76     public QueuedThreadPool(@Name("maxThreads") int maxThreads,  @Name("minThreads") int minThreads)
77     {
78         this(maxThreads, minThreads, 60000);
79     }
80
81     public QueuedThreadPool(@Name("maxThreads") int maxThreads,  @Name("minThreads") int minThreads, @Name("idleTimeout")int idleTimeout)
82     {
83         this(maxThreads, minThreads, idleTimeout, null);
84     }
85
86     public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
87     {
88         setMinThreads(minThreads);
89         setMaxThreads(maxThreads);
90         setIdleTimeout(idleTimeout);
91         setStopTimeout(5000);
92
93         if (queue==null)
94             queue=new BlockingArrayQueue<>(_minThreads, _minThreads);
95         _jobs=queue;
96
97     }
98
99     @Override
100     protected void doStart() throws Exception
101     {
102         super.doStart();
103         _threadsStarted.set(0);
104
105         startThreads(_minThreads);
106     }
107
108     @Override
109     protected void doStop() throws Exception
110     {
111         super.doStop();
112
113         long timeout = getStopTimeout();
114         BlockingQueue<Runnable> jobs = getQueue();
115
116         // If no stop timeout, clear job queue
117         if (timeout <= 0)
118             jobs.clear();
119
120         // Fill job Q with noop jobs to wakeup idle
121         Runnable noop = new Runnable()
122         {
123             @Override
124             public void run()
125             {
126             }
127         };
128         for (int i = _threadsStarted.get(); i-- > 0; )
129             jobs.offer(noop);
130
131         // try to jobs complete naturally for half our stop time
132         long stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
133         for (Thread thread : _threads)
134         {
135             long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
136             if (canwait > 0)
137                 thread.join(canwait);
138         }
139
140         // If we still have threads running, get a bit more aggressive
141
142         // interrupt remaining threads
143         if (_threadsStarted.get() > 0)
144             for (Thread thread : _threads)
145                 thread.interrupt();
146
147         // wait again for the other half of our stop time
148         stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
149         for (Thread thread : _threads)
150         {
151             long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
152             if (canwait > 0)
153                 thread.join(canwait);
154         }
155
156         Thread.yield();
157         int size = _threads.size();
158         if (size > 0)
159         {
160             Thread.yield();
161             
162             if (LOG.isDebugEnabled())
163             {
164                 for (Thread unstopped : _threads)
165                 {
166                     StringBuilder dmp = new StringBuilder();
167                     for (StackTraceElement element : unstopped.getStackTrace())
168                     {
169                         dmp.append(StringUtil.__LINE_SEPARATOR).append("\tat ").append(element);
170                     }
171                     LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
172                 }
173             }
174             else
175             {
176                 for (Thread unstopped : _threads)
177                     LOG.warn("{} Couldn't stop {}",this,unstopped);
178             }
179         }
180
181         synchronized (_joinLock)
182         {
183             _joinLock.notifyAll();
184         }
185     }
186
187     /**
188      * Delegated to the named or anonymous Pool.
189      */
190     public void setDaemon(boolean daemon)
191     {
192         _daemon = daemon;
193     }
194
195     /**
196      * Set the maximum thread idle time.
197      * Threads that are idle for longer than this period may be
198      * stopped.
199      * Delegated to the named or anonymous Pool.
200      *
201      * @param idleTimeout Max idle time in ms.
202      * @see #getIdleTimeout
203      */
204     public void setIdleTimeout(int idleTimeout)
205     {
206         _idleTimeout = idleTimeout;
207     }
208
209     /**
210      * Set the maximum number of threads.
211      * Delegated to the named or anonymous Pool.
212      *
213      * @param maxThreads maximum number of threads.
214      * @see #getMaxThreads
215      */
216     @Override
217     public void setMaxThreads(int maxThreads)
218     {
219         _maxThreads = maxThreads;
220         if (_minThreads > _maxThreads)
221             _minThreads = _maxThreads;
222     }
223
224     /**
225      * Set the minimum number of threads.
226      * Delegated to the named or anonymous Pool.
227      *
228      * @param minThreads minimum number of threads
229      * @see #getMinThreads
230      */
231     @Override
232     public void setMinThreads(int minThreads)
233     {
234         _minThreads = minThreads;
235
236         if (_minThreads > _maxThreads)
237             _maxThreads = _minThreads;
238
239         int threads = _threadsStarted.get();
240         if (isStarted() && threads < _minThreads)
241             startThreads(_minThreads - threads);
242     }
243
244     /**
245      * @param name Name of this thread pool to use when naming threads.
246      */
247     public void setName(String name)
248     {
249         if (isRunning())
250             throw new IllegalStateException("started");
251         _name = name;
252     }
253
254     /**
255      * Set the priority of the pool threads.
256      *
257      * @param priority the new thread priority.
258      */
259     public void setThreadsPriority(int priority)
260     {
261         _priority = priority;
262     }
263
264     /**
265      * Get the maximum thread idle time.
266      * Delegated to the named or anonymous Pool.
267      *
268      * @return Max idle time in ms.
269      * @see #setIdleTimeout
270      */
271     @ManagedAttribute("maximum time a thread may be idle in ms")
272     public int getIdleTimeout()
273     {
274         return _idleTimeout;
275     }
276
277     /**
278      * Set the maximum number of threads.
279      * Delegated to the named or anonymous Pool.
280      *
281      * @return maximum number of threads.
282      * @see #setMaxThreads
283      */
284     @Override
285     @ManagedAttribute("maximum number of threads in the pool")
286     public int getMaxThreads()
287     {
288         return _maxThreads;
289     }
290
291     /**
292      * Get the minimum number of threads.
293      * Delegated to the named or anonymous Pool.
294      *
295      * @return minimum number of threads.
296      * @see #setMinThreads
297      */
298     @Override
299     @ManagedAttribute("minimum number of threads in the pool")
300     public int getMinThreads()
301     {
302         return _minThreads;
303     }
304
305     /**
306      * @return The name of the this thread pool
307      */
308     @ManagedAttribute("name of the thread pool")
309     public String getName()
310     {
311         return _name;
312     }
313
314     /**
315      * Get the priority of the pool threads.
316      *
317      * @return the priority of the pool threads.
318      */
319     @ManagedAttribute("priority of threads in the pool")
320     public int getThreadsPriority()
321     {
322         return _priority;
323     }
324     
325     /**
326      * Get the size of the job queue.
327      * 
328      * @return Number of jobs queued waiting for a thread
329      */
330     @ManagedAttribute("Size of the job queue")
331     public int getQueueSize()
332     {
333         return _jobs.size();
334     }
335
336     /**
337      * Delegated to the named or anonymous Pool.
338      */
339     @ManagedAttribute("thead pool using a daemon thread")
340     public boolean isDaemon()
341     {
342         return _daemon;
343     }
344
345     public boolean isDetailedDump()
346     {
347         return _detailedDump;
348     }
349
350     public void setDetailedDump(boolean detailedDump)
351     {
352         _detailedDump = detailedDump;
353     }
354     
355     @Override
356     public void execute(Runnable job)
357     {
358         if (!isRunning() || !_jobs.offer(job))
359         {
360             LOG.warn("{} rejected {}", this, job);
361             throw new RejectedExecutionException(job.toString());
362         }
363     }
364
365     /**
366      * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
367      */
368     @Override
369     public void join() throws InterruptedException
370     {
371         synchronized (_joinLock)
372         {
373             while (isRunning())
374                 _joinLock.wait();
375         }
376
377         while (isStopping())
378             Thread.sleep(1);
379     }
380
381     /**
382      * @return The total number of threads currently in the pool
383      */
384     @Override
385     @ManagedAttribute("total number of threads currently in the pool")
386     public int getThreads()
387     {
388         return _threadsStarted.get();
389     }
390
391     /**
392      * @return The number of idle threads in the pool
393      */
394     @Override
395     @ManagedAttribute("total number of idle threads in the pool")
396     public int getIdleThreads()
397     {
398         return _threadsIdle.get();
399     }
400
401     /**
402      * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
403      */
404     @Override
405     @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs")
406     public boolean isLowOnThreads()
407     {
408         return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get();
409     }
410
411     private boolean startThreads(int threadsToStart)
412     {
413         while (threadsToStart > 0)
414         {
415             int threads = _threadsStarted.get();
416             if (threads >= _maxThreads)
417                 return false;
418
419             if (!_threadsStarted.compareAndSet(threads, threads + 1))
420                 continue;
421
422             boolean started = false;
423             try
424             {
425                 Thread thread = newThread(_runnable);
426                 thread.setDaemon(isDaemon());
427                 thread.setPriority(getThreadsPriority());
428                 thread.setName(_name + "-" + thread.getId());
429                 _threads.add(thread);
430
431                 thread.start();
432                 started = true;
433             }
434             finally
435             {
436                 if (!started)
437                     _threadsStarted.decrementAndGet();
438             }
439             if (started)
440                 threadsToStart--;
441         }
442         return true;
443     }
444
445     protected Thread newThread(Runnable runnable)
446     {
447         return new Thread(runnable);
448     }
449
450
451     @Override
452     @ManagedOperation("dump thread state")
453     public String dump()
454     {
455         return ContainerLifeCycle.dump(this);
456     }
457
458     @Override
459     public void dump(Appendable out, String indent) throws IOException
460     {
461         List<Object> dump = new ArrayList<>(getMaxThreads());
462         for (final Thread thread : _threads)
463         {
464             final StackTraceElement[] trace = thread.getStackTrace();
465             boolean inIdleJobPoll = false;
466             for (StackTraceElement t : trace)
467             {
468                 if ("idleJobPoll".equals(t.getMethodName()))
469                 {
470                     inIdleJobPoll = true;
471                     break;
472                 }
473             }
474             final boolean idle = inIdleJobPoll;
475
476             if (isDetailedDump())
477             {
478                 dump.add(new Dumpable()
479                 {
480                     @Override
481                     public void dump(Appendable out, String indent) throws IOException
482                     {
483                         out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "").append('\n');
484                         if (!idle)
485                             ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
486                     }
487
488                     @Override
489                     public String dump()
490                     {
491                         return null;
492                     }
493                 });
494             }
495             else
496             {
497                 dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : ""));
498             }
499         }
500
501         ContainerLifeCycle.dumpObject(out, this);
502         ContainerLifeCycle.dump(out, indent, dump);
503     }
504
505     @Override
506     public String toString()
507     {
508         return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
509     }
510
511     private Runnable idleJobPoll() throws InterruptedException
512     {
513         return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
514     }
515
516     private Runnable _runnable = new Runnable()
517     {
518         @Override
519         public void run()
520         {
521             boolean shrink = false;
522             try
523             {
524                 Runnable job = _jobs.poll();
525
526                 if (job != null && _threadsIdle.get() == 0)
527                 {
528                     startThreads(1);
529                 }
530
531                 loop: while (isRunning())
532                 {
533                     // Job loop
534                     while (job != null && isRunning())
535                     {
536                         runJob(job);
537                         if (Thread.interrupted())
538                             break loop;
539                         job = _jobs.poll();
540                     }
541
542                     // Idle loop
543                     try
544                     {
545                         _threadsIdle.incrementAndGet();
546
547                         while (isRunning() && job == null)
548                         {
549                             if (_idleTimeout <= 0)
550                                 job = _jobs.take();
551                             else
552                             {
553                                 // maybe we should shrink?
554                                 final int size = _threadsStarted.get();
555                                 if (size > _minThreads)
556                                 {
557                                     long last = _lastShrink.get();
558                                     long now = System.nanoTime();
559                                     if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
560                                     {
561                                         shrink = _lastShrink.compareAndSet(last, now) &&
562                                                 _threadsStarted.compareAndSet(size, size - 1);
563                                         if (shrink)
564                                         {
565                                             return;
566                                         }
567                                     }
568                                 }
569                                 job = idleJobPoll();
570                             }
571                         }
572                     }
573                     finally
574                     {
575                         if (_threadsIdle.decrementAndGet() == 0)
576                         {
577                             startThreads(1);
578                         }
579                     }
580                 }
581             }
582             catch (InterruptedException e)
583             {
584                 LOG.ignore(e);
585             }
586             catch (Throwable e)
587             {
588                 LOG.warn(e);
589             }
590             finally
591             {
592                 if (!shrink)
593                     _threadsStarted.decrementAndGet();
594                 _threads.remove(Thread.currentThread());
595             }
596         }
597     };
598
599     /**
600      * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
601      * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
602      *
603      * @param job the job to run
604      */
605     protected void runJob(Runnable job)
606     {
607         job.run();
608     }
609
610     /**
611      * @return the job queue
612      */
613     protected BlockingQueue<Runnable> getQueue()
614     {
615         return _jobs;
616     }
617
618     /**
619      * @param queue the job queue
620      */
621     public void setQueue(BlockingQueue<Runnable> queue)
622     {
623         throw new UnsupportedOperationException("Use constructor injection");
624     }
625
626     /**
627      * @param id The thread ID to interrupt.
628      * @return true if the thread was found and interrupted.
629      */
630     @ManagedOperation("interrupt a pool thread")
631     public boolean interruptThread(@Name("id") long id)
632     {
633         for (Thread thread : _threads)
634         {
635             if (thread.getId() == id)
636             {
637                 thread.interrupt();
638                 return true;
639             }
640         }
641         return false;
642     }
643
644     /**
645      * @param id The thread ID to interrupt.
646      * @return true if the thread was found and interrupted.
647      */
648     @ManagedOperation("dump a pool thread stack")
649     public String dumpThread(@Name("id") long id)
650     {
651         for (Thread thread : _threads)
652         {
653             if (thread.getId() == id)
654             {
655                 StringBuilder buf = new StringBuilder();
656                 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
657                 for (StackTraceElement element : thread.getStackTrace())
658                     buf.append("  at ").append(element.toString()).append('\n');
659                 return buf.toString();
660             }
661         }
662         return null;
663     }
664     
665
666 }