]> WPIA git - gigi.git/blobdiff - lib/jetty/org/eclipse/jetty/io/SelectChannelEndPoint.java
Importing upstream Jetty jetty-9.2.1.v20140609
[gigi.git] / lib / jetty / org / eclipse / jetty / io / SelectChannelEndPoint.java
diff --git a/lib/jetty/org/eclipse/jetty/io/SelectChannelEndPoint.java b/lib/jetty/org/eclipse/jetty/io/SelectChannelEndPoint.java
new file mode 100644 (file)
index 0000000..3050402
--- /dev/null
@@ -0,0 +1,207 @@
+//
+//  ========================================================================
+//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+//  ------------------------------------------------------------------------
+//  All rights reserved. This program and the accompanying materials
+//  are made available under the terms of the Eclipse Public License v1.0
+//  and Apache License v2.0 which accompanies this distribution.
+//
+//      The Eclipse Public License is available at
+//      http://www.eclipse.org/legal/epl-v10.html
+//
+//      The Apache License v2.0 is available at
+//      http://www.opensource.org/licenses/apache2.0.php
+//
+//  You may elect to redistribute this code under either of these licenses.
+//  ========================================================================
+//
+
+package org.eclipse.jetty.io;
+
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.eclipse.jetty.io.SelectorManager.ManagedSelector;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.Scheduler;
+
+/**
+ * An ChannelEndpoint that can be scheduled by {@link SelectorManager}.
+ */
+public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorManager.SelectableEndPoint
+{
+    public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
+
+    private final Runnable _updateTask = new Runnable()
+    {
+        @Override
+        public void run()
+        {
+            try
+            {
+                if (getChannel().isOpen())
+                {
+                    int oldInterestOps = _key.interestOps();
+                    int newInterestOps = _interestOps.get();
+                    if (newInterestOps != oldInterestOps)
+                        setKeyInterests(oldInterestOps, newInterestOps);
+                }
+            }
+            catch (CancelledKeyException x)
+            {
+                LOG.debug("Ignoring key update for concurrently closed channel {}", this);
+                close();
+            }
+            catch (Exception x)
+            {
+                LOG.warn("Ignoring key update for " + this, x);
+                close();
+            }
+        }
+    };
+
+    /**
+     * true if {@link ManagedSelector#destroyEndPoint(EndPoint)} has not been called
+     */
+    private final AtomicBoolean _open = new AtomicBoolean();
+    private final SelectorManager.ManagedSelector _selector;
+    private final SelectionKey _key;
+    /**
+     * The desired value for {@link SelectionKey#interestOps()}
+     */
+    private final AtomicInteger _interestOps = new AtomicInteger();
+
+    public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
+    {
+        super(scheduler,channel);
+        _selector = selector;
+        _key = key;
+        setIdleTimeout(idleTimeout);
+    }
+
+    @Override
+    protected boolean needsFill()
+    {
+        updateLocalInterests(SelectionKey.OP_READ, true);
+        return false;
+    }
+
+    @Override
+    protected void onIncompleteFlush()
+    {
+        updateLocalInterests(SelectionKey.OP_WRITE, true);
+    }
+
+    @Override
+    public void onSelected()
+    {
+        assert _selector.isSelectorThread();
+        int oldInterestOps = _key.interestOps();
+        int readyOps = _key.readyOps();
+        int newInterestOps = oldInterestOps & ~readyOps;
+        setKeyInterests(oldInterestOps, newInterestOps);
+        updateLocalInterests(readyOps, false);
+        if (_key.isReadable())
+            getFillInterest().fillable();
+        if (_key.isWritable())
+            getWriteFlusher().completeWrite();
+    }
+
+
+    private void updateLocalInterests(int operation, boolean add)
+    {
+        while (true)
+        {
+            int oldInterestOps = _interestOps.get();
+            int newInterestOps;
+            if (add)
+                newInterestOps = oldInterestOps | operation;
+            else
+                newInterestOps = oldInterestOps & ~operation;
+
+            if (isInputShutdown())
+                newInterestOps &= ~SelectionKey.OP_READ;
+            if (isOutputShutdown())
+                newInterestOps &= ~SelectionKey.OP_WRITE;
+
+            if (newInterestOps != oldInterestOps)
+            {
+                if (_interestOps.compareAndSet(oldInterestOps, newInterestOps))
+                {
+                    LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this);
+                    _selector.updateKey(_updateTask);
+                }
+                else
+                {
+                    LOG.debug("Local interests update conflict: now {}, was {}, attempted {} for {}", _interestOps.get(), oldInterestOps, newInterestOps, this);
+                    continue;
+                }
+            }
+            else
+            {
+                LOG.debug("Ignoring local interests update {} -> {} for {}", oldInterestOps, newInterestOps, this);
+            }
+            break;
+        }
+    }
+
+
+    private void setKeyInterests(int oldInterestOps, int newInterestOps)
+    {
+        LOG.debug("Key interests updated {} -> {}", oldInterestOps, newInterestOps);
+        _key.interestOps(newInterestOps);
+    }
+
+    @Override
+    public void close()
+    {
+        if (_open.compareAndSet(true, false))
+        {
+            super.close();
+            _selector.destroyEndPoint(this);
+        }
+    }
+
+    @Override
+    public boolean isOpen()
+    {
+        // We cannot rely on super.isOpen(), because there is a race between calls to close() and isOpen():
+        // a thread may call close(), which flips the boolean but has not yet called super.close(), and
+        // another thread calls isOpen() which would return true - wrong - if based on super.isOpen().
+        return _open.get();
+    }
+
+    @Override
+    public void onOpen()
+    {
+        if (_open.compareAndSet(false, true))
+            super.onOpen();
+    }
+
+    @Override
+    public String toString()
+    {
+        // Do NOT use synchronized (this)
+        // because it's very easy to deadlock when debugging is enabled.
+        // We do a best effort to print the right toString() and that's it.
+        try
+        {
+            boolean valid = _key!=null && _key.isValid();
+            int keyInterests = valid ? _key.interestOps() : -1;
+            int keyReadiness = valid ? _key.readyOps() : -1;
+            return String.format("%s{io=%d,kio=%d,kro=%d}",
+                    super.toString(),
+                    _interestOps.get(),
+                    keyInterests,
+                    keyReadiness);
+        }
+        catch (CancelledKeyException x)
+        {
+            return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _interestOps.get());
+        }
+    }
+}