2 // ========================================================================
3 // Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
19 package org.eclipse.jetty.io;
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.net.ConnectException;
24 import java.net.Socket;
25 import java.net.SocketAddress;
26 import java.net.SocketTimeoutException;
27 import java.nio.channels.CancelledKeyException;
28 import java.nio.channels.SelectionKey;
29 import java.nio.channels.Selector;
30 import java.nio.channels.ServerSocketChannel;
31 import java.nio.channels.SocketChannel;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Queue;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicReference;
42 import org.eclipse.jetty.util.ConcurrentArrayQueue;
43 import org.eclipse.jetty.util.TypeUtil;
44 import org.eclipse.jetty.util.annotation.ManagedAttribute;
45 import org.eclipse.jetty.util.component.AbstractLifeCycle;
46 import org.eclipse.jetty.util.component.ContainerLifeCycle;
47 import org.eclipse.jetty.util.component.Dumpable;
48 import org.eclipse.jetty.util.log.Log;
49 import org.eclipse.jetty.util.log.Logger;
50 import org.eclipse.jetty.util.thread.NonBlockingThread;
51 import org.eclipse.jetty.util.thread.Scheduler;
54 * <p>{@link SelectorManager} manages a number of {@link ManagedSelector}s that
55 * simplify the non-blocking primitives provided by the JVM via the {@code java.nio} package.</p>
56 * <p>{@link SelectorManager} subclasses implement methods to return protocol-specific
57 * {@link EndPoint}s and {@link Connection}s.</p>
59 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
61 public static final String SUBMIT_KEY_UPDATES = "org.eclipse.jetty.io.SelectorManager.submitKeyUpdates";
62 public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
63 protected static final Logger LOG = Log.getLogger(SelectorManager.class);
64 private final static boolean __submitKeyUpdates = Boolean.valueOf(System.getProperty(SUBMIT_KEY_UPDATES, "true"));
66 private final Executor executor;
67 private final Scheduler scheduler;
68 private final ManagedSelector[] _selectors;
69 private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
70 private long _selectorIndex;
71 private int _priorityDelta;
/**
 * Creates a manager that uses half as many selectors as there are
 * available processors, rounded up.
 *
 * @param executor the executor used to run tasks
 * @param scheduler the scheduler used to schedule timeouts
 */
