X-Git-Url: https://code.wpia.club/?p=gigi.git;a=blobdiff_plain;f=lib%2Fjetty%2Forg%2Feclipse%2Fjetty%2Fio%2FSelectorManager.java;fp=lib%2Fjetty%2Forg%2Feclipse%2Fjetty%2Fio%2FSelectorManager.java;h=fd3c6c6b9eedb0537203bc225be576c3fc036e5c;hp=0000000000000000000000000000000000000000;hb=73ef54a38e3930a1a789cdc6b5fa23cdd4c9d086;hpb=515007c7c1351045420669d65b59c08fa46850f2 diff --git a/lib/jetty/org/eclipse/jetty/io/SelectorManager.java b/lib/jetty/org/eclipse/jetty/io/SelectorManager.java new file mode 100644 index 00000000..fd3c6c6b --- /dev/null +++ b/lib/jetty/org/eclipse/jetty/io/SelectorManager.java @@ -0,0 +1,984 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.io.Closeable; +import java.io.IOException; +import java.net.ConnectException; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketTimeoutException; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.util.ConcurrentArrayQueue; +import org.eclipse.jetty.util.TypeUtil; +import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.component.Dumpable; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.NonBlockingThread; +import org.eclipse.jetty.util.thread.Scheduler; + +/** + *

{@link SelectorManager} manages a number of {@link ManagedSelector}s that + * simplify the non-blocking primitives provided by the JVM via the {@code java.nio} package.

+ *

{@link SelectorManager} subclasses implement methods to return protocol-specific + * {@link EndPoint}s and {@link Connection}s.

