X-Git-Url: https://code.wpia.club/?p=gigi.git;a=blobdiff_plain;f=lib%2Fjetty%2Forg%2Feclipse%2Fjetty%2Fio%2FSelectorManager.java;fp=lib%2Fjetty%2Forg%2Feclipse%2Fjetty%2Fio%2FSelectorManager.java;h=fd3c6c6b9eedb0537203bc225be576c3fc036e5c;hp=0000000000000000000000000000000000000000;hb=73ef54a38e3930a1a789cdc6b5fa23cdd4c9d086;hpb=515007c7c1351045420669d65b59c08fa46850f2 diff --git a/lib/jetty/org/eclipse/jetty/io/SelectorManager.java b/lib/jetty/org/eclipse/jetty/io/SelectorManager.java new file mode 100644 index 00000000..fd3c6c6b --- /dev/null +++ b/lib/jetty/org/eclipse/jetty/io/SelectorManager.java @@ -0,0 +1,984 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.io.Closeable; +import java.io.IOException; +import java.net.ConnectException; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketTimeoutException; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.util.ConcurrentArrayQueue; +import org.eclipse.jetty.util.TypeUtil; +import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.component.Dumpable; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.NonBlockingThread; +import org.eclipse.jetty.util.thread.Scheduler; + +/** + *
{@link SelectorManager} manages a number of {@link ManagedSelector}s that + * simplify the non-blocking primitives provided by the JVM via the {@code java.nio} package.
+ *{@link SelectorManager} subclasses implement methods to return protocol-specific + * {@link EndPoint}s and {@link Connection}s.
+ */ +public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable +{ + public static final String SUBMIT_KEY_UPDATES = "org.eclipse.jetty.io.SelectorManager.submitKeyUpdates"; + public static final int DEFAULT_CONNECT_TIMEOUT = 15000; + protected static final Logger LOG = Log.getLogger(SelectorManager.class); + private final static boolean __submitKeyUpdates = Boolean.valueOf(System.getProperty(SUBMIT_KEY_UPDATES, "false")); + + private final Executor executor; + private final Scheduler scheduler; + private final ManagedSelector[] _selectors; + private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT; + private long _selectorIndex; + + protected SelectorManager(Executor executor, Scheduler scheduler) + { + this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2); + } + + protected SelectorManager(Executor executor, Scheduler scheduler, int selectors) + { + if (selectors<=0) + throw new IllegalArgumentException("No selectors"); + this.executor = executor; + this.scheduler = scheduler; + _selectors = new ManagedSelector[selectors]; + } + + public Executor getExecutor() + { + return executor; + } + + public Scheduler getScheduler() + { + return scheduler; + } + + /** + * Get the connect timeout + * + * @return the connect timeout (in milliseconds) + */ + public long getConnectTimeout() + { + return _connectTimeout; + } + + /** + * Set the connect timeout (in milliseconds) + * + * @param milliseconds the number of milliseconds for the timeout + */ + public void setConnectTimeout(long milliseconds) + { + _connectTimeout = milliseconds; + } + + /** + * Executes the given task in a different thread. + * + * @param task the task to execute + */ + protected void execute(Runnable task) + { + executor.execute(task); + } + + /** + * @return the number of selectors in use + */ + public int getSelectorCount() + { + return _selectors.length; + } + + private ManagedSelector chooseSelector() + { + // The ++ increment here is not atomic, but it does not matter, + // so long as the value changes sometimes, then connections will + // be distributed over the available selectors. + long s = _selectorIndex++; + int index = (int)(s % getSelectorCount()); + return _selectors[index]; + } + + /** + *Registers a channel to perform a non-blocking connect.
+ *The channel must be set in non-blocking mode, and {@link SocketChannel#connect(SocketAddress)} + * must be called prior to calling this method.
+ * + * @param channel the channel to register + * @param attachment the attachment object + */ + public void connect(SocketChannel channel, Object attachment) + { + ManagedSelector set = chooseSelector(); + set.submit(set.new Connect(channel, attachment)); + } + + /** + *Registers a channel to perform non-blocking read/write operations.
+ *This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()}, + * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}.
+ * + * @param channel the channel to register + */ + public void accept(final SocketChannel channel) + { + final ManagedSelector selector = chooseSelector(); + selector.submit(selector.new Accept(channel)); + } + + /** + *Registers a server channel for accept operations. + * When a {@link SocketChannel} is accepted from the given {@link ServerSocketChannel} + * then the {@link #accepted(SocketChannel)} method is called, which must be + * overridden by a derivation of this class to handle the accepted channel + * + * @param server the server channel to register + */ + public void acceptor(final ServerSocketChannel server) + { + final ManagedSelector selector = chooseSelector(); + selector.submit(selector.new Acceptor(server)); + } + + /** + * Callback method when a channel is accepted from the {@link ServerSocketChannel} + * passed to {@link #acceptor(ServerSocketChannel)}. + * The default impl throws an {@link UnsupportedOperationException}, so it must + * be overridden by subclasses if a server channel is provided. + * + * @param channel the + * @throws IOException + */ + protected void accepted(SocketChannel channel) throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + protected void doStart() throws Exception + { + super.doStart(); + for (int i = 0; i < _selectors.length; i++) + { + ManagedSelector selector = newSelector(i); + _selectors[i] = selector; + selector.start(); + execute(new NonBlockingThread(selector)); + } + } + + /** + *
Factory method for {@link ManagedSelector}.
+ * + * @param id an identifier for the {@link ManagedSelector to create} + * @return a new {@link ManagedSelector} + */ + protected ManagedSelector newSelector(int id) + { + return new ManagedSelector(id); + } + + @Override + protected void doStop() throws Exception + { + for (ManagedSelector selector : _selectors) + selector.stop(); + super.doStop(); + } + + /** + *Callback method invoked when an endpoint is opened.
+ * + * @param endpoint the endpoint being opened + */ + protected void endPointOpened(EndPoint endpoint) + { + endpoint.onOpen(); + } + + /** + *Callback method invoked when an endpoint is closed.
+ * + * @param endpoint the endpoint being closed + */ + protected void endPointClosed(EndPoint endpoint) + { + endpoint.onClose(); + } + + /** + *Callback method invoked when a connection is opened.
+ * + * @param connection the connection just opened + */ + public void connectionOpened(Connection connection) + { + try + { + connection.onOpen(); + } + catch (Throwable x) + { + if (isRunning()) + LOG.warn("Exception while notifying connection " + connection, x); + else + LOG.debug("Exception while notifying connection {}",connection, x); + } + } + + /** + *Callback method invoked when a connection is closed.
+ * + * @param connection the connection just closed + */ + public void connectionClosed(Connection connection) + { + try + { + connection.onClose(); + } + catch (Throwable x) + { + LOG.debug("Exception while notifying connection " + connection, x); + } + } + + protected boolean finishConnect(SocketChannel channel) throws IOException + { + return channel.finishConnect(); + } + + /** + *Callback method invoked when a non-blocking connect cannot be completed.
+ *By default it just logs with level warning.
+ * + * @param channel the channel that attempted the connect + * @param ex the exception that caused the connect to fail + * @param attachment the attachment object associated at registration + */ + protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment) + { + LOG.warn(String.format("%s - %s", channel, attachment), ex); + } + + /** + *Factory method to create {@link EndPoint}.
+ *This method is invoked as a result of the registration of a channel via {@link #connect(SocketChannel, Object)} + * or {@link #accept(SocketChannel)}.
+ * + * @param channel the channel associated to the endpoint + * @param selector the selector the channel is registered to + * @param selectionKey the selection key + * @return a new endpoint + * @throws IOException if the endPoint cannot be created + * @see #newConnection(SocketChannel, EndPoint, Object) + */ + protected abstract EndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException; + + /** + *Factory method to create {@link Connection}.
+ * + * @param channel the channel associated to the connection + * @param endpoint the endpoint + * @param attachment the attachment + * @return a new connection + * @throws IOException + * @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey) + */ + public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException; + + @Override + public String dump() + { + return ContainerLifeCycle.dump(this); + } + + @Override + public void dump(Appendable out, String indent) throws IOException + { + ContainerLifeCycle.dumpObject(out, this); + ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors)); + } + + private enum State + { + CHANGES, MORE_CHANGES, SELECT, WAKEUP, PROCESS + } + + /** + *{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.
+ *{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events + * happen for registered channels. When events happen, it notifies the {@link EndPoint} associated + * with the channel.
+ */ + public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable + { + private final AtomicReferenceSubmits a change to be executed in the selector thread.
+ *Changes may be submitted from any thread, and the selector thread woken up + * (if necessary) to execute the change.
+ * + * @param change the change to submit + */ + public void submit(Runnable change) + { + // This method may be called from the selector thread, and therefore + // we could directly run the change without queueing, but this may + // lead to stack overflows on a busy server, so we always offer the + // change to the queue and process the state. + + _changes.offer(change); + LOG.debug("Queued change {}", change); + + out: while (true) + { + switch (_state.get()) + { + case SELECT: + // Avoid multiple wakeup() calls if we the CAS fails + if (!_state.compareAndSet(State.SELECT, State.WAKEUP)) + continue; + wakeup(); + break out; + case CHANGES: + // Tell the selector thread that we have more changes. + // If we fail to CAS, we possibly need to wakeup(), so loop. + if (_state.compareAndSet(State.CHANGES, State.MORE_CHANGES)) + break out; + continue; + case WAKEUP: + // Do nothing, we have already a wakeup scheduled + break out; + case MORE_CHANGES: + // Do nothing, we already notified the selector thread of more changes + break out; + case PROCESS: + // Do nothing, the changes will be run after the processing + break out; + default: + throw new IllegalStateException(); + } + } + } + + private void runChanges() + { + Runnable change; + while ((change = _changes.poll()) != null) + runChange(change); + } + + protected void runChange(Runnable change) + { + try + { + LOG.debug("Running change {}", change); + change.run(); + } + catch (Throwable x) + { + LOG.debug("Could not run change " + change, x); + } + } + + @Override + public void run() + { + _thread = Thread.currentThread(); + String name = _thread.getName(); + try + { + _thread.setName(name + "-selector-" + SelectorManager.this.getClass().getSimpleName()+"@"+Integer.toHexString(SelectorManager.this.hashCode())+"/"+_id); + LOG.debug("Starting {} on {}", _thread, this); + while (isRunning()) + select(); + runChanges(); + } + finally + { + LOG.debug("Stopped {} on {}", _thread, this); + _thread.setName(name); + } + } + + /** + *Process changes and waits on {@link Selector#select()}.
+ * + * @see #submit(Runnable) + */ + public void select() + { + boolean debug = LOG.isDebugEnabled(); + try + { + _state.set(State.CHANGES); + + // Run the changes, and only exit if we ran all changes + out: while(true) + { + switch (_state.get()) + { + case CHANGES: + runChanges(); + if (_state.compareAndSet(State.CHANGES, State.SELECT)) + break out; + continue; + case MORE_CHANGES: + runChanges(); + _state.set(State.CHANGES); + continue; + default: + throw new IllegalStateException(); + } + } + // Must check first for SELECT and *then* for WAKEUP + // because we read the state twice in the assert, and + // it could change from SELECT to WAKEUP in between. + assert _state.get() == State.SELECT || _state.get() == State.WAKEUP; + + if (debug) + LOG.debug("Selector loop waiting on select"); + int selected = _selector.select(); + if (debug) + LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size()); + + _state.set(State.PROCESS); + + Set