]> WPIA git - gigi.git/blobdiff - lib/jetty/org/eclipse/jetty/io/SelectorManager.java
updating jetty to jetty-9.2.16.v2016040
[gigi.git] / lib / jetty / org / eclipse / jetty / io / SelectorManager.java
index fd3c6c6b9eedb0537203bc225be576c3fc036e5c..26a18c3d3b34ef6bc7d63b43522c2d5d987cbe27 100644 (file)
@@ -1,6 +1,6 @@
 //
 //  ========================================================================
-//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+//  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
 //  ------------------------------------------------------------------------
 //  All rights reserved. This program and the accompanying materials
 //  are made available under the terms of the Eclipse Public License v1.0
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.eclipse.jetty.util.ConcurrentArrayQueue;
 import org.eclipse.jetty.util.TypeUtil;
+import org.eclipse.jetty.util.annotation.ManagedAttribute;
 import org.eclipse.jetty.util.component.AbstractLifeCycle;
 import org.eclipse.jetty.util.component.ContainerLifeCycle;
 import org.eclipse.jetty.util.component.Dumpable;
@@ -60,14 +61,15 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
     public static final String SUBMIT_KEY_UPDATES = "org.eclipse.jetty.io.SelectorManager.submitKeyUpdates";
     public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
     protected static final Logger LOG = Log.getLogger(SelectorManager.class);
-    private final static boolean __submitKeyUpdates = Boolean.valueOf(System.getProperty(SUBMIT_KEY_UPDATES, "false"));
-    
+    private final static boolean __submitKeyUpdates = Boolean.valueOf(System.getProperty(SUBMIT_KEY_UPDATES, "true"));
+
     private final Executor executor;
     private final Scheduler scheduler;
     private final ManagedSelector[] _selectors;
     private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
     private long _selectorIndex;
-    
+    private int _priorityDelta;
+
     protected SelectorManager(Executor executor, Scheduler scheduler)
     {
         this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2);
@@ -112,6 +114,42 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
         _connectTimeout = milliseconds;
     }
 
+
+    @ManagedAttribute("The priority delta to apply to selector threads")
+    public int getSelectorPriorityDelta()
+    {
+        return _priorityDelta;
+    }
+
+    /**
+     * Sets the selector thread priority delta to the given amount.
+     * <p>This allows the selector threads to run at a different priority.
+     * Typically this would be used to lower the priority to give preference
+     * to handling previously accepted connections rather than accepting
+     * new connections.</p>
+     *
+     * @param selectorPriorityDelta the amount to change the thread priority
+     *                              delta to (may be negative)
+     * @see Thread#getPriority()
+     */
+    public void setSelectorPriorityDelta(int selectorPriorityDelta)
+    {
+        int oldDelta = _priorityDelta;
+        _priorityDelta = selectorPriorityDelta;
+        if (oldDelta != selectorPriorityDelta && isStarted())
+        {
+            for (ManagedSelector selector : _selectors)
+            {
+                Thread thread = selector._thread;
+                if (thread != null)
+                {
+                    int deltaDiff = selectorPriorityDelta - oldDelta;
+                    thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, thread.getPriority() - deltaDiff)));
+                }
+            }
+        }
+    }
+
     /**
      * Executes the given task in a different thread.
      *
@@ -142,11 +180,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
 
     /**
      * <p>Registers a channel to perform a non-blocking connect.</p>
-     * <p>The channel must be set in non-blocking mode, and {@link SocketChannel#connect(SocketAddress)}
-     * must be called prior to calling this method.</p>
+     * <p>The channel must be set in non-blocking mode, {@link SocketChannel#connect(SocketAddress)}
+     * must be called prior to calling this method, and the connect operation must not be completed
+     * (the return value of {@link SocketChannel#connect(SocketAddress)} must be false).</p>
      *
      * @param channel    the channel to register
      * @param attachment the attachment object
+     * @see #accept(SocketChannel, Object)
      */
     public void connect(SocketChannel channel, Object attachment)
     {
@@ -154,33 +194,44 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
         set.submit(set.new Connect(channel, attachment));
     }
 
+    /**
+     * @see #accept(SocketChannel, Object)
+     */
+    public void accept(SocketChannel channel)
+    {
+        accept(channel, null);
+    }
+
     /**
      * <p>Registers a channel to perform non-blocking read/write operations.</p>
      * <p>This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()},
-     * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}.</p>
+     * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}, or
+     * just after a non-blocking connect via {@link SocketChannel#connect(SocketAddress)} that completed
+     * successfully.</p>
      *
      * @param channel the channel to register
+     * @param attachment the attachment object
      */
