--- /dev/null
+//
+// ========================================================================
+// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.io;
+
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.eclipse.jetty.io.SelectorManager.ManagedSelector;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.Scheduler;
+
+/**
+ * An ChannelEndpoint that can be scheduled by {@link SelectorManager}.
+ */
+public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorManager.SelectableEndPoint
+{
+ public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
+
+ private final Runnable _updateTask = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ if (getChannel().isOpen())
+ {
+ int oldInterestOps = _key.interestOps();
+ int newInterestOps = _interestOps.get();
+ if (newInterestOps != oldInterestOps)
+ setKeyInterests(oldInterestOps, newInterestOps);
+ }
+ }
+ catch (CancelledKeyException x)
+ {
+ LOG.debug("Ignoring key update for concurrently closed channel {}", this);
+ close();
+ }
+ catch (Exception x)
+ {
+ LOG.warn("Ignoring key update for " + this, x);
+ close();
+ }
+ }
+ };
+
+ /**
+ * true if {@link ManagedSelector#destroyEndPoint(EndPoint)} has not been called
+ */
+ private final AtomicBoolean _open = new AtomicBoolean();
+ private final SelectorManager.ManagedSelector _selector;
+ private final SelectionKey _key;
+ /**
+ * The desired value for {@link SelectionKey#interestOps()}
+ */
+ private final AtomicInteger _interestOps = new AtomicInteger();
+
+ public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
+ {
+ super(scheduler,channel);
+ _selector = selector;
+ _key = key;
+ setIdleTimeout(idleTimeout);
+ }
+
+ @Override
+ protected boolean needsFill()
+ {
+ updateLocalInterests(SelectionKey.OP_READ, true);
+ return false;
+ }
+
+ @Override
+ protected void onIncompleteFlush()
+ {
+ updateLocalInterests(SelectionKey.OP_WRITE, true);
+ }
+
+ @Override
+ public void onSelected()
+ {
+ assert _selector.isSelectorThread();
+ int oldInterestOps = _key.interestOps();
+ int readyOps = _key.readyOps();
+ int newInterestOps = oldInterestOps & ~readyOps;
+ setKeyInterests(oldInterestOps, newInterestOps);
+ updateLocalInterests(readyOps, false);
+ if (_key.isReadable())
+ getFillInterest().fillable();
+ if (_key.isWritable())
+ getWriteFlusher().completeWrite();
+ }
+
+
+ private void updateLocalInterests(int operation, boolean add)
+ {
+ while (true)
+ {
+ int oldInterestOps = _interestOps.get();
+ int newInterestOps;
+ if (add)
+ newInterestOps = oldInterestOps | operation;
+ else
+ newInterestOps = oldInterestOps & ~operation;
+
+ if (isInputShutdown())
+ newInterestOps &= ~SelectionKey.OP_READ;
+ if (isOutputShutdown())
+ newInterestOps &= ~SelectionKey.OP_WRITE;
+
+ if (newInterestOps != oldInterestOps)
+ {
+ if (_interestOps.compareAndSet(oldInterestOps, newInterestOps))
+ {
+ LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this);
+ _selector.updateKey(_updateTask);
+ }
+ else
+ {
+ LOG.debug("Local interests update conflict: now {}, was {}, attempted {} for {}", _interestOps.get(), oldInterestOps, newInterestOps, this);
+ continue;
+ }
+ }
+ else
+ {
+ LOG.debug("Ignoring local interests update {} -> {} for {}", oldInterestOps, newInterestOps, this);
+ }
+ break;
+ }
+ }
+
+
+ private void setKeyInterests(int oldInterestOps, int newInterestOps)
+ {
+ LOG.debug("Key interests updated {} -> {}", oldInterestOps, newInterestOps);
+ _key.interestOps(newInterestOps);
+ }
+
+ @Override
+ public void close()
+ {
+ if (_open.compareAndSet(true, false))
+ {
+ super.close();
+ _selector.destroyEndPoint(this);
+ }
+ }
+
+ @Override
+ public boolean isOpen()
+ {
+ // We cannot rely on super.isOpen(), because there is a race between calls to close() and isOpen():
+ // a thread may call close(), which flips the boolean but has not yet called super.close(), and
+ // another thread calls isOpen() which would return true - wrong - if based on super.isOpen().
+ return _open.get();
+ }
+
+ @Override
+ public void onOpen()
+ {
+ if (_open.compareAndSet(false, true))
+ super.onOpen();
+ }
+
+ @Override
+ public String toString()
+ {
+ // Do NOT use synchronized (this)
+ // because it's very easy to deadlock when debugging is enabled.
+ // We do a best effort to print the right toString() and that's it.
+ try
+ {
+ boolean valid = _key!=null && _key.isValid();
+ int keyInterests = valid ? _key.interestOps() : -1;
+ int keyReadiness = valid ? _key.readyOps() : -1;
+ return String.format("%s{io=%d,kio=%d,kro=%d}",
+ super.toString(),
+ _interestOps.get(),
+ keyInterests,
+ keyReadiness);
+ }
+ catch (CancelledKeyException x)
+ {
+ return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _interestOps.get());
+ }
+ }
+}