X-Git-Url: https://code.wpia.club/?p=gigi.git;a=blobdiff_plain;f=lib%2Fjetty%2Forg%2Feclipse%2Fjetty%2Fio%2FSelectChannelEndPoint.java;fp=lib%2Fjetty%2Forg%2Feclipse%2Fjetty%2Fio%2FSelectChannelEndPoint.java;h=30504029947bf3d54df33045eb3e7c04328cc99a;hp=0000000000000000000000000000000000000000;hb=73ef54a38e3930a1a789cdc6b5fa23cdd4c9d086;hpb=515007c7c1351045420669d65b59c08fa46850f2 diff --git a/lib/jetty/org/eclipse/jetty/io/SelectChannelEndPoint.java b/lib/jetty/org/eclipse/jetty/io/SelectChannelEndPoint.java new file mode 100644 index 00000000..30504029 --- /dev/null +++ b/lib/jetty/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -0,0 +1,207 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.nio.channels.CancelledKeyException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.eclipse.jetty.io.SelectorManager.ManagedSelector; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Scheduler; + +/** + * An ChannelEndpoint that can be scheduled by {@link SelectorManager}. + */ +public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorManager.SelectableEndPoint +{ + public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class); + + private final Runnable _updateTask = new Runnable() + { + @Override + public void run() + { + try + { + if (getChannel().isOpen()) + { + int oldInterestOps = _key.interestOps(); + int newInterestOps = _interestOps.get(); + if (newInterestOps != oldInterestOps) + setKeyInterests(oldInterestOps, newInterestOps); + } + } + catch (CancelledKeyException x) + { + LOG.debug("Ignoring key update for concurrently closed channel {}", this); + close(); + } + catch (Exception x) + { + LOG.warn("Ignoring key update for " + this, x); + close(); + } + } + }; + + /** + * true if {@link ManagedSelector#destroyEndPoint(EndPoint)} has not been called + */ + private final AtomicBoolean _open = new AtomicBoolean(); + private final SelectorManager.ManagedSelector _selector; + private final SelectionKey _key; + /** + * The desired value for {@link SelectionKey#interestOps()} + */ + private final AtomicInteger _interestOps = new AtomicInteger(); + + public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout) + { + super(scheduler,channel); + _selector = selector; + _key = key; + setIdleTimeout(idleTimeout); + } + + @Override + protected boolean needsFill() + { + updateLocalInterests(SelectionKey.OP_READ, true); + return false; + } + + @Override + protected void onIncompleteFlush() + { + updateLocalInterests(SelectionKey.OP_WRITE, true); + } + + @Override + public void onSelected() + { + assert _selector.isSelectorThread(); + int oldInterestOps = _key.interestOps(); + int readyOps = _key.readyOps(); + int newInterestOps = oldInterestOps & ~readyOps; + setKeyInterests(oldInterestOps, newInterestOps); + updateLocalInterests(readyOps, false); + if (_key.isReadable()) + getFillInterest().fillable(); + if (_key.isWritable()) + getWriteFlusher().completeWrite(); + } + + + private void updateLocalInterests(int operation, boolean add) + { + while (true) + { + int oldInterestOps = _interestOps.get(); + int newInterestOps; + if (add) + newInterestOps = oldInterestOps | operation; + else + newInterestOps = oldInterestOps & ~operation; + + if (isInputShutdown()) + newInterestOps &= ~SelectionKey.OP_READ; + if (isOutputShutdown()) + newInterestOps &= ~SelectionKey.OP_WRITE; + + if (newInterestOps != oldInterestOps) + { + if (_interestOps.compareAndSet(oldInterestOps, newInterestOps)) + { + LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this); + _selector.updateKey(_updateTask); + } + else + { + LOG.debug("Local interests update conflict: now {}, was {}, attempted {} for {}", _interestOps.get(), oldInterestOps, newInterestOps, this); + continue; + } + } + else + { + LOG.debug("Ignoring local interests update {} -> {} for {}", oldInterestOps, newInterestOps, this); + } + break; + } + } + + + private void setKeyInterests(int oldInterestOps, int newInterestOps) + { + LOG.debug("Key interests updated {} -> {}", oldInterestOps, newInterestOps); + _key.interestOps(newInterestOps); + } + + @Override + public void close() + { + if (_open.compareAndSet(true, false)) + { + super.close(); + _selector.destroyEndPoint(this); + } + } + + @Override + public boolean isOpen() + { + // We cannot rely on super.isOpen(), because there is a race between calls to close() and isOpen(): + // a thread may call close(), which flips the boolean but has not yet called super.close(), and + // another thread calls isOpen() which would return true - wrong - if based on super.isOpen(). + return _open.get(); + } + + @Override + public void onOpen() + { + if (_open.compareAndSet(false, true)) + super.onOpen(); + } + + @Override + public String toString() + { + // Do NOT use synchronized (this) + // because it's very easy to deadlock when debugging is enabled. + // We do a best effort to print the right toString() and that's it. + try + { + boolean valid = _key!=null && _key.isValid(); + int keyInterests = valid ? _key.interestOps() : -1; + int keyReadiness = valid ? _key.readyOps() : -1; + return String.format("%s{io=%d,kio=%d,kro=%d}", + super.toString(), + _interestOps.get(), + keyInterests, + keyReadiness); + } + catch (CancelledKeyException x) + { + return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _interestOps.get()); + } + } +}