//
// ========================================================================
-// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.BlockingArrayQueue;
-import org.eclipse.jetty.util.StringUtil;
+import org.eclipse.jetty.util.ConcurrentHashSet;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
private final AtomicInteger _threadsStarted = new AtomicInteger();
private final AtomicInteger _threadsIdle = new AtomicInteger();
private final AtomicLong _lastShrink = new AtomicLong();
- private final ConcurrentLinkedQueue<Thread> _threads = new ConcurrentLinkedQueue<>();
+ private final ConcurrentHashSet<Thread> _threads=new ConcurrentHashSet<Thread>();
private final Object _joinLock = new Object();
private final BlockingQueue<Runnable> _jobs;
private String _name = "qtp" + hashCode();
setStopTimeout(5000);
if (queue==null)
- queue=new BlockingArrayQueue<>(_minThreads, _minThreads);
+ {
+ int capacity=Math.max(_minThreads, 8);
+ queue=new BlockingArrayQueue<>(capacity, capacity);
+ }
_jobs=queue;
-
}
@Override
StringBuilder dmp = new StringBuilder();
for (StackTraceElement element : unstopped.getStackTrace())
{
- dmp.append(StringUtil.__LINE_SEPARATOR).append("\tat ").append(element);
+ dmp.append(System.lineSeparator()).append("\tat ").append(element);
}
LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
}
LOG.warn("{} rejected {}", this, job);
throw new RejectedExecutionException(job.toString());
}
+ else
+ {
+ // Make sure there is at least one thread executing the job.
+ if (getThreads() == 0)
+ startThreads(1);
+ }
}
/**
return _threadsIdle.get();
}
+ /**
+ * @return The number of busy threads in the pool
+ */
+ @ManagedAttribute("total number of busy threads in the pool")
+ public int getBusyThreads()
+ {
+ return getThreads() - getIdleThreads();
+ }
+
/**
* @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
*/
private boolean startThreads(int threadsToStart)
{
- while (threadsToStart > 0)
+ while (threadsToStart > 0 && isRunning())
{
int threads = _threadsStarted.get();
if (threads >= _maxThreads)
thread.start();
started = true;
+ --threadsToStart;
}
finally
{
if (!started)
_threadsStarted.decrementAndGet();
}
- if (started)
- threadsToStart--;
}
return true;
}
return new Thread(runnable);
}
-
@Override
@ManagedOperation("dump thread state")
public String dump()
@Override
public void dump(Appendable out, String indent) throws IOException
{
- out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "").append('\n');
+ out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "");
+ if (thread.getPriority()!=Thread.NORM_PRIORITY)
+ out.append(" prio=").append(String.valueOf(thread.getPriority()));
+ out.append(System.lineSeparator());
if (!idle)
ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
}
}
else
{
- dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : ""));
+ int p=thread.getPriority();
+ dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : "")+ (p==Thread.NORM_PRIORITY?"":(" prio="+p)));
}
}
public void run()
{
boolean shrink = false;
+ boolean ignore = false;
try
{
Runnable job = _jobs.poll();
{
runJob(job);
if (Thread.interrupted())
+ {
+ ignore=true;
break loop;
+ }
job = _jobs.poll();
}
long now = System.nanoTime();
if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
{
- shrink = _lastShrink.compareAndSet(last, now) &&
- _threadsStarted.compareAndSet(size, size - 1);
- if (shrink)
+ if (_lastShrink.compareAndSet(last, now) && _threadsStarted.compareAndSet(size, size - 1))
{
- return;
+ shrink=true;
+ break loop;
}
}
}
}
catch (InterruptedException e)
{
+ ignore=true;
LOG.ignore(e);
}
catch (Throwable e)
}
finally
{
- if (!shrink)
- _threadsStarted.decrementAndGet();
+ if (!shrink && isRunning())
+ {
+ if (!ignore)
+ LOG.warn("Unexpected thread death: {} in {}",this,QueuedThreadPool.this);
+ // This is an unexpected thread death!
+ if (_threadsStarted.decrementAndGet()<getMaxThreads())
+ startThreads(1);
+ }
_threads.remove(Thread.currentThread());
}
}
if (thread.getId() == id)
{
StringBuilder buf = new StringBuilder();
- buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
+ buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ");
+ buf.append(thread.getState()).append(":").append(System.lineSeparator());
for (StackTraceElement element : thread.getStackTrace())
- buf.append(" at ").append(element.toString()).append('\n');
+ buf.append(" at ").append(element.toString()).append(System.lineSeparator());
return buf.toString();
}
}
return null;
}
-
-
}