protected SelectorManager(Executor executor, Scheduler scheduler)
{
    this(executor, scheduler, (1 + Runtime.getRuntime().availableProcessors()) / 2);
}
78 protected SelectorManager(Executor executor, Scheduler scheduler, int selectors)
81 throw new IllegalArgumentException("No selectors");
82 this.executor = executor;
83 this.scheduler = scheduler;
84 _selectors = new ManagedSelector[selectors];
87 public Executor getExecutor()
92 public Scheduler getScheduler()
98 * Get the connect timeout
100 * @return the connect timeout (in milliseconds)
102 public long getConnectTimeout()
104 return _connectTimeout;
108 * Set the connect timeout (in milliseconds)
110 * @param milliseconds the number of milliseconds for the timeout
112 public void setConnectTimeout(long milliseconds)
114 _connectTimeout = milliseconds;
118 @ManagedAttribute("The priority delta to apply to selector threads")
119 public int getSelectorPriorityDelta()
121 return _priorityDelta;
125 * Sets the selector thread priority delta to the given amount.
126 * <p>This allows the selector threads to run at a different priority.
127 * Typically this would be used to lower the priority to give preference
128 * to handling previously accepted connections rather than accepting
129 * new connections.</p>
131 * @param selectorPriorityDelta the amount to change the thread priority
132 * delta to (may be negative)
133 * @see Thread#getPriority()
135 public void setSelectorPriorityDelta(int selectorPriorityDelta)
137 int oldDelta = _priorityDelta;
138 _priorityDelta = selectorPriorityDelta;
139 if (oldDelta != selectorPriorityDelta && isStarted())
141 for (ManagedSelector selector : _selectors)
143 Thread thread = selector._thread;
146 int deltaDiff = selectorPriorityDelta - oldDelta;
147 thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, thread.getPriority() - deltaDiff)));
154 * Executes the given task in a different thread.
156 * @param task the task to execute
158 protected void execute(Runnable task)
160 executor.execute(task);
164 * @return the number of selectors in use
166 public int getSelectorCount()
168 return _selectors.length;
171 private ManagedSelector chooseSelector()
173 // The ++ increment here is not atomic, but it does not matter,
174 // so long as the value changes sometimes, then connections will
175 // be distributed over the available selectors.
176 long s = _selectorIndex++;
177 int index = (int)(s % getSelectorCount());
178 return _selectors[index];
182 * <p>Registers a channel to perform a non-blocking connect.</p>
183 * <p>The channel must be set in non-blocking mode, {@link SocketChannel#connect(SocketAddress)}
184 * must be called prior to calling this method, and the connect operation must not be completed
185 * (the return value of {@link SocketChannel#connect(SocketAddress)} must be false).</p>
187 * @param channel the channel to register
188 * @param attachment the attachment object
189 * @see #accept(SocketChannel, Object)
191 public void connect(SocketChannel channel, Object attachment)
193 ManagedSelector set = chooseSelector();
194 set.submit(set.new Connect(channel, attachment));
198 * @see #accept(SocketChannel, Object)
200 public void accept(SocketChannel channel)
202 accept(channel, null);
206 * <p>Registers a channel to perform non-blocking read/write operations.</p>
207 * <p>This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()},
208 * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}, or
209 * just after a non-blocking connect via {@link SocketChannel#connect(SocketAddress)} that completed
212 * @param channel the channel to register
213 * @param attachment the attachment object
215 public void accept(SocketChannel channel, Object attachment)
217 final ManagedSelector selector = chooseSelector();
218 selector.submit(selector.new Accept(channel, attachment));
222 * <p>Registers a server channel for accept operations.
223 * When a {@link SocketChannel} is accepted from the given {@link ServerSocketChannel}
224 * then the {@link #accepted(SocketChannel)} method is called, which must be
225 * overridden by a derivation of this class to handle the accepted channel
227 * @param server the server channel to register
229 public void acceptor(ServerSocketChannel server)
231 final ManagedSelector selector = chooseSelector();
232 selector.submit(selector.new Acceptor(server));
236 * Callback method when a channel is accepted from the {@link ServerSocketChannel}
237 * passed to {@link #acceptor(ServerSocketChannel)}.
238 * The default impl throws an {@link UnsupportedOperationException}, so it must
239 * be overridden by subclasses if a server channel is provided.
242 * @throws IOException
244 protected void accepted(SocketChannel channel) throws IOException
246 throw new UnsupportedOperationException();
250 protected void doStart() throws Exception
253 for (int i = 0; i < _selectors.length; i++)
255 ManagedSelector selector = newSelector(i);
256 _selectors[i] = selector;
258 execute(new NonBlockingThread(selector));
263 * <p>Factory method for {@link ManagedSelector}.</p>
265 * @param id an identifier for the {@link ManagedSelector to create}
266 * @return a new {@link ManagedSelector}
268 protected ManagedSelector newSelector(int id)
270 return new ManagedSelector(id);
274 protected void doStop() throws Exception
276 for (ManagedSelector selector : _selectors)
282 * <p>Callback method invoked when an endpoint is opened.</p>
284 * @param endpoint the endpoint being opened
286 protected void endPointOpened(EndPoint endpoint)
292 * <p>Callback method invoked when an endpoint is closed.</p>
294 * @param endpoint the endpoint being closed
296 protected void endPointClosed(EndPoint endpoint)
302 * <p>Callback method invoked when a connection is opened.</p>
304 * @param connection the connection just opened
306 public void connectionOpened(Connection connection)
315 LOG.warn("Exception while notifying connection " + connection, x);
317 LOG.debug("Exception while notifying connection " + connection, x);
323 * <p>Callback method invoked when a connection is closed.</p>
325 * @param connection the connection just closed
327 public void connectionClosed(Connection connection)
331 connection.onClose();
335 LOG.debug("Exception while notifying connection " + connection, x);
339 protected boolean finishConnect(SocketChannel channel) throws IOException
341 return channel.finishConnect();
345 * <p>Callback method invoked when a non-blocking connect cannot be completed.</p>
346 * <p>By default it just logs with level warning.</p>
348 * @param channel the channel that attempted the connect
349 * @param ex the exception that caused the connect to fail
350 * @param attachment the attachment object associated at registration
352 protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
354 LOG.warn(String.format("%s - %s", channel, attachment), ex);
358 * <p>Factory method to create {@link EndPoint}.</p>
359 * <p>This method is invoked as a result of the registration of a channel via {@link #connect(SocketChannel, Object)}
360 * or {@link #accept(SocketChannel)}.</p>
362 * @param channel the channel associated to the endpoint
363 * @param selector the selector the channel is registered to
364 * @param selectionKey the selection key
365 * @return a new endpoint
366 * @throws IOException if the endPoint cannot be created
367 * @see #newConnection(SocketChannel, EndPoint, Object)
369 protected abstract EndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException;
372 * <p>Factory method to create {@link Connection}.</p>
374 * @param channel the channel associated to the connection
375 * @param endpoint the endpoint
376 * @param attachment the attachment
377 * @return a new connection
378 * @throws IOException
379 * @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey)
381 public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
386 return ContainerLifeCycle.dump(this);
390 public void dump(Appendable out, String indent) throws IOException
392 ContainerLifeCycle.dumpObject(out, this);
393 ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
398 CHANGES, MORE_CHANGES, SELECT, WAKEUP, PROCESS
402 * <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
403 * <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events
404 * happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
405 * with the channel.</p>
407 public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
409 private final AtomicReference<State> _state= new AtomicReference<>(State.PROCESS);
410 private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
411 private final int _id;
412 private Selector _selector;
413 private volatile Thread _thread;
415 public ManagedSelector(int id)
418 setStopTimeout(5000);
422 protected void doStart() throws Exception
425 _selector = Selector.open();
426 _state.set(State.PROCESS);
430 protected void doStop() throws Exception
432 if (LOG.isDebugEnabled())
433 LOG.debug("Stopping {}", this);
434 Stop stop = new Stop();
436 stop.await(getStopTimeout());
437 if (LOG.isDebugEnabled())
438 LOG.debug("Stopped {}", this);
442 * Submit a task to update a selector key. If the System property {@link SelectorManager#SUBMIT_KEY_UPDATES}
443 * is set to true (the default), the task is passed to {@link #submit(Runnable)}. Otherwise it is run immediately and the selector
444 * is woken up if need be.
445 * @param update the update to a key
447 public void updateKey(Runnable update)
449 if (__submitKeyUpdates)
455 // Run only 1 change at once
460 if (_state.compareAndSet(State.SELECT, State.WAKEUP))
466 * <p>Submits a change to be executed in the selector thread.</p>
467 * <p>Changes may be submitted from any thread, and the selector thread woken up
468 * (if necessary) to execute the change.</p>
470 * @param change the change to submit
472 public void submit(Runnable change)
474 // This method may be called from the selector thread, and therefore
475 // we could directly run the change without queueing, but this may
476 // lead to stack overflows on a busy server, so we always offer the
477 // change to the queue and process the state.
479 _changes.offer(change);
480 if (LOG.isDebugEnabled())
481 LOG.debug("Queued change {}", change);
485 switch (_state.get())
488 // Avoid multiple wakeup() calls if the CAS fails
489 if (!_state.compareAndSet(State.SELECT, State.WAKEUP))
494 // Tell the selector thread that we have more changes.
495 // If we fail to CAS, we possibly need to wakeup(), so loop.
496 if (_state.compareAndSet(State.CHANGES, State.MORE_CHANGES))
500 // Do nothing, we have already a wakeup scheduled
503 // Do nothing, we already notified the selector thread of more changes
506 // Do nothing, the changes will be run after the processing
509 throw new IllegalStateException();
514 private void runChanges()
517 while ((change = _changes.poll()) != null)
521 protected void runChange(Runnable change)
525 if (LOG.isDebugEnabled())
526 LOG.debug("Running change {}", change);
531 LOG.debug("Could not run change " + change, x);
538 _thread = Thread.currentThread();
539 String name = _thread.getName();
540 int priority = _thread.getPriority();
543 if (_priorityDelta != 0)
544 _thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, priority + _priorityDelta)));
546 _thread.setName(String.format("%s-selector-%s@%h/%d", name, SelectorManager.this.getClass().getSimpleName(), SelectorManager.this.hashCode(), _id));
547 if (LOG.isDebugEnabled())
548 LOG.debug("Starting {} on {}", _thread, this);
556 if (LOG.isDebugEnabled())
557 LOG.debug("Stopped {} on {}", _thread, this);
558 _thread.setName(name);
559 if (_priorityDelta != 0)
560 _thread.setPriority(priority);
565 * <p>Process changes and waits on {@link Selector#select()}.</p>
567 * @see #submit(Runnable)
571 boolean debug = LOG.isDebugEnabled();
574 _state.set(State.CHANGES);
576 // Run the changes, and only exit if we ran all changes
579 switch (_state.get())
583 if (_state.compareAndSet(State.CHANGES, State.SELECT))
588 _state.set(State.CHANGES);
591 throw new IllegalStateException();
594 // Must check first for SELECT and *then* for WAKEUP
595 // because we read the state twice in the assert, and
596 // it could change from SELECT to WAKEUP in between.
597 assert _state.get() == State.SELECT || _state.get() == State.WAKEUP;
600 LOG.debug("Selector loop waiting on select");
601 int selected = _selector.select();
603 LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
605 _state.set(State.PROCESS);
607 Set<SelectionKey> selectedKeys = _selector.selectedKeys();
608 for (SelectionKey key : selectedKeys)
617 LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
618 Object attachment = key.attachment();
619 if (attachment instanceof EndPoint)
620 ((EndPoint)attachment).close();
623 selectedKeys.clear();
634 private void processKey(SelectionKey key)
636 Object attachment = key.attachment();
639 if (attachment instanceof SelectableEndPoint)
641 ((SelectableEndPoint)attachment).onSelected();
643 else if (key.isConnectable())
645 processConnect(key, (Connect)attachment);
647 else if (key.isAcceptable())
653 throw new IllegalStateException();
656 catch (CancelledKeyException x)
658 LOG.debug("Ignoring cancelled key for channel {}", key.channel());
659 if (attachment instanceof EndPoint)
660 closeNoExceptions((EndPoint)attachment);
664 LOG.warn("Could not process key for channel " + key.channel(), x);
665 if (attachment instanceof EndPoint)
666 closeNoExceptions((EndPoint)attachment);
670 private void processConnect(SelectionKey key, Connect connect)
672 SocketChannel channel = (SocketChannel)key.channel();
675 key.attach(connect.attachment);
676 boolean connected = finishConnect(channel);
679 if (connect.timeout.cancel())
682 EndPoint endpoint = createEndPoint(channel, key);
683 key.attach(endpoint);
687 throw new SocketTimeoutException("Concurrent Connect Timeout");
692 throw new ConnectException();
701 private void processAccept(SelectionKey key)
703 ServerSocketChannel server = (ServerSocketChannel)key.channel();
704 SocketChannel channel = null;
707 while ((channel = server.accept()) != null)
714 closeNoExceptions(channel);
715 LOG.warn("Accept failed for channel " + channel, x);
719 private void closeNoExceptions(Closeable closeable)
723 if (closeable != null)
737 public boolean isSelectorThread()
739 return Thread.currentThread() == _thread;
742 private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
744 EndPoint endPoint = newEndPoint(channel, this, selectionKey);
745 endPointOpened(endPoint);
746 Connection connection = newConnection(channel, endPoint, selectionKey.attachment());
747 endPoint.setConnection(connection);
748 connectionOpened(connection);
749 if (LOG.isDebugEnabled())
750 LOG.debug("Created {}", endPoint);
754 public void destroyEndPoint(EndPoint endPoint)
756 if (LOG.isDebugEnabled())
757 LOG.debug("Destroyed {}", endPoint);
758 Connection connection = endPoint.getConnection();
759 if (connection != null)
760 connectionClosed(connection);
761 endPointClosed(endPoint);
767 return ContainerLifeCycle.dump(this);
771 public void dump(Appendable out, String indent) throws IOException
773 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n");
775 Thread selecting = _thread;
777 Object where = "not selecting";
778 StackTraceElement[] trace = selecting == null ? null : selecting.getStackTrace();
781 for (StackTraceElement t : trace)
782 if (t.getClassName().startsWith("org.eclipse.jetty."))
789 Selector selector = _selector;
790 if (selector != null && selector.isOpen())
792 final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
795 DumpKeys dumpKeys = new DumpKeys(dump);
797 dumpKeys.await(5, TimeUnit.SECONDS);
799 ContainerLifeCycle.dump(out, indent, dump);
803 public void dumpKeysState(List<Object> dumps)
805 Selector selector = _selector;
806 Set<SelectionKey> keys = selector.keys();
807 dumps.add(selector + " keys=" + keys.size());
808 for (SelectionKey key : keys)
811 dumps.add(key.attachment() + " iOps=" + key.interestOps() + " rOps=" + key.readyOps());
813 dumps.add(key.attachment() + " iOps=-1 rOps=-1");
818 public String toString()
820 Selector selector = _selector;
821 return String.format("%s keys=%d selected=%d",
823 selector != null && selector.isOpen() ? selector.keys().size() : -1,
824 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
827 private class DumpKeys implements Runnable
829 private final CountDownLatch latch = new CountDownLatch(1);
830 private final List<Object> _dumps;
832 private DumpKeys(List<Object> dumps)
840 dumpKeysState(_dumps);
844 public boolean await(long timeout, TimeUnit unit)
848 return latch.await(timeout, unit);
850 catch (InterruptedException x)
857 private class Acceptor implements Runnable
859 private final ServerSocketChannel _channel;
861 public Acceptor(ServerSocketChannel channel)
863 this._channel = channel;
871 SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
872 if (LOG.isDebugEnabled())
873 LOG.debug("{} acceptor={}", this, key);
877 closeNoExceptions(_channel);
883 private class Accept implements Runnable
885 private final SocketChannel channel;
886 private final Object attachment;
888 private Accept(SocketChannel channel, Object attachment)
890 this.channel = channel;
891 this.attachment = attachment;
899 SelectionKey key = channel.register(_selector, 0, attachment);
900 EndPoint endpoint = createEndPoint(channel, key);
901 key.attach(endpoint);
905 closeNoExceptions(channel);
911 private class Connect implements Runnable
913 private final AtomicBoolean failed = new AtomicBoolean();
914 private final SocketChannel channel;
915 private final Object attachment;
916 private final Scheduler.Task timeout;
918 private Connect(SocketChannel channel, Object attachment)
920 this.channel = channel;
921 this.attachment = attachment;
922 this.timeout = scheduler.schedule(new ConnectTimeout(this), getConnectTimeout(), TimeUnit.MILLISECONDS);
930 channel.register(_selector, SelectionKey.OP_CONNECT, this);
938 private void failed(Throwable failure)
940 if (failed.compareAndSet(false, true))
943 closeNoExceptions(channel);
944 connectionFailed(channel, failure, attachment);
949 private class ConnectTimeout implements Runnable
951 private final Connect connect;
953 private ConnectTimeout(Connect connect)
955 this.connect = connect;
961 SocketChannel channel = connect.channel;
962 if (channel.isConnectionPending())
964 if (LOG.isDebugEnabled())
965 LOG.debug("Channel {} timed out while connecting, closing it", channel);
966 connect.failed(new SocketTimeoutException("Connect Timeout"));
971 private class Stop implements Runnable
973 private final CountDownLatch latch = new CountDownLatch(1);
980 for (SelectionKey key : _selector.keys())
982 Object attachment = key.attachment();
983 if (attachment instanceof EndPoint)
985 EndPointCloser closer = new EndPointCloser((EndPoint)attachment);
987 // We are closing the SelectorManager, so we want to block the
988 // selector thread here until we have closed all EndPoints.
989 // This is different than calling close() directly, because close()
990 // can wait forever, while here we are limited by the stop timeout.
991 closer.await(getStopTimeout());
995 closeNoExceptions(_selector);
1003 public boolean await(long timeout)
1007 return latch.await(timeout, TimeUnit.MILLISECONDS);
1009 catch (InterruptedException x)
1016 private class EndPointCloser implements Runnable
1018 private final CountDownLatch latch = new CountDownLatch(1);
1019 private final EndPoint endPoint;
1021 private EndPointCloser(EndPoint endPoint)
1023 this.endPoint = endPoint;
1031 closeNoExceptions(endPoint.getConnection());
1039 private boolean await(long timeout)
1043 return latch.await(timeout, TimeUnit.MILLISECONDS);
1045 catch (InterruptedException x)
1054 * A {@link SelectableEndPoint} is an {@link EndPoint} that wishes to be notified of
1055 * non-blocking events by the {@link ManagedSelector}.
1057 public interface SelectableEndPoint extends EndPoint
1060 * <p>Callback method invoked when a read or write event has been detected by the {@link ManagedSelector}
1061 * for this endpoint.</p>