]> WPIA git - gigi.git/blobdiff - lib/jetty/org/eclipse/jetty/util/thread/QueuedThreadPool.java
updating jetty to jetty-9.2.16.v2016040
[gigi.git] / lib / jetty / org / eclipse / jetty / util / thread / QueuedThreadPool.java
index 5cc75121c0b57a837aba35103c8aa23a73eb953f..bb986b6f68a1273a55f46971dbd7cdf1e4d2ccd0 100644 (file)
@@ -1,6 +1,6 @@
 //
 //  ========================================================================
-//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+//  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
 //  ------------------------------------------------------------------------
 //  All rights reserved. This program and the accompanying materials
 //  are made available under the terms of the Eclipse Public License v1.0
@@ -24,14 +24,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.eclipse.jetty.util.BlockingArrayQueue;
-import org.eclipse.jetty.util.StringUtil;
+import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.eclipse.jetty.util.annotation.ManagedAttribute;
 import org.eclipse.jetty.util.annotation.ManagedObject;
 import org.eclipse.jetty.util.annotation.ManagedOperation;
@@ -52,7 +51,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
     private final AtomicInteger _threadsStarted = new AtomicInteger();
     private final AtomicInteger _threadsIdle = new AtomicInteger();
     private final AtomicLong _lastShrink = new AtomicLong();
-    private final ConcurrentLinkedQueue<Thread> _threads = new ConcurrentLinkedQueue<>();
+    private final ConcurrentHashSet<Thread> _threads=new ConcurrentHashSet<Thread>();
     private final Object _joinLock = new Object();
     private final BlockingQueue<Runnable> _jobs;
     private String _name = "qtp" + hashCode();
@@ -91,9 +90,11 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
         setStopTimeout(5000);
 
         if (queue==null)
-            queue=new BlockingArrayQueue<>(_minThreads, _minThreads);
+        {
+            int capacity=Math.max(_minThreads, 8);
+            queue=new BlockingArrayQueue<>(capacity, capacity);
+        }
         _jobs=queue;
-
     }
 
     @Override
@@ -166,7 +167,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
                     StringBuilder dmp = new StringBuilder();
                     for (StackTraceElement element : unstopped.getStackTrace())
                     {
-                        dmp.append(StringUtil.__LINE_SEPARATOR).append("\tat ").append(element);
+                        dmp.append(System.lineSeparator()).append("\tat ").append(element);
                     }
                     LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
                 }
@@ -360,6 +361,12 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
             LOG.warn("{} rejected {}", this, job);
             throw new RejectedExecutionException(job.toString());
         }
+        else
+        {
+            // Make sure there is at least one thread executing the job.
+            if (getThreads() == 0)
+                startThreads(1);
+        }
     }
 
     /**
@@ -398,6 +405,15 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
         return _threadsIdle.get();
     }
 
+    /**
+     * @return The number of busy threads in the pool
+     */
+    @ManagedAttribute("total number of busy threads in the pool")
+    public int getBusyThreads()
+    {
+        return getThreads() - getIdleThreads();
+    }
+    
     /**
      * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
      */
@@ -410,7 +426,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
 
     private boolean startThreads(int threadsToStart)
     {
-        while (threadsToStart > 0)
+        while (threadsToStart > 0 && isRunning())
         {
             int threads = _threadsStarted.get();
             if (threads >= _maxThreads)
@@ -430,14 +446,13 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
 
                 thread.start();
                 started = true;
+                --threadsToStart;
             }
             finally
             {
                 if (!started)
                     _threadsStarted.decrementAndGet();
             }
-            if (started)
-                threadsToStart--;
         }
         return true;
     }
@@ -447,7 +462,6 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
         return new Thread(runnable);
     }
 
-
     @Override
     @ManagedOperation("dump thread state")
     public String dump()
@@ -480,7 +494,10 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
                     @Override
                     public void dump(Appendable out, String indent) throws IOException
                     {
-                        out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "").append('\n');
+                        out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "");
+                        if (thread.getPriority()!=Thread.NORM_PRIORITY)
+                            out.append(" prio=").append(String.valueOf(thread.getPriority()));
+                        out.append(System.lineSeparator());
                         if (!idle)
                             ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
                     }
@@ -494,7 +511,8 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
             }
             else
             {
-                dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : ""));
+                int p=thread.getPriority();
+                dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : "")+ (p==Thread.NORM_PRIORITY?"":(" prio="+p)));
             }
         }
 
@@ -519,6 +537,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
         public void run()
         {
             boolean shrink = false;
+            boolean ignore = false;
             try
             {
                 Runnable job = _jobs.poll();
@@ -535,7 +554,10 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
                     {
                         runJob(job);
                         if (Thread.interrupted())
+                        {
+                            ignore=true;
                             break loop;
+                        }
                         job = _jobs.poll();
                     }
 
@@ -558,11 +580,10 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
                                     long now = System.nanoTime();
                                     if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
                                     {
-                                        shrink = _lastShrink.compareAndSet(last, now) &&
-                                                _threadsStarted.compareAndSet(size, size - 1);
-                                        if (shrink)
+                                        if (_lastShrink.compareAndSet(last, now) && _threadsStarted.compareAndSet(size, size - 1))
                                         {
-                                            return;
+                                            shrink=true;
+                                            break loop;
                                         }
                                     }
                                 }
@@ -581,6 +602,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
             }
             catch (InterruptedException e)
             {
+                ignore=true;
                 LOG.ignore(e);
             }
             catch (Throwable e)
@@ -589,8 +611,14 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
             }
             finally
             {
-                if (!shrink)
-                    _threadsStarted.decrementAndGet();
+                if (!shrink && isRunning())
+                {
+                    if (!ignore)
+                        LOG.warn("Unexpected thread death: {} in {}",this,QueuedThreadPool.this);
+                    // This is an unexpected thread death!
+                    if (_threadsStarted.decrementAndGet()<getMaxThreads())
+                        startThreads(1);
+                }
                 _threads.remove(Thread.currentThread());
             }
         }
@@ -653,14 +681,13 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
             if (thread.getId() == id)
             {
                 StringBuilder buf = new StringBuilder();
-                buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
+                buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ");
+                buf.append(thread.getState()).append(":").append(System.lineSeparator());
                 for (StackTraceElement element : thread.getStackTrace())
-                    buf.append("  at ").append(element.toString()).append('\n');
+                    buf.append("  at ").append(element.toString()).append(System.lineSeparator());
                 return buf.toString();
             }
         }
         return null;
     }
-    
-
 }