+ */ +public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable +{ + public static final String SUBMIT_KEY_UPDATES = "org.eclipse.jetty.io.SelectorManager.submitKeyUpdates"; + public static final int DEFAULT_CONNECT_TIMEOUT = 15000; + protected static final Logger LOG = Log.getLogger(SelectorManager.class); + private final static boolean __submitKeyUpdates = Boolean.valueOf(System.getProperty(SUBMIT_KEY_UPDATES, "false")); + + private final Executor executor; + private final Scheduler scheduler; + private final ManagedSelector[] _selectors; + private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT; + private long _selectorIndex; + + protected SelectorManager(Executor executor, Scheduler scheduler) + { + this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2); + } + + protected SelectorManager(Executor executor, Scheduler scheduler, int selectors) + { + if (selectors<=0) + throw new IllegalArgumentException("No selectors"); + this.executor = executor; + this.scheduler = scheduler; + _selectors = new ManagedSelector[selectors]; + } + + public Executor getExecutor() + { + return executor; + } + + public Scheduler getScheduler() + { + return scheduler; + } + + /** + * Get the connect timeout + * + * @return the connect timeout (in milliseconds) + */ + public long getConnectTimeout() + { + return _connectTimeout; + } + + /** + * Set the connect timeout (in milliseconds) + * + * @param milliseconds the number of milliseconds for the timeout + */ + public void setConnectTimeout(long milliseconds) + { + _connectTimeout = milliseconds; + } + + /** + * Executes the given task in a different thread. + * + * @param task the task to execute + */ + protected void execute(Runnable task) + { + executor.execute(task); + } + + /** + * @return the number of selectors in use + */ + public int getSelectorCount() + { + return _selectors.length; + } + + private ManagedSelector chooseSelector() + { + // The ++ increment here is not atomic, but it does not matter, + // so long as the value changes sometimes, then connections will + // be distributed over the available selectors. + long s = _selectorIndex++; + int index = (int)(s % getSelectorCount()); + return _selectors[index]; + } + + /** + *

Registers a channel to perform a non-blocking connect.

+ *

The channel must be set in non-blocking mode, and {@link SocketChannel#connect(SocketAddress)} + * must be called prior to calling this method.

+ * + * @param channel the channel to register + * @param attachment the attachment object + */ + public void connect(SocketChannel channel, Object attachment) + { + ManagedSelector set = chooseSelector(); + set.submit(set.new Connect(channel, attachment)); + } + + /** + *

Registers a channel to perform non-blocking read/write operations.

+ *

This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()}, + * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}.

+ * + * @param channel the channel to register + */ + public void accept(final SocketChannel channel) + { + final ManagedSelector selector = chooseSelector(); + selector.submit(selector.new Accept(channel)); + } + + /** + *

Registers a server channel for accept operations. + * When a {@link SocketChannel} is accepted from the given {@link ServerSocketChannel} + * then the {@link #accepted(SocketChannel)} method is called, which must be + * overridden by a derivation of this class to handle the accepted channel + * + * @param server the server channel to register + */ + public void acceptor(final ServerSocketChannel server) + { + final ManagedSelector selector = chooseSelector(); + selector.submit(selector.new Acceptor(server)); + } + + /** + * Callback method when a channel is accepted from the {@link ServerSocketChannel} + * passed to {@link #acceptor(ServerSocketChannel)}. + * The default impl throws an {@link UnsupportedOperationException}, so it must + * be overridden by subclasses if a server channel is provided. + * + * @param channel the + * @throws IOException + */ + protected void accepted(SocketChannel channel) throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + protected void doStart() throws Exception + { + super.doStart(); + for (int i = 0; i < _selectors.length; i++) + { + ManagedSelector selector = newSelector(i); + _selectors[i] = selector; + selector.start(); + execute(new NonBlockingThread(selector)); + } + } + + /** + *

Factory method for {@link ManagedSelector}.

+ * + * @param id an identifier for the {@link ManagedSelector to create} + * @return a new {@link ManagedSelector} + */ + protected ManagedSelector newSelector(int id) + { + return new ManagedSelector(id); + } + + @Override + protected void doStop() throws Exception + { + for (ManagedSelector selector : _selectors) + selector.stop(); + super.doStop(); + } + + /** + *

Callback method invoked when an endpoint is opened.

+ * + * @param endpoint the endpoint being opened + */ + protected void endPointOpened(EndPoint endpoint) + { + endpoint.onOpen(); + } + + /** + *

Callback method invoked when an endpoint is closed.

+ * + * @param endpoint the endpoint being closed + */ + protected void endPointClosed(EndPoint endpoint) + { + endpoint.onClose(); + } + + /** + *

Callback method invoked when a connection is opened.

+ * + * @param connection the connection just opened + */ + public void connectionOpened(Connection connection) + { + try + { + connection.onOpen(); + } + catch (Throwable x) + { + if (isRunning()) + LOG.warn("Exception while notifying connection " + connection, x); + else + LOG.debug("Exception while notifying connection {}",connection, x); + } + } + + /** + *

Callback method invoked when a connection is closed.

+ * + * @param connection the connection just closed + */ + public void connectionClosed(Connection connection) + { + try + { + connection.onClose(); + } + catch (Throwable x) + { + LOG.debug("Exception while notifying connection " + connection, x); + } + } + + protected boolean finishConnect(SocketChannel channel) throws IOException + { + return channel.finishConnect(); + } + + /** + *

Callback method invoked when a non-blocking connect cannot be completed.

+ *

By default it just logs with level warning.

+ * + * @param channel the channel that attempted the connect + * @param ex the exception that caused the connect to fail + * @param attachment the attachment object associated at registration + */ + protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment) + { + LOG.warn(String.format("%s - %s", channel, attachment), ex); + } + + /** + *

Factory method to create {@link EndPoint}.

+ *

This method is invoked as a result of the registration of a channel via {@link #connect(SocketChannel, Object)} + * or {@link #accept(SocketChannel)}.

+ * + * @param channel the channel associated to the endpoint + * @param selector the selector the channel is registered to + * @param selectionKey the selection key + * @return a new endpoint + * @throws IOException if the endPoint cannot be created + * @see #newConnection(SocketChannel, EndPoint, Object) + */ + protected abstract EndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException; + + /** + *

Factory method to create {@link Connection}.

+ * + * @param channel the channel associated to the connection + * @param endpoint the endpoint + * @param attachment the attachment + * @return a new connection + * @throws IOException + * @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey) + */ + public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException; + + @Override + public String dump() + { + return ContainerLifeCycle.dump(this); + } + + @Override + public void dump(Appendable out, String indent) throws IOException + { + ContainerLifeCycle.dumpObject(out, this); + ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors)); + } + + private enum State + { + CHANGES, MORE_CHANGES, SELECT, WAKEUP, PROCESS + } + + /** + *

{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.

+ *

{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events + * happen for registered channels. When events happen, it notifies the {@link EndPoint} associated + * with the channel.

+ */ + public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable + { + private final AtomicReference _state= new AtomicReference<>(State.PROCESS); + private final Queue _changes = new ConcurrentArrayQueue<>(); + private final int _id; + private Selector _selector; + private volatile Thread _thread; + + public ManagedSelector(int id) + { + _id = id; + setStopTimeout(5000); + } + + @Override + protected void doStart() throws Exception + { + super.doStart(); + _selector = Selector.open(); + _state.set(State.PROCESS); + } + + @Override + protected void doStop() throws Exception + { + LOG.debug("Stopping {}", this); + Stop stop = new Stop(); + submit(stop); + stop.await(getStopTimeout()); + LOG.debug("Stopped {}", this); + } + + /** + * Submit a task to update a selector key. If the System property {@link SelectorManager#SUBMIT_KEY_UPDATES} + * is set true (default is false), the task is passed to {@link #submit(Runnable)}. Otherwise it is run immediately and the selector + * woken up if need be. + * @param update the update to a key + */ + public void updateKey(Runnable update) + { + if (__submitKeyUpdates) + { + submit(update); + } + else + { + runChange(update); + if (_state.compareAndSet(State.SELECT, State.WAKEUP)) + wakeup(); + } + } + + /** + *

Submits a change to be executed in the selector thread.

+ *

Changes may be submitted from any thread, and the selector thread woken up + * (if necessary) to execute the change.

+ * + * @param change the change to submit + */ + public void submit(Runnable change) + { + // This method may be called from the selector thread, and therefore + // we could directly run the change without queueing, but this may + // lead to stack overflows on a busy server, so we always offer the + // change to the queue and process the state. + + _changes.offer(change); + LOG.debug("Queued change {}", change); + + out: while (true) + { + switch (_state.get()) + { + case SELECT: + // Avoid multiple wakeup() calls if we the CAS fails + if (!_state.compareAndSet(State.SELECT, State.WAKEUP)) + continue; + wakeup(); + break out; + case CHANGES: + // Tell the selector thread that we have more changes. + // If we fail to CAS, we possibly need to wakeup(), so loop. + if (_state.compareAndSet(State.CHANGES, State.MORE_CHANGES)) + break out; + continue; + case WAKEUP: + // Do nothing, we have already a wakeup scheduled + break out; + case MORE_CHANGES: + // Do nothing, we already notified the selector thread of more changes + break out; + case PROCESS: + // Do nothing, the changes will be run after the processing + break out; + default: + throw new IllegalStateException(); + } + } + } + + private void runChanges() + { + Runnable change; + while ((change = _changes.poll()) != null) + runChange(change); + } + + protected void runChange(Runnable change) + { + try + { + LOG.debug("Running change {}", change); + change.run(); + } + catch (Throwable x) + { + LOG.debug("Could not run change " + change, x); + } + } + + @Override + public void run() + { + _thread = Thread.currentThread(); + String name = _thread.getName(); + try + { + _thread.setName(name + "-selector-" + SelectorManager.this.getClass().getSimpleName()+"@"+Integer.toHexString(SelectorManager.this.hashCode())+"/"+_id); + LOG.debug("Starting {} on {}", _thread, this); + while (isRunning()) + select(); + runChanges(); + } + finally + { + LOG.debug("Stopped {} on {}", _thread, this); + _thread.setName(name); + } + } + + /** + *

Process changes and waits on {@link Selector#select()}.

+ * + * @see #submit(Runnable) + */ + public void select() + { + boolean debug = LOG.isDebugEnabled(); + try + { + _state.set(State.CHANGES); + + // Run the changes, and only exit if we ran all changes + out: while(true) + { + switch (_state.get()) + { + case CHANGES: + runChanges(); + if (_state.compareAndSet(State.CHANGES, State.SELECT)) + break out; + continue; + case MORE_CHANGES: + runChanges(); + _state.set(State.CHANGES); + continue; + default: + throw new IllegalStateException(); + } + } + // Must check first for SELECT and *then* for WAKEUP + // because we read the state twice in the assert, and + // it could change from SELECT to WAKEUP in between. + assert _state.get() == State.SELECT || _state.get() == State.WAKEUP; + + if (debug) + LOG.debug("Selector loop waiting on select"); + int selected = _selector.select(); + if (debug) + LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size()); + + _state.set(State.PROCESS); + + Set selectedKeys = _selector.selectedKeys(); + for (SelectionKey key : selectedKeys) + { + if (key.isValid()) + { + processKey(key); + } + else + { + if (debug) + LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel()); + Object attachment = key.attachment(); + if (attachment instanceof EndPoint) + ((EndPoint)attachment).close(); + } + } + selectedKeys.clear(); + } + catch (Throwable x) + { + if (isRunning()) + LOG.warn(x); + else + LOG.ignore(x); + } + } + + private void processKey(SelectionKey key) + { + Object attachment = key.attachment(); + try + { + if (attachment instanceof SelectableEndPoint) + { + ((SelectableEndPoint)attachment).onSelected(); + } + else if (key.isConnectable()) + { + processConnect(key, (Connect)attachment); + } + else if (key.isAcceptable()) + { + processAccept(key); + } + else + { + throw new IllegalStateException(); + } + } + catch (CancelledKeyException x) + { + LOG.debug("Ignoring cancelled key for channel {}", key.channel()); + if (attachment instanceof EndPoint) + closeNoExceptions((EndPoint)attachment); + } + catch (Throwable x) + { + LOG.warn("Could not process key for channel " + key.channel(), x); + if (attachment instanceof EndPoint) + closeNoExceptions((EndPoint)attachment); + } + } + + private void processConnect(SelectionKey key, Connect connect) + { + SocketChannel channel = (SocketChannel)key.channel(); + try + { + key.attach(connect.attachment); + boolean connected = finishConnect(channel); + if (connected) + { + connect.timeout.cancel(); + key.interestOps(0); + EndPoint endpoint = createEndPoint(channel, key); + key.attach(endpoint); + } + else + { + throw new ConnectException(); + } + } + catch (Throwable x) + { + connect.failed(x); + } + } + + private void processAccept(SelectionKey key) + { + ServerSocketChannel server = (ServerSocketChannel)key.channel(); + SocketChannel channel = null; + try + { + while ((channel = server.accept()) != null) + { + accepted(channel); + } + } + catch (Throwable x) + { + closeNoExceptions(channel); + LOG.warn("Accept failed for channel " + channel, x); + } + } + + private void closeNoExceptions(Closeable closeable) + { + try + { + if (closeable != null) + closeable.close(); + } + catch (Throwable x) + { + LOG.ignore(x); + } + } + + public void wakeup() + { + _selector.wakeup(); + } + + public boolean isSelectorThread() + { + return Thread.currentThread() == _thread; + } + + private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException + { + EndPoint endPoint = newEndPoint(channel, this, selectionKey); + endPointOpened(endPoint); + Connection connection = newConnection(channel, endPoint, selectionKey.attachment()); + endPoint.setConnection(connection); + connectionOpened(connection); + LOG.debug("Created {}", endPoint); + return endPoint; + } + + public void destroyEndPoint(EndPoint endPoint) + { + LOG.debug("Destroyed {}", endPoint); + Connection connection = endPoint.getConnection(); + if (connection != null) + connectionClosed(connection); + endPointClosed(endPoint); + } + + @Override + public String dump() + { + return ContainerLifeCycle.dump(this); + } + + @Override + public void dump(Appendable out, String indent) throws IOException + { + out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n"); + + Thread selecting = _thread; + + Object where = "not selecting"; + StackTraceElement[] trace = selecting == null ? null : selecting.getStackTrace(); + if (trace != null) + { + for (StackTraceElement t : trace) + if (t.getClassName().startsWith("org.eclipse.jetty.")) + { + where = t; + break; + } + } + + Selector selector = _selector; + if (selector != null && selector.isOpen()) + { + final ArrayList dump = new ArrayList<>(selector.keys().size() * 2); + dump.add(where); + + DumpKeys dumpKeys = new DumpKeys(dump); + submit(dumpKeys); + dumpKeys.await(5, TimeUnit.SECONDS); + + ContainerLifeCycle.dump(out, indent, dump); + } + } + + public void dumpKeysState(List dumps) + { + Selector selector = _selector; + Set keys = selector.keys(); + dumps.add(selector + " keys=" + keys.size()); + for (SelectionKey key : keys) + { + if (key.isValid()) + dumps.add(key.attachment() + " iOps=" + key.interestOps() + " rOps=" + key.readyOps()); + else + dumps.add(key.attachment() + " iOps=-1 rOps=-1"); + } + } + + @Override + public String toString() + { + Selector selector = _selector; + return String.format("%s keys=%d selected=%d", + super.toString(), + selector != null && selector.isOpen() ? selector.keys().size() : -1, + selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); + } + + private class DumpKeys implements Runnable + { + private final CountDownLatch latch = new CountDownLatch(1); + private final List _dumps; + + private DumpKeys(List dumps) + { + this._dumps = dumps; + } + + @Override + public void run() + { + dumpKeysState(_dumps); + latch.countDown(); + } + + public boolean await(long timeout, TimeUnit unit) + { + try + { + return latch.await(timeout, unit); + } + catch (InterruptedException x) + { + return false; + } + } + } + + private class Acceptor implements Runnable + { + private final ServerSocketChannel _channel; + + public Acceptor(ServerSocketChannel channel) + { + this._channel = channel; + } + + @Override + public void run() + { + try + { + SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null); + LOG.debug("{} acceptor={}", this, key); + } + catch (Throwable x) + { + closeNoExceptions(_channel); + LOG.warn(x); + } + } + } + + private class Accept implements Runnable + { + private final SocketChannel _channel; + + public Accept(SocketChannel channel) + { + this._channel = channel; + } + + @Override + public void run() + { + try + { + SelectionKey key = _channel.register(_selector, 0, null); + EndPoint endpoint = createEndPoint(_channel, key); + key.attach(endpoint); + } + catch (Throwable x) + { + closeNoExceptions(_channel); + LOG.debug(x); + } + } + } + + private class Connect implements Runnable + { + private final AtomicBoolean failed = new AtomicBoolean(); + private final SocketChannel channel; + private final Object attachment; + private final Scheduler.Task timeout; + + public Connect(SocketChannel channel, Object attachment) + { + this.channel = channel; + this.attachment = attachment; + this.timeout = scheduler.schedule(new ConnectTimeout(this), getConnectTimeout(), TimeUnit.MILLISECONDS); + } + + @Override + public void run() + { + try + { + channel.register(_selector, SelectionKey.OP_CONNECT, this); + } + catch (Throwable x) + { + failed(x); + } + } + + protected void failed(Throwable failure) + { + if (failed.compareAndSet(false, true)) + { + timeout.cancel(); + closeNoExceptions(channel); + connectionFailed(channel, failure, attachment); + } + } + } + + private class ConnectTimeout implements Runnable + { + private final Connect connect; + + private ConnectTimeout(Connect connect) + { + this.connect = connect; + } + + @Override + public void run() + { + SocketChannel channel = connect.channel; + if (channel.isConnectionPending()) + { + LOG.debug("Channel {} timed out while connecting, closing it", channel); + connect.failed(new SocketTimeoutException()); + } + } + } + + private class Stop implements Runnable + { + private final CountDownLatch latch = new CountDownLatch(1); + + @Override + public void run() + { + try + { + for (SelectionKey key : _selector.keys()) + { + Object attachment = key.attachment(); + if (attachment instanceof EndPoint) + { + EndPointCloser closer = new EndPointCloser((EndPoint)attachment); + execute(closer); + // We are closing the SelectorManager, so we want to block the + // selector thread here until we have closed all EndPoints. + // This is different than calling close() directly, because close() + // can wait forever, while here we are limited by the stop timeout. + closer.await(getStopTimeout()); + } + } + + closeNoExceptions(_selector); + } + finally + { + latch.countDown(); + } + } + + public boolean await(long timeout) + { + try + { + return latch.await(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException x) + { + return false; + } + } + } + + private class EndPointCloser implements Runnable + { + private final CountDownLatch latch = new CountDownLatch(1); + private final EndPoint endPoint; + + private EndPointCloser(EndPoint endPoint) + { + this.endPoint = endPoint; + } + + @Override + public void run() + { + try + { + closeNoExceptions(endPoint.getConnection()); + } + finally + { + latch.countDown(); + } + } + + private boolean await(long timeout) + { + try + { + return latch.await(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException x) + { + return false; + } + } + } + } + + /** + * A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be notified of + * non-blocking events by the {@link ManagedSelector}. + */ + public interface SelectableEndPoint extends EndPoint + { + /** + *

Callback method invoked when a read or write events has been detected by the {@link ManagedSelector} + * for this endpoint.

+ */ + void onSelected(); + } +}