+//
+// ========================================================================
+// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.eclipse.jetty.util.ConcurrentArrayQueue;
+import org.eclipse.jetty.util.TypeUtil;
+import org.eclipse.jetty.util.component.AbstractLifeCycle;
+import org.eclipse.jetty.util.component.ContainerLifeCycle;
+import org.eclipse.jetty.util.component.Dumpable;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.NonBlockingThread;
+import org.eclipse.jetty.util.thread.Scheduler;
+
+/**
+ * <p>{@link SelectorManager} manages a number of {@link ManagedSelector}s that
+ * simplify the non-blocking primitives provided by the JVM via the {@code java.nio} package.</p>
+ * <p>{@link SelectorManager} subclasses implement methods to return protocol-specific
+ * {@link EndPoint}s and {@link Connection}s.</p>
+ */
+public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
+{
+ public static final String SUBMIT_KEY_UPDATES = "org.eclipse.jetty.io.SelectorManager.submitKeyUpdates";
+ public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
+ protected static final Logger LOG = Log.getLogger(SelectorManager.class);
+ private final static boolean __submitKeyUpdates = Boolean.valueOf(System.getProperty(SUBMIT_KEY_UPDATES, "false"));
+
+ private final Executor executor;
+ private final Scheduler scheduler;
+ private final ManagedSelector[] _selectors;
+ private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
+ private long _selectorIndex;
+
+ protected SelectorManager(Executor executor, Scheduler scheduler)
+ {
+ this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2);
+ }
+
+ protected SelectorManager(Executor executor, Scheduler scheduler, int selectors)
+ {
+ if (selectors<=0)
+ throw new IllegalArgumentException("No selectors");
+ this.executor = executor;
+ this.scheduler = scheduler;
+ _selectors = new ManagedSelector[selectors];
+ }
+
+ public Executor getExecutor()
+ {
+ return executor;
+ }
+
+ public Scheduler getScheduler()
+ {
+ return scheduler;
+ }
+
+ /**
+ * Get the connect timeout
+ *
+ * @return the connect timeout (in milliseconds)
+ */
+ public long getConnectTimeout()
+ {
+ return _connectTimeout;
+ }
+
+ /**
+ * Set the connect timeout (in milliseconds)
+ *
+ * @param milliseconds the number of milliseconds for the timeout
+ */
+ public void setConnectTimeout(long milliseconds)
+ {
+ _connectTimeout = milliseconds;
+ }
+
+ /**
+ * Executes the given task in a different thread.
+ *
+ * @param task the task to execute
+ */
+ protected void execute(Runnable task)
+ {
+ executor.execute(task);
+ }
+
+ /**
+ * @return the number of selectors in use
+ */
+ public int getSelectorCount()
+ {
+ return _selectors.length;
+ }
+
+ private ManagedSelector chooseSelector()
+ {
+ // The ++ increment here is not atomic, but it does not matter,
+ // so long as the value changes sometimes, then connections will
+ // be distributed over the available selectors.
+ long s = _selectorIndex++;
+ int index = (int)(s % getSelectorCount());
+ return _selectors[index];
+ }
+
+ /**
+ * <p>Registers a channel to perform a non-blocking connect.</p>
+ * <p>The channel must be set in non-blocking mode, and {@link SocketChannel#connect(SocketAddress)}
+ * must be called prior to calling this method.</p>
+ *
+ * @param channel the channel to register
+ * @param attachment the attachment object
+ */
+ public void connect(SocketChannel channel, Object attachment)
+ {
+ ManagedSelector set = chooseSelector();
+ set.submit(set.new Connect(channel, attachment));
+ }
+
+ /**
+ * <p>Registers a channel to perform non-blocking read/write operations.</p>
+ * <p>This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()},
+ * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}.</p>
+ *
+ * @param channel the channel to register
+ */
+ public void accept(final SocketChannel channel)
+ {
+ final ManagedSelector selector = chooseSelector();
+ selector.submit(selector.new Accept(channel));
+ }
+
+ /**
+ * <p>Registers a server channel for accept operations.
+ * When a {@link SocketChannel} is accepted from the given {@link ServerSocketChannel}
+ * then the {@link #accepted(SocketChannel)} method is called, which must be
+ * overridden by a derivation of this class to handle the accepted channel
+ *
+ * @param server the server channel to register
+ */
+ public void acceptor(final ServerSocketChannel server)
+ {
+ final ManagedSelector selector = chooseSelector();
+ selector.submit(selector.new Acceptor(server));
+ }
+
+ /**
+ * Callback method when a channel is accepted from the {@link ServerSocketChannel}
+ * passed to {@link #acceptor(ServerSocketChannel)}.
+ * The default impl throws an {@link UnsupportedOperationException}, so it must
+ * be overridden by subclasses if a server channel is provided.
+ *
+ * @param channel the
+ * @throws IOException
+ */
+ protected void accepted(SocketChannel channel) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected void doStart() throws Exception
+ {
+ super.doStart();
+ for (int i = 0; i < _selectors.length; i++)
+ {
+ ManagedSelector selector = newSelector(i);
+ _selectors[i] = selector;
+ selector.start();
+ execute(new NonBlockingThread(selector));
+ }
+ }
+
+ /**
+ * <p>Factory method for {@link ManagedSelector}.</p>
+ *
+ * @param id an identifier for the {@link ManagedSelector to create}
+ * @return a new {@link ManagedSelector}
+ */
+ protected ManagedSelector newSelector(int id)
+ {
+ return new ManagedSelector(id);
+ }
+
+ @Override
+ protected void doStop() throws Exception
+ {
+ for (ManagedSelector selector : _selectors)
+ selector.stop();
+ super.doStop();
+ }
+
+ /**
+ * <p>Callback method invoked when an endpoint is opened.</p>
+ *
+ * @param endpoint the endpoint being opened
+ */
+ protected void endPointOpened(EndPoint endpoint)
+ {
+ endpoint.onOpen();
+ }
+
+ /**
+ * <p>Callback method invoked when an endpoint is closed.</p>
+ *
+ * @param endpoint the endpoint being closed
+ */
+ protected void endPointClosed(EndPoint endpoint)
+ {
+ endpoint.onClose();
+ }
+
+ /**
+ * <p>Callback method invoked when a connection is opened.</p>
+ *
+ * @param connection the connection just opened
+ */
+ public void connectionOpened(Connection connection)
+ {
+ try
+ {
+ connection.onOpen();
+ }
+ catch (Throwable x)
+ {
+ if (isRunning())
+ LOG.warn("Exception while notifying connection " + connection, x);
+ else
+ LOG.debug("Exception while notifying connection {}",connection, x);
+ }
+ }
+
+ /**
+ * <p>Callback method invoked when a connection is closed.</p>
+ *
+ * @param connection the connection just closed
+ */
+ public void connectionClosed(Connection connection)
+ {
+ try
+ {
+ connection.onClose();
+ }
+ catch (Throwable x)
+ {
+ LOG.debug("Exception while notifying connection " + connection, x);
+ }
+ }
+
+ protected boolean finishConnect(SocketChannel channel) throws IOException
+ {
+ return channel.finishConnect();
+ }
+
+ /**
+ * <p>Callback method invoked when a non-blocking connect cannot be completed.</p>
+ * <p>By default it just logs with level warning.</p>
+ *
+ * @param channel the channel that attempted the connect
+ * @param ex the exception that caused the connect to fail
+ * @param attachment the attachment object associated at registration
+ */
+ protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
+ {
+ LOG.warn(String.format("%s - %s", channel, attachment), ex);
+ }
+
+ /**
+ * <p>Factory method to create {@link EndPoint}.</p>
+ * <p>This method is invoked as a result of the registration of a channel via {@link #connect(SocketChannel, Object)}
+ * or {@link #accept(SocketChannel)}.</p>
+ *
+ * @param channel the channel associated to the endpoint
+ * @param selector the selector the channel is registered to
+ * @param selectionKey the selection key
+ * @return a new endpoint
+ * @throws IOException if the endPoint cannot be created
+ * @see #newConnection(SocketChannel, EndPoint, Object)
+ */
+ protected abstract EndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException;
+
+ /**
+ * <p>Factory method to create {@link Connection}.</p>
+ *
+ * @param channel the channel associated to the connection
+ * @param endpoint the endpoint
+ * @param attachment the attachment
+ * @return a new connection
+ * @throws IOException
+ * @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey)
+ */
+ public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
+
+ @Override
+ public String dump()
+ {
+ return ContainerLifeCycle.dump(this);
+ }
+
+ @Override
+ public void dump(Appendable out, String indent) throws IOException
+ {
+ ContainerLifeCycle.dumpObject(out, this);
+ ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
+ }
+
+ private enum State
+ {
+ CHANGES, MORE_CHANGES, SELECT, WAKEUP, PROCESS
+ }
+
+ /**
+ * <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
+ * <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events
+ * happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
+ * with the channel.</p>
+ */
+ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
+ {
+ private final AtomicReference<State> _state= new AtomicReference<>(State.PROCESS);
+ private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
+ private final int _id;
+ private Selector _selector;
+ private volatile Thread _thread;
+
+ public ManagedSelector(int id)
+ {
+ _id = id;
+ setStopTimeout(5000);
+ }
+
+ @Override
+ protected void doStart() throws Exception
+ {
+ super.doStart();
+ _selector = Selector.open();
+ _state.set(State.PROCESS);
+ }
+
+ @Override
+ protected void doStop() throws Exception
+ {
+ LOG.debug("Stopping {}", this);
+ Stop stop = new Stop();
+ submit(stop);
+ stop.await(getStopTimeout());
+ LOG.debug("Stopped {}", this);
+ }
+
+ /**
+ * Submit a task to update a selector key. If the System property {@link SelectorManager#SUBMIT_KEY_UPDATES}
+ * is set true (default is false), the task is passed to {@link #submit(Runnable)}. Otherwise it is run immediately and the selector
+ * woken up if need be.
+ * @param update the update to a key
+ */
+ public void updateKey(Runnable update)
+ {
+ if (__submitKeyUpdates)
+ {
+ submit(update);
+ }
+ else
+ {
+ runChange(update);
+ if (_state.compareAndSet(State.SELECT, State.WAKEUP))
+ wakeup();
+ }
+ }
+
+ /**
+ * <p>Submits a change to be executed in the selector thread.</p>
+ * <p>Changes may be submitted from any thread, and the selector thread woken up
+ * (if necessary) to execute the change.</p>
+ *
+ * @param change the change to submit
+ */
+ public void submit(Runnable change)
+ {
+ // This method may be called from the selector thread, and therefore
+ // we could directly run the change without queueing, but this may
+ // lead to stack overflows on a busy server, so we always offer the
+ // change to the queue and process the state.
+
+ _changes.offer(change);
+ LOG.debug("Queued change {}", change);
+
+ out: while (true)
+ {
+ switch (_state.get())
+ {
+ case SELECT:
+ // Avoid multiple wakeup() calls if we the CAS fails
+ if (!_state.compareAndSet(State.SELECT, State.WAKEUP))
+ continue;
+ wakeup();
+ break out;
+ case CHANGES:
+ // Tell the selector thread that we have more changes.
+ // If we fail to CAS, we possibly need to wakeup(), so loop.
+ if (_state.compareAndSet(State.CHANGES, State.MORE_CHANGES))
+ break out;
+ continue;
+ case WAKEUP:
+ // Do nothing, we have already a wakeup scheduled
+ break out;
+ case MORE_CHANGES:
+ // Do nothing, we already notified the selector thread of more changes
+ break out;
+ case PROCESS:
+ // Do nothing, the changes will be run after the processing
+ break out;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+ }
+
+ private void runChanges()
+ {
+ Runnable change;
+ while ((change = _changes.poll()) != null)
+ runChange(change);
+ }
+
+ protected void runChange(Runnable change)
+ {
+ try
+ {
+ LOG.debug("Running change {}", change);
+ change.run();
+ }
+ catch (Throwable x)
+ {
+ LOG.debug("Could not run change " + change, x);
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ _thread = Thread.currentThread();
+ String name = _thread.getName();
+ try
+ {
+ _thread.setName(name + "-selector-" + SelectorManager.this.getClass().getSimpleName()+"@"+Integer.toHexString(SelectorManager.this.hashCode())+"/"+_id);
+ LOG.debug("Starting {} on {}", _thread, this);
+ while (isRunning())
+ select();
+ runChanges();
+ }
+ finally
+ {
+ LOG.debug("Stopped {} on {}", _thread, this);
+ _thread.setName(name);
+ }
+ }
+
+ /**
+ * <p>Process changes and waits on {@link Selector#select()}.</p>
+ *
+ * @see #submit(Runnable)
+ */
+ public void select()
+ {
+ boolean debug = LOG.isDebugEnabled();
+ try
+ {
+ _state.set(State.CHANGES);
+
+ // Run the changes, and only exit if we ran all changes
+ out: while(true)
+ {
+ switch (_state.get())
+ {
+ case CHANGES:
+ runChanges();
+ if (_state.compareAndSet(State.CHANGES, State.SELECT))
+ break out;
+ continue;
+ case MORE_CHANGES:
+ runChanges();
+ _state.set(State.CHANGES);
+ continue;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+ // Must check first for SELECT and *then* for WAKEUP
+ // because we read the state twice in the assert, and
+ // it could change from SELECT to WAKEUP in between.
+ assert _state.get() == State.SELECT || _state.get() == State.WAKEUP;
+
+ if (debug)
+ LOG.debug("Selector loop waiting on select");
+ int selected = _selector.select();
+ if (debug)
+ LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
+
+ _state.set(State.PROCESS);
+
+ Set<SelectionKey> selectedKeys = _selector.selectedKeys();
+ for (SelectionKey key : selectedKeys)
+ {
+ if (key.isValid())
+ {
+ processKey(key);
+ }
+ else
+ {
+ if (debug)
+ LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
+ Object attachment = key.attachment();
+ if (attachment instanceof EndPoint)
+ ((EndPoint)attachment).close();
+ }
+ }
+ selectedKeys.clear();
+ }
+ catch (Throwable x)
+ {
+ if (isRunning())
+ LOG.warn(x);
+ else
+ LOG.ignore(x);
+ }
+ }
+
+ private void processKey(SelectionKey key)
+ {
+ Object attachment = key.attachment();
+ try
+ {
+ if (attachment instanceof SelectableEndPoint)
+ {
+ ((SelectableEndPoint)attachment).onSelected();
+ }
+ else if (key.isConnectable())
+ {
+ processConnect(key, (Connect)attachment);
+ }
+ else if (key.isAcceptable())
+ {
+ processAccept(key);
+ }
+ else
+ {
+ throw new IllegalStateException();
+ }
+ }
+ catch (CancelledKeyException x)
+ {
+ LOG.debug("Ignoring cancelled key for channel {}", key.channel());
+ if (attachment instanceof EndPoint)
+ closeNoExceptions((EndPoint)attachment);
+ }
+ catch (Throwable x)
+ {
+ LOG.warn("Could not process key for channel " + key.channel(), x);
+ if (attachment instanceof EndPoint)
+ closeNoExceptions((EndPoint)attachment);
+ }
+ }
+
+ private void processConnect(SelectionKey key, Connect connect)
+ {
+ SocketChannel channel = (SocketChannel)key.channel();
+ try
+ {
+ key.attach(connect.attachment);
+ boolean connected = finishConnect(channel);
+ if (connected)
+ {
+ connect.timeout.cancel();
+ key.interestOps(0);
+ EndPoint endpoint = createEndPoint(channel, key);
+ key.attach(endpoint);
+ }
+ else
+ {
+ throw new ConnectException();
+ }
+ }
+ catch (Throwable x)
+ {
+ connect.failed(x);
+ }
+ }
+
+ private void processAccept(SelectionKey key)
+ {
+ ServerSocketChannel server = (ServerSocketChannel)key.channel();
+ SocketChannel channel = null;
+ try
+ {
+ while ((channel = server.accept()) != null)
+ {
+ accepted(channel);
+ }
+ }
+ catch (Throwable x)
+ {
+ closeNoExceptions(channel);
+ LOG.warn("Accept failed for channel " + channel, x);
+ }
+ }
+
+ private void closeNoExceptions(Closeable closeable)
+ {
+ try
+ {
+ if (closeable != null)
+ closeable.close();
+ }
+ catch (Throwable x)
+ {
+ LOG.ignore(x);
+ }
+ }
+
+ public void wakeup()
+ {
+ _selector.wakeup();
+ }
+
+ public boolean isSelectorThread()
+ {
+ return Thread.currentThread() == _thread;
+ }
+
+ private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
+ {
+ EndPoint endPoint = newEndPoint(channel, this, selectionKey);
+ endPointOpened(endPoint);
+ Connection connection = newConnection(channel, endPoint, selectionKey.attachment());
+ endPoint.setConnection(connection);
+ connectionOpened(connection);
+ LOG.debug("Created {}", endPoint);
+ return endPoint;
+ }
+
+ public void destroyEndPoint(EndPoint endPoint)
+ {
+ LOG.debug("Destroyed {}", endPoint);
+ Connection connection = endPoint.getConnection();
+ if (connection != null)
+ connectionClosed(connection);
+ endPointClosed(endPoint);
+ }
+
+ @Override
+ public String dump()
+ {
+ return ContainerLifeCycle.dump(this);
+ }
+
+ @Override
+ public void dump(Appendable out, String indent) throws IOException
+ {
+ out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n");
+
+ Thread selecting = _thread;
+
+ Object where = "not selecting";
+ StackTraceElement[] trace = selecting == null ? null : selecting.getStackTrace();
+ if (trace != null)
+ {
+ for (StackTraceElement t : trace)
+ if (t.getClassName().startsWith("org.eclipse.jetty."))
+ {
+ where = t;
+ break;
+ }
+ }
+
+ Selector selector = _selector;
+ if (selector != null && selector.isOpen())
+ {
+ final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
+ dump.add(where);
+
+ DumpKeys dumpKeys = new DumpKeys(dump);
+ submit(dumpKeys);
+ dumpKeys.await(5, TimeUnit.SECONDS);
+
+ ContainerLifeCycle.dump(out, indent, dump);
+ }
+ }
+
+ public void dumpKeysState(List<Object> dumps)
+ {
+ Selector selector = _selector;
+ Set<SelectionKey> keys = selector.keys();
+ dumps.add(selector + " keys=" + keys.size());
+ for (SelectionKey key : keys)
+ {
+ if (key.isValid())
+ dumps.add(key.attachment() + " iOps=" + key.interestOps() + " rOps=" + key.readyOps());
+ else
+ dumps.add(key.attachment() + " iOps=-1 rOps=-1");
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ Selector selector = _selector;
+ return String.format("%s keys=%d selected=%d",
+ super.toString(),
+ selector != null && selector.isOpen() ? selector.keys().size() : -1,
+ selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
+ }
+
+ private class DumpKeys implements Runnable
+ {
+ private final CountDownLatch latch = new CountDownLatch(1);
+ private final List<Object> _dumps;
+
+ private DumpKeys(List<Object> dumps)
+ {
+ this._dumps = dumps;
+ }
+
+ @Override
+ public void run()
+ {
+ dumpKeysState(_dumps);
+ latch.countDown();
+ }
+
+ public boolean await(long timeout, TimeUnit unit)
+ {
+ try
+ {
+ return latch.await(timeout, unit);
+ }
+ catch (InterruptedException x)
+ {
+ return false;
+ }
+ }
+ }
+
+ private class Acceptor implements Runnable
+ {
+ private final ServerSocketChannel _channel;
+
+ public Acceptor(ServerSocketChannel channel)
+ {
+ this._channel = channel;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
+ LOG.debug("{} acceptor={}", this, key);
+ }
+ catch (Throwable x)
+ {
+ closeNoExceptions(_channel);
+ LOG.warn(x);
+ }
+ }
+ }
+
+ private class Accept implements Runnable
+ {
+ private final SocketChannel _channel;
+
+ public Accept(SocketChannel channel)
+ {
+ this._channel = channel;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ SelectionKey key = _channel.register(_selector, 0, null);
+ EndPoint endpoint = createEndPoint(_channel, key);
+ key.attach(endpoint);
+ }
+ catch (Throwable x)
+ {
+ closeNoExceptions(_channel);
+ LOG.debug(x);
+ }
+ }
+ }
+
+ private class Connect implements Runnable
+ {
+ private final AtomicBoolean failed = new AtomicBoolean();
+ private final SocketChannel channel;
+ private final Object attachment;
+ private final Scheduler.Task timeout;
+
+ public Connect(SocketChannel channel, Object attachment)
+ {
+ this.channel = channel;
+ this.attachment = attachment;
+ this.timeout = scheduler.schedule(new ConnectTimeout(this), getConnectTimeout(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ channel.register(_selector, SelectionKey.OP_CONNECT, this);
+ }
+ catch (Throwable x)
+ {
+ failed(x);
+ }
+ }
+
+ protected void failed(Throwable failure)
+ {
+ if (failed.compareAndSet(false, true))
+ {
+ timeout.cancel();
+ closeNoExceptions(channel);
+ connectionFailed(channel, failure, attachment);
+ }
+ }
+ }
+
+ private class ConnectTimeout implements Runnable
+ {
+ private final Connect connect;
+
+ private ConnectTimeout(Connect connect)
+ {
+ this.connect = connect;
+ }
+
+ @Override
+ public void run()
+ {
+ SocketChannel channel = connect.channel;
+ if (channel.isConnectionPending())
+ {
+ LOG.debug("Channel {} timed out while connecting, closing it", channel);
+ connect.failed(new SocketTimeoutException());
+ }
+ }
+ }
+
+ private class Stop implements Runnable
+ {
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ for (SelectionKey key : _selector.keys())
+ {
+ Object attachment = key.attachment();
+ if (attachment instanceof EndPoint)
+ {
+ EndPointCloser closer = new EndPointCloser((EndPoint)attachment);
+ execute(closer);
+ // We are closing the SelectorManager, so we want to block the
+ // selector thread here until we have closed all EndPoints.
+ // This is different than calling close() directly, because close()
+ // can wait forever, while here we are limited by the stop timeout.
+ closer.await(getStopTimeout());
+ }
+ }
+
+ closeNoExceptions(_selector);
+ }
+ finally
+ {
+ latch.countDown();
+ }
+ }
+
+ public boolean await(long timeout)
+ {
+ try
+ {
+ return latch.await(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException x)
+ {
+ return false;
+ }
+ }
+ }
+
+ private class EndPointCloser implements Runnable
+ {
+ private final CountDownLatch latch = new CountDownLatch(1);
+ private final EndPoint endPoint;
+
+ private EndPointCloser(EndPoint endPoint)
+ {
+ this.endPoint = endPoint;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ closeNoExceptions(endPoint.getConnection());
+ }
+ finally
+ {
+ latch.countDown();
+ }
+ }
+
+ private boolean await(long timeout)
+ {
+ try
+ {
+ return latch.await(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException x)
+ {
+ return false;
+ }
+ }
+ }
+ }
+
+ /**
+ * A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be notified of
+ * non-blocking events by the {@link ManagedSelector}.
+ */
+ public interface SelectableEndPoint extends EndPoint
+ {
+ /**
+ * <p>Callback method invoked when a read or write events has been detected by the {@link ManagedSelector}
+ * for this endpoint.</p>
+ */
+ void onSelected();
+ }
+}