-    public void accept(final SocketChannel channel)
+    public void accept(SocketChannel channel, Object attachment)
     {
         final ManagedSelector selector = chooseSelector();
-        selector.submit(selector.new Accept(channel));
+        selector.submit(selector.new Accept(channel, attachment));
     }
-    
+
     /**
      * <p>Registers a server channel for accept operations.
      * When a {@link SocketChannel} is accepted from the given {@link ServerSocketChannel}
      * then the {@link #accepted(SocketChannel)} method is called, which must be
      * overridden by a derivation of this class to handle the accepted channel
-     * 
+     *
      * @param server the server channel to register
      */
-    public void acceptor(final ServerSocketChannel server)
+    public void acceptor(ServerSocketChannel server)
     {
         final ManagedSelector selector = chooseSelector();
         selector.submit(selector.new Acceptor(server));
     }
-    
+
     /**
      * Callback method when a channel is accepted from the {@link ServerSocketChannel}
      * passed to {@link #acceptor(ServerSocketChannel)}.
@@ -263,7 +314,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
             if (isRunning())
                 LOG.warn("Exception while notifying connection " + connection, x);
             else
-                LOG.debug("Exception while notifying connection {}",connection, x);
+                LOG.debug("Exception while notifying connection " + connection, x);
+            throw x;
         }
     }
 
@@ -377,17 +429,19 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
         @Override
         protected void doStop() throws Exception
         {
-            LOG.debug("Stopping {}", this);
+            if (LOG.isDebugEnabled())
+                LOG.debug("Stopping {}", this);
             Stop stop = new Stop();
             submit(stop);
             stop.await(getStopTimeout());
-            LOG.debug("Stopped {}", this);
+            if (LOG.isDebugEnabled())
+                LOG.debug("Stopped {}", this);
         }
 
         /**
          * Submit a task to update a selector key.  If the System property {@link SelectorManager#SUBMIT_KEY_UPDATES}
-         * is set true (default is false), the task is passed to {@link #submit(Runnable)}.   Otherwise it is run immediately and the selector 
-         * woken up if need be.   
+         * is set true (default is false), the task is passed to {@link #submit(Runnable)}.   Otherwise it is run immediately and the selector
+         * woken up if need be.
          * @param update the update to a key
          */
         public void updateKey(Runnable update)
@@ -398,12 +452,16 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
             }
             else
             {
-                runChange(update);
+                // Run only 1 change at once
+                synchronized (this)
+                {
+                    runChange(update);
+                }
                 if (_state.compareAndSet(State.SELECT, State.WAKEUP))
                    wakeup();
             }
         }
