2 // ========================================================================
3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
19 package org.eclipse.jetty.io;
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.net.ConnectException;
24 import java.net.Socket;
25 import java.net.SocketAddress;
26 import java.net.SocketTimeoutException;
27 import java.nio.channels.CancelledKeyException;
28 import java.nio.channels.SelectionKey;
29 import java.nio.channels.Selector;
30 import java.nio.channels.ServerSocketChannel;
31 import java.nio.channels.SocketChannel;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Queue;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicReference;
42 import org.eclipse.jetty.util.ConcurrentArrayQueue;
43 import org.eclipse.jetty.util.TypeUtil;
44 import org.eclipse.jetty.util.component.AbstractLifeCycle;
45 import org.eclipse.jetty.util.component.ContainerLifeCycle;
46 import org.eclipse.jetty.util.component.Dumpable;
47 import org.eclipse.jetty.util.log.Log;
48 import org.eclipse.jetty.util.log.Logger;
49 import org.eclipse.jetty.util.thread.NonBlockingThread;
50 import org.eclipse.jetty.util.thread.Scheduler;
53 * <p>{@link SelectorManager} manages a number of {@link ManagedSelector}s that
54 * simplify the non-blocking primitives provided by the JVM via the {@code java.nio} package.</p>
55 * <p>{@link SelectorManager} subclasses implement methods to return protocol-specific
56 * {@link EndPoint}s and {@link Connection}s.</p>
58 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
60 public static final String SUBMIT_KEY_UPDATES = "org.eclipse.jetty.io.SelectorManager.submitKeyUpdates";
61 public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
62 protected static final Logger LOG = Log.getLogger(SelectorManager.class);
63 private final static boolean __submitKeyUpdates = Boolean.valueOf(System.getProperty(SUBMIT_KEY_UPDATES, "false"));
65 private final Executor executor;
66 private final Scheduler scheduler;
67 private final ManagedSelector[] _selectors;
68 private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
69 private long _selectorIndex;
/**
 * Creates a manager whose selector count is derived from the number of available
 * processors: {@code (availableProcessors + 1) / 2}.
 *
 * @param executor the executor forwarded to {@link #SelectorManager(Executor, Scheduler, int)}
 * @param scheduler the scheduler forwarded to {@link #SelectorManager(Executor, Scheduler, int)}
 */
