X-Git-Url: https://code.wpia.club/?p=gigi.git;a=blobdiff_plain;f=lib%2Fjetty%2Forg%2Feclipse%2Fjetty%2Fio%2FSelectorManager.java;h=26a18c3d3b34ef6bc7d63b43522c2d5d987cbe27;hp=fd3c6c6b9eedb0537203bc225be576c3fc036e5c;hb=ba4f228fa9f72d50991a2218cfd83987ef5d385e;hpb=875b5e9651498a0cd8e0001c0742ba843e47cad0 diff --git a/lib/jetty/org/eclipse/jetty/io/SelectorManager.java b/lib/jetty/org/eclipse/jetty/io/SelectorManager.java index fd3c6c6b..26a18c3d 100644 --- a/lib/jetty/org/eclipse/jetty/io/SelectorManager.java +++ b/lib/jetty/org/eclipse/jetty/io/SelectorManager.java @@ -1,6 +1,6 @@ // // ======================================================================== -// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. // ------------------------------------------------------------------------ // All rights reserved. This program and the accompanying materials // are made available under the terms of the Eclipse Public License v1.0 @@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.TypeUtil; +import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.Dumpable; @@ -60,14 +61,15 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa public static final String SUBMIT_KEY_UPDATES = "org.eclipse.jetty.io.SelectorManager.submitKeyUpdates"; public static final int DEFAULT_CONNECT_TIMEOUT = 15000; protected static final Logger LOG = Log.getLogger(SelectorManager.class); - private final static boolean __submitKeyUpdates = Boolean.valueOf(System.getProperty(SUBMIT_KEY_UPDATES, "false")); - + private final static boolean __submitKeyUpdates = Boolean.valueOf(System.getProperty(SUBMIT_KEY_UPDATES, "true")); + private final Executor executor; private final Scheduler scheduler; private final ManagedSelector[] _selectors; private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT; private long _selectorIndex; - + private int _priorityDelta; + protected SelectorManager(Executor executor, Scheduler scheduler) { this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2); @@ -112,6 +114,42 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa _connectTimeout = milliseconds; } + + @ManagedAttribute("The priority delta to apply to selector threads") + public int getSelectorPriorityDelta() + { + return _priorityDelta; + } + + /** + * Sets the selector thread priority delta to the given amount. + *

This allows the selector threads to run at a different priority. + * Typically this would be used to lower the priority to give preference + * to handling previously accepted connections rather than accepting + * new connections.

+ * + * @param selectorPriorityDelta the amount to change the thread priority + * delta to (may be negative) + * @see Thread#getPriority() + */ + public void setSelectorPriorityDelta(int selectorPriorityDelta) + { + int oldDelta = _priorityDelta; + _priorityDelta = selectorPriorityDelta; + if (oldDelta != selectorPriorityDelta && isStarted()) + { + for (ManagedSelector selector : _selectors) + { + Thread thread = selector._thread; + if (thread != null) + { + int deltaDiff = selectorPriorityDelta - oldDelta; + thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, thread.getPriority() - deltaDiff))); + } + } + } + } + /** * Executes the given task in a different thread. * @@ -142,11 +180,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa /** *

Registers a channel to perform a non-blocking connect.

- *

The channel must be set in non-blocking mode, and {@link SocketChannel#connect(SocketAddress)} - * must be called prior to calling this method.

+ *

The channel must be set in non-blocking mode, {@link SocketChannel#connect(SocketAddress)} + * must be called prior to calling this method, and the connect operation must not be completed + * (the return value of {@link SocketChannel#connect(SocketAddress)} must be false).

* * @param channel the channel to register * @param attachment the attachment object + * @see #accept(SocketChannel, Object) */ public void connect(SocketChannel channel, Object attachment) { @@ -154,33 +194,44 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa set.submit(set.new Connect(channel, attachment)); } + /** + * @see #accept(SocketChannel, Object) + */ + public void accept(SocketChannel channel) + { + accept(channel, null); + } + /** *

Registers a channel to perform non-blocking read/write operations.

*

This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()}, - * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}.

+ * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}, or + * just after a non-blocking connect via {@link SocketChannel#connect(SocketAddress)} that completed + * successfully.

* * @param channel the channel to register + * @param attachment the attachment object */ - public void accept(final SocketChannel channel) + public void accept(SocketChannel channel, Object attachment) { final ManagedSelector selector = chooseSelector(); - selector.submit(selector.new Accept(channel)); + selector.submit(selector.new Accept(channel, attachment)); } - + /** *

Registers a server channel for accept operations. * When a {@link SocketChannel} is accepted from the given {@link ServerSocketChannel} * then the {@link #accepted(SocketChannel)} method is called, which must be * overridden by a derivation of this class to handle the accepted channel - * + * * @param server the server channel to register */ - public void acceptor(final ServerSocketChannel server) + public void acceptor(ServerSocketChannel server) { final ManagedSelector selector = chooseSelector(); selector.submit(selector.new Acceptor(server)); } - + /** * Callback method when a channel is accepted from the {@link ServerSocketChannel} * passed to {@link #acceptor(ServerSocketChannel)}. @@ -263,7 +314,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa if (isRunning()) LOG.warn("Exception while notifying connection " + connection, x); else - LOG.debug("Exception while notifying connection {}",connection, x); + LOG.debug("Exception while notifying connection " + connection, x); + throw x; } } @@ -377,17 +429,19 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa @Override protected void doStop() throws Exception { - LOG.debug("Stopping {}", this); + if (LOG.isDebugEnabled()) + LOG.debug("Stopping {}", this); Stop stop = new Stop(); submit(stop); stop.await(getStopTimeout()); - LOG.debug("Stopped {}", this); + if (LOG.isDebugEnabled()) + LOG.debug("Stopped {}", this); } /** * Submit a task to update a selector key. If the System property {@link SelectorManager#SUBMIT_KEY_UPDATES} - * is set true (default is false), the task is passed to {@link #submit(Runnable)}. Otherwise it is run immediately and the selector - * woken up if need be. + * is set true (default is false), the task is passed to {@link #submit(Runnable)}. Otherwise it is run immediately and the selector + * woken up if need be. * @param update the update to a key */ public void updateKey(Runnable update) @@ -398,12 +452,16 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa } else { - runChange(update); + // Run only 1 change at once + synchronized (this) + { + runChange(update); + } if (_state.compareAndSet(State.SELECT, State.WAKEUP)) wakeup(); } } - + /** *

Submits a change to be executed in the selector thread.

*

Changes may be submitted from any thread, and the selector thread woken up @@ -419,7 +477,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa // change to the queue and process the state. _changes.offer(change); - LOG.debug("Queued change {}", change); + if (LOG.isDebugEnabled()) + LOG.debug("Queued change {}", change); out: while (true) { @@ -463,7 +522,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa { try { - LOG.debug("Running change {}", change); + if (LOG.isDebugEnabled()) + LOG.debug("Running change {}", change); change.run(); } catch (Throwable x) @@ -477,18 +537,27 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa { _thread = Thread.currentThread(); String name = _thread.getName(); + int priority = _thread.getPriority(); try { - _thread.setName(name + "-selector-" + SelectorManager.this.getClass().getSimpleName()+"@"+Integer.toHexString(SelectorManager.this.hashCode())+"/"+_id); - LOG.debug("Starting {} on {}", _thread, this); + if (_priorityDelta != 0) + _thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, priority + _priorityDelta))); + + _thread.setName(String.format("%s-selector-%s@%h/%d", name, SelectorManager.this.getClass().getSimpleName(), SelectorManager.this.hashCode(), _id)); + if (LOG.isDebugEnabled()) + LOG.debug("Starting {} on {}", _thread, this); while (isRunning()) select(); - runChanges(); + while(isStopping()) + runChanges(); } finally { - LOG.debug("Stopped {} on {}", _thread, this); + if (LOG.isDebugEnabled()) + LOG.debug("Stopped {} on {}", _thread, this); _thread.setName(name); + if (_priorityDelta != 0) + _thread.setPriority(priority); } } @@ -519,7 +588,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa _state.set(State.CHANGES); continue; default: - throw new IllegalStateException(); + throw new IllegalStateException(); } } // Must check first for SELECT and *then* for WAKEUP @@ -607,10 +676,16 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa boolean connected = finishConnect(channel); if (connected) { - connect.timeout.cancel(); - key.interestOps(0); - EndPoint endpoint = createEndPoint(channel, key); - key.attach(endpoint); + if (connect.timeout.cancel()) + { + key.interestOps(0); + EndPoint endpoint = createEndPoint(channel, key); + key.attach(endpoint); + } + else + { + throw new SocketTimeoutException("Concurrent Connect Timeout"); + } } else { @@ -622,7 +697,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa connect.failed(x); } } - + private void processAccept(SelectionKey key) { ServerSocketChannel server = (ServerSocketChannel)key.channel(); @@ -671,13 +746,15 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa Connection connection = newConnection(channel, endPoint, selectionKey.attachment()); endPoint.setConnection(connection); connectionOpened(connection); - LOG.debug("Created {}", endPoint); + if (LOG.isDebugEnabled()) + LOG.debug("Created {}", endPoint); return endPoint; } public void destroyEndPoint(EndPoint endPoint) { - LOG.debug("Destroyed {}", endPoint); + if (LOG.isDebugEnabled()) + LOG.debug("Destroyed {}", endPoint); Connection connection = endPoint.getConnection(); if (connection != null) connectionClosed(connection); @@ -792,7 +869,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa try { SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null); - LOG.debug("{} acceptor={}", this, key); + if (LOG.isDebugEnabled()) + LOG.debug("{} acceptor={}", this, key); } catch (Throwable x) { @@ -804,11 +882,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa private class Accept implements Runnable { - private final SocketChannel _channel; + private final SocketChannel channel; + private final Object attachment; - public Accept(SocketChannel channel) + private Accept(SocketChannel channel, Object attachment) { - this._channel = channel; + this.channel = channel; + this.attachment = attachment; } @Override @@ -816,13 +896,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa { try { - SelectionKey key = _channel.register(_selector, 0, null); - EndPoint endpoint = createEndPoint(_channel, key); + SelectionKey key = channel.register(_selector, 0, attachment); + EndPoint endpoint = createEndPoint(channel, key); key.attach(endpoint); } catch (Throwable x) { - closeNoExceptions(_channel); + closeNoExceptions(channel); LOG.debug(x); } } @@ -835,7 +915,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa private final Object attachment; private final Scheduler.Task timeout; - public Connect(SocketChannel channel, Object attachment) + private Connect(SocketChannel channel, Object attachment) { this.channel = channel; this.attachment = attachment; @@ -855,7 +935,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa } } - protected void failed(Throwable failure) + private void failed(Throwable failure) { if (failed.compareAndSet(false, true)) { @@ -881,8 +961,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa SocketChannel channel = connect.channel; if (channel.isConnectionPending()) { - LOG.debug("Channel {} timed out while connecting, closing it", channel); - connect.failed(new SocketTimeoutException()); + if (LOG.isDebugEnabled()) + LOG.debug("Channel {} timed out while connecting, closing it", channel); + connect.failed(new SocketTimeoutException("Connect Timeout")); } } }