-        
+
         /**
          * <p>Submits a change to be executed in the selector thread.</p>
          * <p>Changes may be submitted from any thread, and the selector thread woken up
@@ -419,7 +477,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
             // change to the queue and process the state.
 
             _changes.offer(change);
-            LOG.debug("Queued change {}", change);
+            if (LOG.isDebugEnabled())
+                LOG.debug("Queued change {}", change);
 
             out: while (true)
             {
@@ -463,7 +522,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
         {
             try
             {
-                LOG.debug("Running change {}", change);
+                if (LOG.isDebugEnabled())
+                    LOG.debug("Running change {}", change);
                 change.run();
             }
             catch (Throwable x)
@@ -477,18 +537,27 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
         {
             _thread = Thread.currentThread();
             String name = _thread.getName();
+            int priority = _thread.getPriority();
             try
             {
-                _thread.setName(name + "-selector-" + SelectorManager.this.getClass().getSimpleName()+"@"+Integer.toHexString(SelectorManager.this.hashCode())+"/"+_id);
-                LOG.debug("Starting {} on {}", _thread, this);
+                if (_priorityDelta != 0)
+                    _thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, priority + _priorityDelta)));
+
+                _thread.setName(String.format("%s-selector-%s@%h/%d", name, SelectorManager.this.getClass().getSimpleName(), SelectorManager.this.hashCode(), _id));
+                if (LOG.isDebugEnabled())
+                    LOG.debug("Starting {} on {}", _thread, this);
                 while (isRunning())
                     select();
-                runChanges();
+                while(isStopping())
+                    runChanges();
             }
             finally
             {
-                LOG.debug("Stopped {} on {}", _thread, this);
+                if (LOG.isDebugEnabled())
+                    LOG.debug("Stopped {} on {}", _thread, this);
                 _thread.setName(name);
+                if (_priorityDelta != 0)
+                    _thread.setPriority(priority);
             }
         }
 
@@ -519,7 +588,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
                             _state.set(State.CHANGES);
                             continue;
                         default:
-                            throw new IllegalStateException();    
+                            throw new IllegalStateException();
                     }
                 }
                 // Must check first for SELECT and *then* for WAKEUP
@@ -607,10 +676,16 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
                 boolean connected = finishConnect(channel);
                 if (connected)
                 {
-                    connect.timeout.cancel();
-                    key.interestOps(0);
-                    EndPoint endpoint = createEndPoint(channel, key);
-                    key.attach(endpoint);
+                    if (connect.timeout.cancel())
+                    {
+                        key.interestOps(0);
+                        EndPoint endpoint = createEndPoint(channel, key);
+                        key.attach(endpoint);
+                    }
+                    else
+                    {
+                        throw new SocketTimeoutException("Concurrent Connect Timeout");
+                    }
                 }
                 else
                 {
@@ -622,7 +697,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
                 connect.failed(x);
             }
         }
-        
+
         private void processAccept(SelectionKey key)
         {
             ServerSocketChannel server = (ServerSocketChannel)key.channel();
@@ -671,13 +746,15 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
             Connection connection = newConnection(channel, endPoint, selectionKey.attachment());
             endPoint.setConnection(connection);
             connectionOpened(connection);
-            LOG.debug("Created {}", endPoint);
+            if (LOG.isDebugEnabled())
+                LOG.debug("Created {}", endPoint);
             return endPoint;
         }
 
         public void destroyEndPoint(EndPoint endPoint)
         {
-            LOG.debug("Destroyed {}", endPoint);
+            if (LOG.isDebugEnabled())
+                LOG.debug("Destroyed {}", endPoint);
             Connection connection = endPoint.getConnection();
             if (connection != null)
                 connectionClosed(connection);
@@ -792,7 +869,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
                 try
                 {
                     SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
-                    LOG.debug("{} acceptor={}", this, key);
+                    if (LOG.isDebugEnabled())
+                        LOG.debug("{} acceptor={}", this, key);
                 }
                 catch (Throwable x)
                 {
@@ -804,11 +882,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
 
         private class Accept implements Runnable
         {
-            private final SocketChannel _channel;
+            private final SocketChannel channel;
+            private final Object attachment;
 
-            public Accept(SocketChannel channel)
+            private Accept(SocketChannel channel, Object attachment)
             {
-                this._channel = channel;
+                this.channel = channel;
+                this.attachment = attachment;
             }
 
             @Override
@@ -816,13 +896,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
             {
                 try
                 {
-                    SelectionKey key = _channel.register(_selector, 0, null);
-                    EndPoint endpoint = createEndPoint(_channel, key);
+                    SelectionKey key = channel.register(_selector, 0, attachment);
+                    EndPoint endpoint = createEndPoint(channel, key);
                     key.attach(endpoint);
                 }
                 catch (Throwable x)
                 {
-                    closeNoExceptions(_channel);
+                    closeNoExceptions(channel);
                     LOG.debug(x);
                 }
             }
@@ -835,7 +915,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
             private final Object attachment;
             private final Scheduler.Task timeout;
 
-            public Connect(SocketChannel channel, Object attachment)
+            private Connect(SocketChannel channel, Object attachment)
             {
                 this.channel = channel;
                 this.attachment = attachment;
@@ -855,7 +935,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
                 }
             }
 
-            protected void failed(Throwable failure)
+            private void failed(Throwable failure)
             {
                 if (failed.compareAndSet(false, true))
                 {
@@ -881,8 +961,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
                 SocketChannel channel = connect.channel;
                 if (channel.isConnectionPending())
                 {
-                    LOG.debug("Channel {} timed out while connecting, closing it", channel);
-                    connect.failed(new SocketTimeoutException());
+                    if (LOG.isDebugEnabled())
+                        LOG.debug("Channel {} timed out while connecting, closing it", channel);
+                    connect.failed(new SocketTimeoutException("Connect Timeout"));
                 }
             }
         }