//
// ========================================================================
-// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.TypeUtil;
+import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
public static final String SUBMIT_KEY_UPDATES = "org.eclipse.jetty.io.SelectorManager.submitKeyUpdates";
public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
protected static final Logger LOG = Log.getLogger(SelectorManager.class);
- private final static boolean __submitKeyUpdates = Boolean.valueOf(System.getProperty(SUBMIT_KEY_UPDATES, "false"));
-
+ private final static boolean __submitKeyUpdates = Boolean.valueOf(System.getProperty(SUBMIT_KEY_UPDATES, "true"));
+
private final Executor executor;
private final Scheduler scheduler;
private final ManagedSelector[] _selectors;
private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
private long _selectorIndex;
-
+ private int _priorityDelta;
+
protected SelectorManager(Executor executor, Scheduler scheduler)
{
this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2);
_connectTimeout = milliseconds;
}
+
+ @ManagedAttribute("The priority delta to apply to selector threads")
+ public int getSelectorPriorityDelta()
+ {
+ return _priorityDelta;
+ }
+
+ /**
+ * Sets the selector thread priority delta to the given amount.
+ * <p>This allows the selector threads to run at a different priority.
+ * Typically this would be used to lower the priority to give preference
+ * to handling previously accepted connections rather than accepting
+ * new connections.</p>
+ *
+ * @param selectorPriorityDelta the amount to change the thread priority
+ * delta to (may be negative)
+ * @see Thread#getPriority()
+ */
+ public void setSelectorPriorityDelta(int selectorPriorityDelta)
+ {
+ int oldDelta = _priorityDelta;
+ _priorityDelta = selectorPriorityDelta;
+ if (oldDelta != selectorPriorityDelta && isStarted())
+ {
+ for (ManagedSelector selector : _selectors)
+ {
+ Thread thread = selector._thread;
+ if (thread != null)
+ {
+ int deltaDiff = selectorPriorityDelta - oldDelta;
+ thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, thread.getPriority() - deltaDiff)));
+ }
+ }
+ }
+ }
+
/**
* Executes the given task in a different thread.
*
/**
* <p>Registers a channel to perform a non-blocking connect.</p>
- * <p>The channel must be set in non-blocking mode, and {@link SocketChannel#connect(SocketAddress)}
- * must be called prior to calling this method.</p>
+ * <p>The channel must be set in non-blocking mode, {@link SocketChannel#connect(SocketAddress)}
+ * must be called prior to calling this method, and the connect operation must not be completed
+ * (the return value of {@link SocketChannel#connect(SocketAddress)} must be false).</p>
*
* @param channel the channel to register
* @param attachment the attachment object
+ * @see #accept(SocketChannel, Object)
*/
public void connect(SocketChannel channel, Object attachment)
{
set.submit(set.new Connect(channel, attachment));
}
+ /**
+ * @see #accept(SocketChannel, Object)
+ */
+ public void accept(SocketChannel channel)
+ {
+ accept(channel, null);
+ }
+
/**
* <p>Registers a channel to perform non-blocking read/write operations.</p>
* <p>This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()},
- * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}.</p>
+ * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}, or
+ * just after a non-blocking connect via {@link SocketChannel#connect(SocketAddress)} that completed
+ * successfully.</p>
*
* @param channel the channel to register
+ * @param attachment the attachment object
*/
- public void accept(final SocketChannel channel)
+ public void accept(SocketChannel channel, Object attachment)
{
final ManagedSelector selector = chooseSelector();
- selector.submit(selector.new Accept(channel));
+ selector.submit(selector.new Accept(channel, attachment));
}
-
+
/**
* <p>Registers a server channel for accept operations.
* When a {@link SocketChannel} is accepted from the given {@link ServerSocketChannel}
* then the {@link #accepted(SocketChannel)} method is called, which must be
* overridden by a derivation of this class to handle the accepted channel
- *
+ *
* @param server the server channel to register
*/
- public void acceptor(final ServerSocketChannel server)
+ public void acceptor(ServerSocketChannel server)
{
final ManagedSelector selector = chooseSelector();
selector.submit(selector.new Acceptor(server));
}
-
+
/**
* Callback method when a channel is accepted from the {@link ServerSocketChannel}
* passed to {@link #acceptor(ServerSocketChannel)}.
if (isRunning())
LOG.warn("Exception while notifying connection " + connection, x);
else
- LOG.debug("Exception while notifying connection {}",connection, x);
+ LOG.debug("Exception while notifying connection " + connection, x);
+ throw x;
}
}
@Override
protected void doStop() throws Exception
{
- LOG.debug("Stopping {}", this);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Stopping {}", this);
Stop stop = new Stop();
submit(stop);
stop.await(getStopTimeout());
- LOG.debug("Stopped {}", this);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Stopped {}", this);
}
/**
* Submit a task to update a selector key. If the System property {@link SelectorManager#SUBMIT_KEY_UPDATES}
- * is set true (default is false), the task is passed to {@link #submit(Runnable)}. Otherwise it is run immediately and the selector
- * woken up if need be.
+ * is set true (default is false), the task is passed to {@link #submit(Runnable)}. Otherwise it is run immediately and the selector
+ * woken up if need be.
* @param update the update to a key
*/
public void updateKey(Runnable update)
}
else
{
- runChange(update);
+ // Run only 1 change at once
+ synchronized (this)
+ {
+ runChange(update);
+ }
if (_state.compareAndSet(State.SELECT, State.WAKEUP))
wakeup();
}
}
-
+
/**
* <p>Submits a change to be executed in the selector thread.</p>
* <p>Changes may be submitted from any thread, and the selector thread woken up
// change to the queue and process the state.
_changes.offer(change);
- LOG.debug("Queued change {}", change);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Queued change {}", change);
out: while (true)
{
{
try
{
- LOG.debug("Running change {}", change);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Running change {}", change);
change.run();
}
catch (Throwable x)
{
_thread = Thread.currentThread();
String name = _thread.getName();
+ int priority = _thread.getPriority();
try
{
- _thread.setName(name + "-selector-" + SelectorManager.this.getClass().getSimpleName()+"@"+Integer.toHexString(SelectorManager.this.hashCode())+"/"+_id);
- LOG.debug("Starting {} on {}", _thread, this);
+ if (_priorityDelta != 0)
+ _thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, priority + _priorityDelta)));
+
+ _thread.setName(String.format("%s-selector-%s@%h/%d", name, SelectorManager.this.getClass().getSimpleName(), SelectorManager.this.hashCode(), _id));
+ if (LOG.isDebugEnabled())
+ LOG.debug("Starting {} on {}", _thread, this);
while (isRunning())
select();
- runChanges();
+ while(isStopping())
+ runChanges();
}
finally
{
- LOG.debug("Stopped {} on {}", _thread, this);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Stopped {} on {}", _thread, this);
_thread.setName(name);
+ if (_priorityDelta != 0)
+ _thread.setPriority(priority);
}
}
_state.set(State.CHANGES);
continue;
default:
- throw new IllegalStateException();
+ throw new IllegalStateException();
}
}
// Must check first for SELECT and *then* for WAKEUP
boolean connected = finishConnect(channel);
if (connected)
{
- connect.timeout.cancel();
- key.interestOps(0);
- EndPoint endpoint = createEndPoint(channel, key);
- key.attach(endpoint);
+ if (connect.timeout.cancel())
+ {
+ key.interestOps(0);
+ EndPoint endpoint = createEndPoint(channel, key);
+ key.attach(endpoint);
+ }
+ else
+ {
+ throw new SocketTimeoutException("Concurrent Connect Timeout");
+ }
}
else
{
connect.failed(x);
}
}
-
+
private void processAccept(SelectionKey key)
{
ServerSocketChannel server = (ServerSocketChannel)key.channel();
Connection connection = newConnection(channel, endPoint, selectionKey.attachment());
endPoint.setConnection(connection);
connectionOpened(connection);
- LOG.debug("Created {}", endPoint);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Created {}", endPoint);
return endPoint;
}
public void destroyEndPoint(EndPoint endPoint)
{
- LOG.debug("Destroyed {}", endPoint);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Destroyed {}", endPoint);
Connection connection = endPoint.getConnection();
if (connection != null)
connectionClosed(connection);
try
{
SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
- LOG.debug("{} acceptor={}", this, key);
+ if (LOG.isDebugEnabled())
+ LOG.debug("{} acceptor={}", this, key);
}
catch (Throwable x)
{
private class Accept implements Runnable
{
- private final SocketChannel _channel;
+ private final SocketChannel channel;
+ private final Object attachment;
- public Accept(SocketChannel channel)
+ private Accept(SocketChannel channel, Object attachment)
{
- this._channel = channel;
+ this.channel = channel;
+ this.attachment = attachment;
}
@Override
{
try
{
- SelectionKey key = _channel.register(_selector, 0, null);
- EndPoint endpoint = createEndPoint(_channel, key);
+ SelectionKey key = channel.register(_selector, 0, attachment);
+ EndPoint endpoint = createEndPoint(channel, key);
key.attach(endpoint);
}
catch (Throwable x)
{
- closeNoExceptions(_channel);
+ closeNoExceptions(channel);
LOG.debug(x);
}
}
private final Object attachment;
private final Scheduler.Task timeout;
- public Connect(SocketChannel channel, Object attachment)
+ private Connect(SocketChannel channel, Object attachment)
{
this.channel = channel;
this.attachment = attachment;
}
}
- protected void failed(Throwable failure)
+ private void failed(Throwable failure)
{
if (failed.compareAndSet(false, true))
{
SocketChannel channel = connect.channel;
if (channel.isConnectionPending())
{
- LOG.debug("Channel {} timed out while connecting, closing it", channel);
- connect.failed(new SocketTimeoutException());
+ if (LOG.isDebugEnabled())
+ LOG.debug("Channel {} timed out while connecting, closing it", channel);
+ connect.failed(new SocketTimeoutException("Connect Timeout"));
}
}
}