]> WPIA git - gigi.git/blobdiff - lib/jetty/org/eclipse/jetty/util/thread/QueuedThreadPool.java
Importing upstream Jetty jetty-9.2.1.v20140609
[gigi.git] / lib / jetty / org / eclipse / jetty / util / thread / QueuedThreadPool.java
diff --git a/lib/jetty/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/lib/jetty/org/eclipse/jetty/util/thread/QueuedThreadPool.java
new file mode 100644 (file)
index 0000000..5cc7512
--- /dev/null
@@ -0,0 +1,666 @@
+//
+//  ========================================================================
+//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+//  ------------------------------------------------------------------------
+//  All rights reserved. This program and the accompanying materials
+//  are made available under the terms of the Eclipse Public License v1.0
+//  and Apache License v2.0 which accompanies this distribution.
+//
+//      The Eclipse Public License is available at
+//      http://www.eclipse.org/legal/epl-v10.html
+//
+//      The Apache License v2.0 is available at
+//      http://www.opensource.org/licenses/apache2.0.php
+//
+//  You may elect to redistribute this code under either of these licenses.
+//  ========================================================================
+//
+
+
+package org.eclipse.jetty.util.thread;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.eclipse.jetty.util.BlockingArrayQueue;
+import org.eclipse.jetty.util.StringUtil;
+import org.eclipse.jetty.util.annotation.ManagedAttribute;
+import org.eclipse.jetty.util.annotation.ManagedObject;
+import org.eclipse.jetty.util.annotation.ManagedOperation;
+import org.eclipse.jetty.util.annotation.Name;
+import org.eclipse.jetty.util.component.AbstractLifeCycle;
+import org.eclipse.jetty.util.component.ContainerLifeCycle;
+import org.eclipse.jetty.util.component.Dumpable;
+import org.eclipse.jetty.util.component.LifeCycle;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
+
+@ManagedObject("A thread pool with no max bound by default")
+public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
+{
+    private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
+
+    private final AtomicInteger _threadsStarted = new AtomicInteger();
+    private final AtomicInteger _threadsIdle = new AtomicInteger();
+    private final AtomicLong _lastShrink = new AtomicLong();
+    private final ConcurrentLinkedQueue<Thread> _threads = new ConcurrentLinkedQueue<>();
+    private final Object _joinLock = new Object();
+    private final BlockingQueue<Runnable> _jobs;
+    private String _name = "qtp" + hashCode();
+    private int _idleTimeout;
+    private int _maxThreads;
+    private int _minThreads;
+    private int _priority = Thread.NORM_PRIORITY;
+    private boolean _daemon = false;
+    private boolean _detailedDump = false;
+
+    public QueuedThreadPool()
+    {
+        this(200);
+    }
+
+    public QueuedThreadPool(@Name("maxThreads") int maxThreads)
+    {
+        this(maxThreads, 8);
+    }
+
+    public QueuedThreadPool(@Name("maxThreads") int maxThreads,  @Name("minThreads") int minThreads)
+    {
+        this(maxThreads, minThreads, 60000);
+    }
+
+    public QueuedThreadPool(@Name("maxThreads") int maxThreads,  @Name("minThreads") int minThreads, @Name("idleTimeout")int idleTimeout)
+    {
+        this(maxThreads, minThreads, idleTimeout, null);
+    }
+
+    public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
+    {
+        setMinThreads(minThreads);
+        setMaxThreads(maxThreads);
+        setIdleTimeout(idleTimeout);
+        setStopTimeout(5000);
+
+        if (queue==null)
+            queue=new BlockingArrayQueue<>(_minThreads, _minThreads);
+        _jobs=queue;
+
+    }
+
+    @Override
+    protected void doStart() throws Exception
+    {
+        super.doStart();
+        _threadsStarted.set(0);
+
+        startThreads(_minThreads);
+    }
+
+    @Override
+    protected void doStop() throws Exception
+    {
+        super.doStop();
+
+        long timeout = getStopTimeout();
+        BlockingQueue<Runnable> jobs = getQueue();
+
+        // If no stop timeout, clear job queue
+        if (timeout <= 0)
+            jobs.clear();
+
+        // Fill job Q with noop jobs to wakeup idle
+        Runnable noop = new Runnable()
+        {
+            @Override
+            public void run()
+            {
+            }
+        };
+        for (int i = _threadsStarted.get(); i-- > 0; )
+            jobs.offer(noop);
+
+        // try to jobs complete naturally for half our stop time
+        long stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
+        for (Thread thread : _threads)
+        {
+            long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
+            if (canwait > 0)
+                thread.join(canwait);
+        }
+
+        // If we still have threads running, get a bit more aggressive
+
+        // interrupt remaining threads
+        if (_threadsStarted.get() > 0)
+            for (Thread thread : _threads)
+                thread.interrupt();
+
+        // wait again for the other half of our stop time
+        stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
+        for (Thread thread : _threads)
+        {
+            long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
+            if (canwait > 0)
+                thread.join(canwait);
+        }
+
+        Thread.yield();
+        int size = _threads.size();
+        if (size > 0)
+        {
+            Thread.yield();
+            
+            if (LOG.isDebugEnabled())
+            {
+                for (Thread unstopped : _threads)
+                {
+                    StringBuilder dmp = new StringBuilder();
+                    for (StackTraceElement element : unstopped.getStackTrace())
+                    {
+                        dmp.append(StringUtil.__LINE_SEPARATOR).append("\tat ").append(element);
+                    }
+                    LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
+                }
+            }
+            else
+            {
+                for (Thread unstopped : _threads)
+                    LOG.warn("{} Couldn't stop {}",this,unstopped);
+            }
+        }
+
+        synchronized (_joinLock)
+        {
+            _joinLock.notifyAll();
+        }
+    }
+
+    /**
+     * Delegated to the named or anonymous Pool.
+     */
+    public void setDaemon(boolean daemon)
+    {
+        _daemon = daemon;
+    }
+
+    /**
+     * Set the maximum thread idle time.
+     * Threads that are idle for longer than this period may be
+     * stopped.
+     * Delegated to the named or anonymous Pool.
+     *
+     * @param idleTimeout Max idle time in ms.
+     * @see #getIdleTimeout
+     */
+    public void setIdleTimeout(int idleTimeout)
+    {
+        _idleTimeout = idleTimeout;
+    }
+
+    /**
+     * Set the maximum number of threads.
+     * Delegated to the named or anonymous Pool.
+     *
+     * @param maxThreads maximum number of threads.
+     * @see #getMaxThreads
+     */
+    @Override
+    public void setMaxThreads(int maxThreads)
+    {
+        _maxThreads = maxThreads;
+        if (_minThreads > _maxThreads)
+            _minThreads = _maxThreads;
+    }
+
+    /**
+     * Set the minimum number of threads.
+     * Delegated to the named or anonymous Pool.
+     *
+     * @param minThreads minimum number of threads
+     * @see #getMinThreads
+     */
+    @Override
+    public void setMinThreads(int minThreads)
+    {
+        _minThreads = minThreads;
+
+        if (_minThreads > _maxThreads)
+            _maxThreads = _minThreads;
+
+        int threads = _threadsStarted.get();
+        if (isStarted() && threads < _minThreads)
+            startThreads(_minThreads - threads);
+    }
+
+    /**
+     * @param name Name of this thread pool to use when naming threads.
+     */
+    public void setName(String name)
+    {
+        if (isRunning())
+            throw new IllegalStateException("started");
+        _name = name;
+    }
+
+    /**
+     * Set the priority of the pool threads.
+     *
+     * @param priority the new thread priority.
+     */
+    public void setThreadsPriority(int priority)
+    {
+        _priority = priority;
+    }
+
+    /**
+     * Get the maximum thread idle time.
+     * Delegated to the named or anonymous Pool.
+     *
+     * @return Max idle time in ms.
+     * @see #setIdleTimeout
+     */
+    @ManagedAttribute("maximum time a thread may be idle in ms")
+    public int getIdleTimeout()
+    {
+        return _idleTimeout;
+    }
+
+    /**
+     * Set the maximum number of threads.
+     * Delegated to the named or anonymous Pool.
+     *
+     * @return maximum number of threads.
+     * @see #setMaxThreads
+     */
+    @Override
+    @ManagedAttribute("maximum number of threads in the pool")
+    public int getMaxThreads()
+    {
+        return _maxThreads;
+    }
+
+    /**
+     * Get the minimum number of threads.
+     * Delegated to the named or anonymous Pool.
+     *
+     * @return minimum number of threads.
+     * @see #setMinThreads
+     */
+    @Override
+    @ManagedAttribute("minimum number of threads in the pool")
+    public int getMinThreads()
+    {
+        return _minThreads;
+    }
+
+    /**
+     * @return The name of the this thread pool
+     */
+    @ManagedAttribute("name of the thread pool")
+    public String getName()
+    {
+        return _name;
+    }
+
+    /**
+     * Get the priority of the pool threads.
+     *
+     * @return the priority of the pool threads.
+     */
+    @ManagedAttribute("priority of threads in the pool")
+    public int getThreadsPriority()
+    {
+        return _priority;
+    }
+    
+    /**
+     * Get the size of the job queue.
+     * 
+     * @return Number of jobs queued waiting for a thread
+     */
+    @ManagedAttribute("Size of the job queue")
+    public int getQueueSize()
+    {
+        return _jobs.size();
+    }
+
+    /**
+     * Delegated to the named or anonymous Pool.
+     */
+    @ManagedAttribute("thead pool using a daemon thread")
+    public boolean isDaemon()
+    {
+        return _daemon;
+    }
+
+    public boolean isDetailedDump()
+    {
+        return _detailedDump;
+    }
+
+    public void setDetailedDump(boolean detailedDump)
+    {
+        _detailedDump = detailedDump;
+    }
+    
+    @Override
+    public void execute(Runnable job)
+    {
+        if (!isRunning() || !_jobs.offer(job))
+        {
+            LOG.warn("{} rejected {}", this, job);
+            throw new RejectedExecutionException(job.toString());
+        }
+    }
+
+    /**
+     * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
+     */
+    @Override
+    public void join() throws InterruptedException
+    {
+        synchronized (_joinLock)
+        {
+            while (isRunning())
+                _joinLock.wait();
+        }
+
+        while (isStopping())
+            Thread.sleep(1);
+    }
+
+    /**
+     * @return The total number of threads currently in the pool
+     */
+    @Override
+    @ManagedAttribute("total number of threads currently in the pool")
+    public int getThreads()
+    {
+        return _threadsStarted.get();
+    }
+
+    /**
+     * @return The number of idle threads in the pool
+     */
+    @Override
+    @ManagedAttribute("total number of idle threads in the pool")
+    public int getIdleThreads()
+    {
+        return _threadsIdle.get();
+    }
+
+    /**
+     * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
+     */
+    @Override
+    @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs")
+    public boolean isLowOnThreads()
+    {
+        return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get();
+    }
+
+    private boolean startThreads(int threadsToStart)
+    {
+        while (threadsToStart > 0)
+        {
+            int threads = _threadsStarted.get();
+            if (threads >= _maxThreads)
+                return false;
+
+            if (!_threadsStarted.compareAndSet(threads, threads + 1))
+                continue;
+
+            boolean started = false;
+            try
+            {
+                Thread thread = newThread(_runnable);
+                thread.setDaemon(isDaemon());
+                thread.setPriority(getThreadsPriority());
+                thread.setName(_name + "-" + thread.getId());
+                _threads.add(thread);
+
+                thread.start();
+                started = true;
+            }
+            finally
+            {
+                if (!started)
+                    _threadsStarted.decrementAndGet();
+            }
+            if (started)
+                threadsToStart--;
+        }
+        return true;
+    }
+
+    protected Thread newThread(Runnable runnable)
+    {
+        return new Thread(runnable);
+    }
+
+
+    @Override
+    @ManagedOperation("dump thread state")
+    public String dump()
+    {
+        return ContainerLifeCycle.dump(this);
+    }
+
+    @Override
+    public void dump(Appendable out, String indent) throws IOException
+    {
+        List<Object> dump = new ArrayList<>(getMaxThreads());
+        for (final Thread thread : _threads)
+        {
+            final StackTraceElement[] trace = thread.getStackTrace();
+            boolean inIdleJobPoll = false;
+            for (StackTraceElement t : trace)
+            {
+                if ("idleJobPoll".equals(t.getMethodName()))
+                {
+                    inIdleJobPoll = true;
+                    break;
+                }
+            }
+            final boolean idle = inIdleJobPoll;
+
+            if (isDetailedDump())
+            {
+                dump.add(new Dumpable()
+                {
+                    @Override
+                    public void dump(Appendable out, String indent) throws IOException
+                    {
+                        out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "").append('\n');
+                        if (!idle)
+                            ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
+                    }
+
+                    @Override
+                    public String dump()
+                    {
+                        return null;
+                    }
+                });
+            }
+            else
+            {
+                dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : ""));
+            }
+        }
+
+        ContainerLifeCycle.dumpObject(out, this);
+        ContainerLifeCycle.dump(out, indent, dump);
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
+    }
+
+    private Runnable idleJobPoll() throws InterruptedException
+    {
+        return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
+    }
+
+    private Runnable _runnable = new Runnable()
+    {
+        @Override
+        public void run()
+        {
+            boolean shrink = false;
+            try
+            {
+                Runnable job = _jobs.poll();
+
+                if (job != null && _threadsIdle.get() == 0)
+                {
+                    startThreads(1);
+                }
+
+                loop: while (isRunning())
+                {
+                    // Job loop
+                    while (job != null && isRunning())
+                    {
+                        runJob(job);
+                        if (Thread.interrupted())
+                            break loop;
+                        job = _jobs.poll();
+                    }
+
+                    // Idle loop
+                    try
+                    {
+                        _threadsIdle.incrementAndGet();
+
+                        while (isRunning() && job == null)
+                        {
+                            if (_idleTimeout <= 0)
+                                job = _jobs.take();
+                            else
+                            {
+                                // maybe we should shrink?
+                                final int size = _threadsStarted.get();
+                                if (size > _minThreads)
+                                {
+                                    long last = _lastShrink.get();
+                                    long now = System.nanoTime();
+                                    if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
+                                    {
+                                        shrink = _lastShrink.compareAndSet(last, now) &&
+                                                _threadsStarted.compareAndSet(size, size - 1);
+                                        if (shrink)
+                                        {
+                                            return;
+                                        }
+                                    }
+                                }
+                                job = idleJobPoll();
+                            }
+                        }
+                    }
+                    finally
+                    {
+                        if (_threadsIdle.decrementAndGet() == 0)
+                        {
+                            startThreads(1);
+                        }
+                    }
+                }
+            }
+            catch (InterruptedException e)
+            {
+                LOG.ignore(e);
+            }
+            catch (Throwable e)
+            {
+                LOG.warn(e);
+            }
+            finally
+            {
+                if (!shrink)
+                    _threadsStarted.decrementAndGet();
+                _threads.remove(Thread.currentThread());
+            }
+        }
+    };
+
+    /**
+     * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
+     * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
+     *
+     * @param job the job to run
+     */
+    protected void runJob(Runnable job)
+    {
+        job.run();
+    }
+
+    /**
+     * @return the job queue
+     */
+    protected BlockingQueue<Runnable> getQueue()
+    {
+        return _jobs;
+    }
+
+    /**
+     * @param queue the job queue
+     */
+    public void setQueue(BlockingQueue<Runnable> queue)
+    {
+        throw new UnsupportedOperationException("Use constructor injection");
+    }
+
+    /**
+     * @param id The thread ID to interrupt.
+     * @return true if the thread was found and interrupted.
+     */
+    @ManagedOperation("interrupt a pool thread")
+    public boolean interruptThread(@Name("id") long id)
+    {
+        for (Thread thread : _threads)
+        {
+            if (thread.getId() == id)
+            {
+                thread.interrupt();
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * @param id The thread ID to interrupt.
+     * @return true if the thread was found and interrupted.
+     */
+    @ManagedOperation("dump a pool thread stack")
+    public String dumpThread(@Name("id") long id)
+    {
+        for (Thread thread : _threads)
+        {
+            if (thread.getId() == id)
+            {
+                StringBuilder buf = new StringBuilder();
+                buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
+                for (StackTraceElement element : thread.getStackTrace())
+                    buf.append("  at ").append(element.toString()).append('\n');
+                return buf.toString();
+            }
+        }
+        return null;
+    }
+    
+
+}