protected SelectorManager(Executor executor, Scheduler scheduler)
{
    this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2);
}
76 protected SelectorManager(Executor executor, Scheduler scheduler, int selectors)
79 throw new IllegalArgumentException("No selectors");
80 this.executor = executor;
81 this.scheduler = scheduler;
82 _selectors = new ManagedSelector[selectors];
85 public Executor getExecutor()
90 public Scheduler getScheduler()
96 * Get the connect timeout
98 * @return the connect timeout (in milliseconds)
100 public long getConnectTimeout()
102 return _connectTimeout;
106 * Set the connect timeout (in milliseconds)
108 * @param milliseconds the number of milliseconds for the timeout
110 public void setConnectTimeout(long milliseconds)
112 _connectTimeout = milliseconds;
116 * Executes the given task in a different thread.
118 * @param task the task to execute
120 protected void execute(Runnable task)
122 executor.execute(task);
126 * @return the number of selectors in use
128 public int getSelectorCount()
130 return _selectors.length;
133 private ManagedSelector chooseSelector()
135 // The ++ increment here is not atomic, but it does not matter,
136 // so long as the value changes sometimes, then connections will
137 // be distributed over the available selectors.
138 long s = _selectorIndex++;
139 int index = (int)(s % getSelectorCount());
140 return _selectors[index];
144 * <p>Registers a channel to perform a non-blocking connect.</p>
145 * <p>The channel must be set in non-blocking mode, and {@link SocketChannel#connect(SocketAddress)}
146 * must be called prior to calling this method.</p>
148 * @param channel the channel to register
149 * @param attachment the attachment object
151 public void connect(SocketChannel channel, Object attachment)
153 ManagedSelector set = chooseSelector();
154 set.submit(set.new Connect(channel, attachment));
158 * <p>Registers a channel to perform non-blocking read/write operations.</p>
159 * <p>This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()},
160 * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}.</p>
162 * @param channel the channel to register
164 public void accept(final SocketChannel channel)
166 final ManagedSelector selector = chooseSelector();
167 selector.submit(selector.new Accept(channel));
171 * <p>Registers a server channel for accept operations.
172 * When a {@link SocketChannel} is accepted from the given {@link ServerSocketChannel}
173 * then the {@link #accepted(SocketChannel)} method is called, which must be
174 * overridden by a derivation of this class to handle the accepted channel
176 * @param server the server channel to register
178 public void acceptor(final ServerSocketChannel server)
180 final ManagedSelector selector = chooseSelector();
181 selector.submit(selector.new Acceptor(server));
185 * Callback method when a channel is accepted from the {@link ServerSocketChannel}
186 * passed to {@link #acceptor(ServerSocketChannel)}.
187 * The default impl throws an {@link UnsupportedOperationException}, so it must
188 * be overridden by subclasses if a server channel is provided.
191 * @throws IOException
193 protected void accepted(SocketChannel channel) throws IOException
195 throw new UnsupportedOperationException();
199 protected void doStart() throws Exception
202 for (int i = 0; i < _selectors.length; i++)
204 ManagedSelector selector = newSelector(i);
205 _selectors[i] = selector;
207 execute(new NonBlockingThread(selector));
212 * <p>Factory method for {@link ManagedSelector}.</p>
214 * @param id an identifier for the {@link ManagedSelector to create}
215 * @return a new {@link ManagedSelector}
217 protected ManagedSelector newSelector(int id)
219 return new ManagedSelector(id);
223 protected void doStop() throws Exception
225 for (ManagedSelector selector : _selectors)
231 * <p>Callback method invoked when an endpoint is opened.</p>
233 * @param endpoint the endpoint being opened
235 protected void endPointOpened(EndPoint endpoint)
241 * <p>Callback method invoked when an endpoint is closed.</p>
243 * @param endpoint the endpoint being closed
245 protected void endPointClosed(EndPoint endpoint)
251 * <p>Callback method invoked when a connection is opened.</p>
253 * @param connection the connection just opened
255 public void connectionOpened(Connection connection)
264 LOG.warn("Exception while notifying connection " + connection, x);
266 LOG.debug("Exception while notifying connection {}",connection, x);
271 * <p>Callback method invoked when a connection is closed.</p>
273 * @param connection the connection just closed
275 public void connectionClosed(Connection connection)
279 connection.onClose();
283 LOG.debug("Exception while notifying connection " + connection, x);
287 protected boolean finishConnect(SocketChannel channel) throws IOException
289 return channel.finishConnect();
293 * <p>Callback method invoked when a non-blocking connect cannot be completed.</p>
294 * <p>By default it just logs with level warning.</p>
296 * @param channel the channel that attempted the connect
297 * @param ex the exception that caused the connect to fail
298 * @param attachment the attachment object associated at registration
300 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
302 LOG.warn(String.format("%s - %s", channel, attachment), ex);
306 * <p>Factory method to create {@link EndPoint}.</p>
307 * <p>This method is invoked as a result of the registration of a channel via {@link #connect(SocketChannel, Object)}
308 * or {@link #accept(SocketChannel)}.</p>
310 * @param channel the channel associated to the endpoint
311 * @param selector the selector the channel is registered to
312 * @param selectionKey the selection key
313 * @return a new endpoint
314 * @throws IOException if the endPoint cannot be created
315 * @see #newConnection(SocketChannel, EndPoint, Object)
317 protected abstract EndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException;
320 * <p>Factory method to create {@link Connection}.</p>
322 * @param channel the channel associated to the connection
323 * @param endpoint the endpoint
324 * @param attachment the attachment
325 * @return a new connection
326 * @throws IOException
327 * @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey)
329 public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
334 return ContainerLifeCycle.dump(this);
338 public void dump(Appendable out, String indent) throws IOException
340 ContainerLifeCycle.dumpObject(out, this);
341 ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
346 CHANGES, MORE_CHANGES, SELECT, WAKEUP, PROCESS
350 * <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
351 * <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events
352 * happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
353 * with the channel.</p>
355 public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
357 private final AtomicReference<State> _state= new AtomicReference<>(State.PROCESS);
358 private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
359 private final int _id;
360 private Selector _selector;
361 private volatile Thread _thread;
363 public ManagedSelector(int id)
366 setStopTimeout(5000);
370 protected void doStart() throws Exception
373 _selector = Selector.open();
374 _state.set(State.PROCESS);
378 protected void doStop() throws Exception
380 LOG.debug("Stopping {}", this);
381 Stop stop = new Stop();
383 stop.await(getStopTimeout());
384 LOG.debug("Stopped {}", this);
388 * Submit a task to update a selector key. If the System property {@link SelectorManager#SUBMIT_KEY_UPDATES}
389 * is set to true (default is false), the task is passed to {@link #submit(Runnable)}. Otherwise it is run immediately and the selector
390 * is woken up if need be.
391 * @param update the update to a key
393 public void updateKey(Runnable update)
395 if (__submitKeyUpdates)
402 if (_state.compareAndSet(State.SELECT, State.WAKEUP))
408 * <p>Submits a change to be executed in the selector thread.</p>
409 * <p>Changes may be submitted from any thread, and the selector thread woken up
410 * (if necessary) to execute the change.</p>
412 * @param change the change to submit
414 public void submit(Runnable change)
416 // This method may be called from the selector thread, and therefore
417 // we could directly run the change without queueing, but this may
418 // lead to stack overflows on a busy server, so we always offer the
419 // change to the queue and process the state.
421 _changes.offer(change);
422 LOG.debug("Queued change {}", change);
426 switch (_state.get())
429 // Avoid multiple wakeup() calls if the CAS fails
430 if (!_state.compareAndSet(State.SELECT, State.WAKEUP))
435 // Tell the selector thread that we have more changes.
436 // If we fail to CAS, we possibly need to wakeup(), so loop.
437 if (_state.compareAndSet(State.CHANGES, State.MORE_CHANGES))
441 // Do nothing, we have already a wakeup scheduled
444 // Do nothing, we already notified the selector thread of more changes
447 // Do nothing, the changes will be run after the processing
450 throw new IllegalStateException();
455 private void runChanges()
458 while ((change = _changes.poll()) != null)
462 protected void runChange(Runnable change)
466 LOG.debug("Running change {}", change);
471 LOG.debug("Could not run change " + change, x);
478 _thread = Thread.currentThread();
479 String name = _thread.getName();
482 _thread.setName(name + "-selector-" + SelectorManager.this.getClass().getSimpleName()+"@"+Integer.toHexString(SelectorManager.this.hashCode())+"/"+_id);
483 LOG.debug("Starting {} on {}", _thread, this);
490 LOG.debug("Stopped {} on {}", _thread, this);
491 _thread.setName(name);
496 * <p>Process changes and waits on {@link Selector#select()}.</p>
498 * @see #submit(Runnable)
502 boolean debug = LOG.isDebugEnabled();
505 _state.set(State.CHANGES);
507 // Run the changes, and only exit if we ran all changes
510 switch (_state.get())
514 if (_state.compareAndSet(State.CHANGES, State.SELECT))
519 _state.set(State.CHANGES);
522 throw new IllegalStateException();
525 // Must check first for SELECT and *then* for WAKEUP
526 // because we read the state twice in the assert, and
527 // it could change from SELECT to WAKEUP in between.
528 assert _state.get() == State.SELECT || _state.get() == State.WAKEUP;
531 LOG.debug("Selector loop waiting on select");
532 int selected = _selector.select();
534 LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
536 _state.set(State.PROCESS);
538 Set<SelectionKey> selectedKeys = _selector.selectedKeys();
539 for (SelectionKey key : selectedKeys)
548 LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
549 Object attachment = key.attachment();
550 if (attachment instanceof EndPoint)
551 ((EndPoint)attachment).close();
554 selectedKeys.clear();
565 private void processKey(SelectionKey key)
567 Object attachment = key.attachment();
570 if (attachment instanceof SelectableEndPoint)
572 ((SelectableEndPoint)attachment).onSelected();
574 else if (key.isConnectable())
576 processConnect(key, (Connect)attachment);
578 else if (key.isAcceptable())
584 throw new IllegalStateException();
587 catch (CancelledKeyException x)
589 LOG.debug("Ignoring cancelled key for channel {}", key.channel());
590 if (attachment instanceof EndPoint)
591 closeNoExceptions((EndPoint)attachment);
595 LOG.warn("Could not process key for channel " + key.channel(), x);
596 if (attachment instanceof EndPoint)
597 closeNoExceptions((EndPoint)attachment);
601 private void processConnect(SelectionKey key, Connect connect)
603 SocketChannel channel = (SocketChannel)key.channel();
606 key.attach(connect.attachment);
607 boolean connected = finishConnect(channel);
610 connect.timeout.cancel();
612 EndPoint endpoint = createEndPoint(channel, key);
613 key.attach(endpoint);
617 throw new ConnectException();
626 private void processAccept(SelectionKey key)
628 ServerSocketChannel server = (ServerSocketChannel)key.channel();
629 SocketChannel channel = null;
632 while ((channel = server.accept()) != null)
639 closeNoExceptions(channel);
640 LOG.warn("Accept failed for channel " + channel, x);
644 private void closeNoExceptions(Closeable closeable)
648 if (closeable != null)
662 public boolean isSelectorThread()
664 return Thread.currentThread() == _thread;
667 private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
669 EndPoint endPoint = newEndPoint(channel, this, selectionKey);
670 endPointOpened(endPoint);
671 Connection connection = newConnection(channel, endPoint, selectionKey.attachment());
672 endPoint.setConnection(connection);
673 connectionOpened(connection);
674 LOG.debug("Created {}", endPoint);
678 public void destroyEndPoint(EndPoint endPoint)
680 LOG.debug("Destroyed {}", endPoint);
681 Connection connection = endPoint.getConnection();
682 if (connection != null)
683 connectionClosed(connection);
684 endPointClosed(endPoint);
690 return ContainerLifeCycle.dump(this);
694 public void dump(Appendable out, String indent) throws IOException
696 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n");
698 Thread selecting = _thread;
700 Object where = "not selecting";
701 StackTraceElement[] trace = selecting == null ? null : selecting.getStackTrace();
704 for (StackTraceElement t : trace)
705 if (t.getClassName().startsWith("org.eclipse.jetty."))
712 Selector selector = _selector;
713 if (selector != null && selector.isOpen())
715 final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
718 DumpKeys dumpKeys = new DumpKeys(dump);
720 dumpKeys.await(5, TimeUnit.SECONDS);
722 ContainerLifeCycle.dump(out, indent, dump);
726 public void dumpKeysState(List<Object> dumps)
728 Selector selector = _selector;
729 Set<SelectionKey> keys = selector.keys();
730 dumps.add(selector + " keys=" + keys.size());
731 for (SelectionKey key : keys)
734 dumps.add(key.attachment() + " iOps=" + key.interestOps() + " rOps=" + key.readyOps());
736 dumps.add(key.attachment() + " iOps=-1 rOps=-1");
741 public String toString()
743 Selector selector = _selector;
744 return String.format("%s keys=%d selected=%d",
746 selector != null && selector.isOpen() ? selector.keys().size() : -1,
747 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
750 private class DumpKeys implements Runnable
752 private final CountDownLatch latch = new CountDownLatch(1);
753 private final List<Object> _dumps;
755 private DumpKeys(List<Object> dumps)
763 dumpKeysState(_dumps);
767 public boolean await(long timeout, TimeUnit unit)
771 return latch.await(timeout, unit);
773 catch (InterruptedException x)
780 private class Acceptor implements Runnable
782 private final ServerSocketChannel _channel;
784 public Acceptor(ServerSocketChannel channel)
786 this._channel = channel;
794 SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
795 LOG.debug("{} acceptor={}", this, key);
799 closeNoExceptions(_channel);
805 private class Accept implements Runnable
807 private final SocketChannel _channel;
809 public Accept(SocketChannel channel)
811 this._channel = channel;
819 SelectionKey key = _channel.register(_selector, 0, null);
820 EndPoint endpoint = createEndPoint(_channel, key);
821 key.attach(endpoint);
825 closeNoExceptions(_channel);
831 private class Connect implements Runnable
833 private final AtomicBoolean failed = new AtomicBoolean();
834 private final SocketChannel channel;
835 private final Object attachment;
836 private final Scheduler.Task timeout;
838 public Connect(SocketChannel channel, Object attachment)
840 this.channel = channel;
841 this.attachment = attachment;
842 this.timeout = scheduler.schedule(new ConnectTimeout(this), getConnectTimeout(), TimeUnit.MILLISECONDS);
850 channel.register(_selector, SelectionKey.OP_CONNECT, this);
858 protected void failed(Throwable failure)
860 if (failed.compareAndSet(false, true))
863 closeNoExceptions(channel);
864 connectionFailed(channel, failure, attachment);
869 private class ConnectTimeout implements Runnable
871 private final Connect connect;
873 private ConnectTimeout(Connect connect)
875 this.connect = connect;
881 SocketChannel channel = connect.channel;
882 if (channel.isConnectionPending())
884 LOG.debug("Channel {} timed out while connecting, closing it", channel);
885 connect.failed(new SocketTimeoutException());
890 private class Stop implements Runnable
892 private final CountDownLatch latch = new CountDownLatch(1);
899 for (SelectionKey key : _selector.keys())
901 Object attachment = key.attachment();
902 if (attachment instanceof EndPoint)
904 EndPointCloser closer = new EndPointCloser((EndPoint)attachment);
906 // We are closing the SelectorManager, so we want to block the
907 // selector thread here until we have closed all EndPoints.
908 // This is different than calling close() directly, because close()
909 // can wait forever, while here we are limited by the stop timeout.
910 closer.await(getStopTimeout());
914 closeNoExceptions(_selector);
922 public boolean await(long timeout)
926 return latch.await(timeout, TimeUnit.MILLISECONDS);
928 catch (InterruptedException x)
935 private class EndPointCloser implements Runnable
937 private final CountDownLatch latch = new CountDownLatch(1);
938 private final EndPoint endPoint;
940 private EndPointCloser(EndPoint endPoint)
942 this.endPoint = endPoint;
950 closeNoExceptions(endPoint.getConnection());
958 private boolean await(long timeout)
962 return latch.await(timeout, TimeUnit.MILLISECONDS);
964 catch (InterruptedException x)
973 * A {@link SelectableEndPoint} is an {@link EndPoint} that wishes to be notified of
974 * non-blocking events by the {@link ManagedSelector}.
976 public interface SelectableEndPoint extends EndPoint
979 * <p>Callback method invoked when a read or write event has been detected by the {@link ManagedSelector}
980 * for this endpoint.</p>