2 // ========================================================================
3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
19 package org.eclipse.jetty.io;
21 import java.nio.channels.CancelledKeyException;
22 import java.nio.channels.SelectionKey;
23 import java.nio.channels.SocketChannel;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.atomic.AtomicInteger;
27 import org.eclipse.jetty.io.SelectorManager.ManagedSelector;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30 import org.eclipse.jetty.util.thread.Scheduler;
33 * A ChannelEndPoint that can be scheduled by {@link SelectorManager}.
// Endpoint that keeps the desired selection interests in _interestOps and asks its
// ManagedSelector (via updateKey) to apply them to the SelectionKey through _updateTask.
35 public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorManager.SelectableEndPoint
37 public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
// Task handed to _selector.updateKey(...) by updateLocalInterests(). While the channel is
// open it compares the key's current interestOps with the desired _interestOps value and
// calls setKeyInterests() only when they differ.
// NOTE(review): `this` in the two log calls below looks like it refers to the anonymous
// Runnable rather than the endpoint, so the endpoint's toString() would not be what is
// logged - confirm whether SelectChannelEndPoint.this was intended.
39 private final Runnable _updateTask = new Runnable()
46 if (getChannel().isOpen())
48 int oldInterestOps = _key.interestOps();
49 int newInterestOps = _interestOps.get();
50 if (newInterestOps != oldInterestOps)
51 setKeyInterests(oldInterestOps, newInterestOps);
// A cancelled key is reported at debug level as a concurrent close of the channel.
54 catch (CancelledKeyException x)
56 LOG.debug("Ignoring key update for concurrently closed channel {}", this);
// Second failure path: logged at warn level with the exception attached.
// (Its catch clause is not visible in this excerpt.)
61 LOG.warn("Ignoring key update for " + this, x);
// _open is flipped false -> true and true -> false only through the compareAndSet guards
// further down; the true -> false winner is the one that calls _selector.destroyEndPoint(this).
68 * true if {@link ManagedSelector#destroyEndPoint(EndPoint)} has not been called
70 private final AtomicBoolean _open = new AtomicBoolean();
// Selector used for isSelectorThread(), updateKey() and destroyEndPoint() calls, and the
// SelectionKey whose interestOps()/readyOps() are read and written by this class.
71 private final SelectorManager.ManagedSelector _selector;
72 private final SelectionKey _key;
// Updated with compareAndSet in updateLocalInterests(); read by _updateTask and toString().
74 * The desired value for {@link SelectionKey#interestOps()}
76 private final AtomicInteger _interestOps = new AtomicInteger();
// Passes the scheduler and channel to the superclass constructor and applies idleTimeout.
78 public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
80 super(scheduler,channel);
83 setIdleTimeout(idleTimeout);
// Requests read interest by adding OP_READ to the local interest set.
87 protected boolean needsFill()
89 updateLocalInterests(SelectionKey.OP_READ, true);
// Requests write interest by adding OP_WRITE to the local interest set.
94 protected void onIncompleteFlush()
96 updateLocalInterests(SelectionKey.OP_WRITE, true);
// Asserted to run on the selector thread. Removes the ops that are ready from the key's
// interest set, removes the same ops from the local interest set, then calls
// getFillInterest().fillable() if the key is readable and
// getWriteFlusher().completeWrite() if it is writable.
100 public void onSelected()
102 assert _selector.isSelectorThread();
103 int oldInterestOps = _key.interestOps();
104 int readyOps = _key.readyOps();
// Drop the ready ops from the key's interest set.
105 int newInterestOps = oldInterestOps & ~readyOps;
106 setKeyInterests(oldInterestOps, newInterestOps);
// add=false: clear the same bits from the desired (local) interests.
107 updateLocalInterests(readyOps, false);
108 if (_key.isReadable())
109 getFillInterest().fillable();
110 if (_key.isWritable())
111 getWriteFlusher().completeWrite();
// Computes a new value for _interestOps from the given operation bits and publishes it
// with compareAndSet; on a successful change, _updateTask is submitted to the selector.
// NOTE(review): the constructs that choose between the two assignments below and that
// handle a lost compareAndSet are not visible in this excerpt - presumably `add` selects
// OR vs AND-NOT and a conflict is retried; confirm.
115 private void updateLocalInterests(int operation, boolean add)
119 int oldInterestOps = _interestOps.get();
// Set the operation bits ...
122 newInterestOps = oldInterestOps | operation;
// ... or clear them.
124 newInterestOps = oldInterestOps & ~operation;
// Never keep interest in a direction that is already shut down.
126 if (isInputShutdown())
127 newInterestOps &= ~SelectionKey.OP_READ;
128 if (isOutputShutdown())
129 newInterestOps &= ~SelectionKey.OP_WRITE;
// Only touch the atomic and the selector when the value actually changes.
131 if (newInterestOps != oldInterestOps)
133 if (_interestOps.compareAndSet(oldInterestOps, newInterestOps))
135 LOG.debug("Local interests updated {} -> {} for {}", oldInterestOps, newInterestOps, this);
136 _selector.updateKey(_updateTask);
// compareAndSet failed: another update changed _interestOps in the meantime.
140 LOG.debug("Local interests update conflict: now {}, was {}, attempted {} for {}", _interestOps.get(), oldInterestOps, newInterestOps, this);
// No change needed: the computed value equals the current one.
146 LOG.debug("Ignoring local interests update {} -> {} for {}", oldInterestOps, newInterestOps, this);
// Logs the transition and writes newInterestOps to the SelectionKey.
153 private void setKeyInterests(int oldInterestOps, int newInterestOps)
155 LOG.debug("Key interests updated {} -> {}", oldInterestOps, newInterestOps);
156 _key.interestOps(newInterestOps);
// Only the caller that wins the true -> false transition of _open asks the selector to
// destroy this endpoint.
// NOTE(review): the enclosing method header is not visible here - presumably close(); confirm.
162 if (_open.compareAndSet(true, false))
165 _selector.destroyEndPoint(this);
// Reports whether this endpoint is open; the comment below explains why super.isOpen()
// is not used. NOTE(review): the return statement is not visible in this excerpt -
// presumably it answers from _open; confirm.
170 public boolean isOpen()
172 // We cannot rely on super.isOpen(), because there is a race between calls to close() and isOpen():
173 // a thread may call close(), which flips the boolean but has not yet called super.close(), and
174 // another thread calls isOpen() which would return true - wrong - if based on super.isOpen().
// Marks the endpoint open; the guarded action runs only for the caller that wins the
// false -> true transition. (Method header and guarded body are not visible in this excerpt.)
181 if (_open.compareAndSet(false, true))
// Best-effort diagnostic string. In the visible fallback format, io is the local
// _interestOps value; kio/kro presumably carry the key's interest and ready ops.
186 public String toString()
188 // Do NOT use synchronized (this)
189 // because it's very easy to deadlock when debugging is enabled.
190 // We do a best effort to print the right toString() and that's it.
// -1 is used for both key values when the key is null or no longer valid.
193 boolean valid = _key!=null && _key.isValid();
194 int keyInterests = valid ? _key.interestOps() : -1;
195 int keyReadiness = valid ? _key.readyOps() : -1;
196 return String.format("%s{io=%d,kio=%d,kro=%d}",
// The key was cancelled while being read: report -2 for both key values instead.
202 catch (CancelledKeyException x)
204 return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _interestOps.get());