2 // ========================================================================
3 // Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
19 package org.eclipse.jetty.io;
21 import java.util.List;
22 import java.util.concurrent.CopyOnWriteArrayList;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicReference;
28 import org.eclipse.jetty.util.Callback;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
31 import org.eclipse.jetty.util.thread.NonBlockingThread;
34 * <p>A convenience base implementation of {@link Connection}.</p>
35 * <p>This class uses the capabilities of the {@link EndPoint} API to provide a
36 * more traditional style of async reading. A call to {@link #fillInterested()}
37 * will schedule a callback to {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)} as appropriate.</p>
// Abstract base for Connection implementations. It keeps the EndPoint, an Executor,
// a read callback and an atomically updated State that tracks fill interest.
40 public abstract class AbstractConnection implements Connection
42 private static final Logger LOG = Log.getLogger(AbstractConnection.class);
// Value passed as executeOnfillable by the two-argument constructor.
44 public static final boolean EXECUTE_ONFILLABLE=true;
// Registered listeners; iterated to deliver onOpened/onClosed notifications.
46 private final List<Listener> listeners = new CopyOnWriteArrayList<>();
// Current state of the fill-interest state machine; starts at IDLE and is swapped with compareAndSet in next().
47 private final AtomicReference<State> _state = new AtomicReference<>(IDLE);
// System.currentTimeMillis() captured when this instance was constructed.
48 private final long _created=System.currentTimeMillis();
49 private final EndPoint _endPoint;
50 private final Executor _executor;
// Callback handed to EndPoint.fillInterested(...) by FillInterestedState.onEnter.
51 private final Callback _readCallback;
// When true, FillingState.onEnter submits _runOnFillable to the executor.
52 private final boolean _executeOnfillable;
// Defaults to 2048; units are not stated here — presumably bytes, TODO confirm.
53 private int _inputBufferSize=2048;
// Convenience constructor: delegates to the three-argument constructor,
// passing EXECUTE_ONFILLABLE as the executeOnfillable flag.
55 protected AbstractConnection(EndPoint endp, Executor executor)
57 this(endp,executor,EXECUTE_ONFILLABLE);
// Main constructor: creates the ReadCallback and stores the executeOnfillable flag.
// NOTE(review): the condition guarding the IllegalArgumentException and the assignments of
// _endPoint/_executor are not visible in this view; the message indicates a null-executor check.
60 protected AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable)
63 throw new IllegalArgumentException("Executor must not be null!");
66 _readCallback = new ReadCallback();
67 _executeOnfillable=executeOnfillable;
// Appends the listener to the list that the open/close notifications iterate over.
72 public void addListener(Listener listener)
74 listeners.add(listener);
// Returns the currently configured input buffer size (2048 unless changed via the setter).
77 public int getInputBufferSize()
79 return _inputBufferSize;
// Stores the new input buffer size as-is; no range validation is performed on the visible line.
82 public void setInputBufferSize(int inputBufferSize)
84 _inputBufferSize = inputBufferSize;
87 protected Executor getExecutor()
// Helper taking a callback and a failure cause; presumably it delivers x to callback — TODO confirm.
// Visible steps: a check for running on a NonBlockingThread, a Runnable handed to the
// executor, and a catch for RejectedExecutionException from that hand-off.
// NOTE(review): the bodies of each branch are not visible in this view.
92 protected void failedCallback(final Callback callback, final Throwable x)
94 if (NonBlockingThread.isNonBlockingThread())
98 getExecutor().execute(new Runnable()
107 catch(RejectedExecutionException e)
120 * <p>Utility method to be called to register read interest.</p>
121 * <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
122 * will be called back as appropriate.</p>
// Registers read interest by driving the state machine: reads the current state and asks
// next() to swap it for state.fillInterested(). State.fillInterested() may throw
// IllegalStateException for states that do not allow it (see the State base class).
// NOTE(review): the lines between the statements (likely a retry loop) are not visible.
125 public void fillInterested()
127 if (LOG.isDebugEnabled())
128 LOG.debug("fillInterested {}",this);
132 State state=_state.get();
133 if (next(state,state.fillInterested()))
// Registers read interest on behalf of a caller-supplied callback: wraps the current state
// in a FillingInterestedCallback and tries to install it via next().
138 public void fillInterested(Callback callback)
140 if (LOG.isDebugEnabled())
141 LOG.debug("fillInterested {}",this);
145 State state=_state.get();
// Identity comparison: detects that this exact callback instance is already the installed
// FillingInterestedCallback. NOTE(review): the statement guarded by this check is not visible.
147 if (state instanceof FillingInterestedCallback && ((FillingInterestedCallback)state)._callback==callback)
149 State next=new FillingInterestedCallback(callback,state);
150 if (next(state,next))
156 * <p>Callback method invoked when the endpoint is ready to be read.</p>
157 * @see #fillInterested()
159 public abstract void onFillable();
162 * <p>Callback method invoked when the endpoint failed to be ready to be read.</p>
163 * @param cause the exception that caused the failure
// Handles a failure of read interest. Logs at debug level, then acts only while the
// endpoint is still open.
165 protected void onFillInterestedFailed(Throwable cause)
167 if (LOG.isDebugEnabled())
168 LOG.debug("{} onFillInterestedFailed {}", this, cause);
169 if (_endPoint.isOpen())
// Default is to close; for a TimeoutException the decision is delegated to onReadTimeout().
171 boolean close = true;
172 if (cause instanceof TimeoutException)
173 close = onReadTimeout();
// NOTE(review): the lines between the following checks are not visible in this view; what is
// shown is an output-shutdown test, a shutdownOutput() call and a second isOpen() test.
176 if (_endPoint.isOutputShutdown())
179 _endPoint.shutdownOutput();
183 if (_endPoint.isOpen())
188 * <p>Callback method invoked when the endpoint failed to be ready to be read after a timeout.</p>
189 * @return true to signal that the endpoint must be closed, false to keep the endpoint open
191 protected boolean onReadTimeout()
199 if (LOG.isDebugEnabled())
200 LOG.debug("onOpen {}", this);
202 for (Listener listener : listeners)
203 listener.onOpened(this);
// Close notification: logs at debug level and then calls onClosed(this) on every registered listener.
207 public void onClose()
209 if (LOG.isDebugEnabled())
210 LOG.debug("onClose {}",this);
212 for (Listener listener : listeners)
213 listener.onClosed(this);
217 public EndPoint getEndPoint()
225 getEndPoint().close();
229 public int getMessagesIn()
235 public int getMessagesOut()
241 public long getBytesIn()
247 public long getBytesOut()
253 public long getCreatedTimeStamp()
// Renders "<SimpleClassName>@<hex>[<a>,<b>]"; only the first format argument is visible in this view.
259 public String toString()
261 return String.format("%s@%x[%s,%s]",
262 getClass().getSimpleName(),
// Attempts the transition state -> next with a single compareAndSet on _state. When the
// swap succeeds, the transition is logged at debug level and next.onEnter(this) is invoked.
// NOTE(review): the return statements are not visible; callers use the result as a condition.
268 public boolean next(State state, State next)
272 if(_state.compareAndSet(state,next))
274 if (LOG.isDebugEnabled())
275 LOG.debug("{}-->{} {}",state,next,this);
277 next.onEnter(AbstractConnection.this);
// State type behind the IDLE constant (the initial value of _state).
283 private static final class IdleState extends State
// From this state, a fill-interest request transitions to FILL_INTERESTED.
291 State fillInterested()
293 return FILL_INTERESTED;
// State type behind the FILL_INTERESTED constant.
298 private static final class FillInterestedState extends State
300 private FillInterestedState()
302 super("FILL_INTERESTED");
// On entry, registers the connection's read callback with the endpoint.
306 public void onEnter(AbstractConnection connection)
308 connection.getEndPoint().fillInterested(connection._readCallback);
// NOTE(review): the bodies of the two overrides below are not visible in this view.
312 State fillInterested()
318 public State onFillable()
// State type behind the REFILLING constant; FillingInterestedCallback substitutes it for
// FILLING when wrapping a nested state.
331 private static final class RefillingState extends State
333 private RefillingState()
// A fill-interest request in this state transitions to FILLING_FILL_INTERESTED.
339 State fillInterested()
341 return FILLING_FILL_INTERESTED;
// NOTE(review): body not visible in this view.
345 public State onFilled()
// State type behind the FILLING_FILL_INTERESTED constant; reached when fill interest is
// requested from FILLING or REFILLING.
352 private static final class FillingFillInterestedState extends State
354 private FillingFillInterestedState(String name)
360 State fillInterested()
// NOTE(review): the original numbering places this return several lines after the
// fillInterested() signature, so it likely belongs to a later method whose signature is
// not visible (presumably onFilled()) — confirm against the full file.
367 return FILL_INTERESTED;
// State type behind the FILLING constant.
372 private static final class FillingState extends State
374 private FillingState()
// On entry, runs the connection's _runOnFillable task: submitted to the executor when
// _executeOnfillable is true. NOTE(review): the direct run() call is presumably the else
// branch; the line between the two calls is not visible in this view.
380 public void onEnter(AbstractConnection connection)
382 if (connection._executeOnfillable)
383 connection.getExecutor().execute(connection._runOnFillable);
385 connection._runOnFillable.run();
// A fill-interest request in this state transitions to FILLING_FILL_INTERESTED.
389 State fillInterested()
391 return FILLING_FILL_INTERESTED;
// NOTE(review): body not visible in this view.
395 public State onFilled()
// Base class for the states held in _state. Subclasses override only the transitions
// that are legal for them.
402 public static class State
404 private final String _name;
411 public String toString()
// Hook invoked by next() after a successful transition into this state.
416 void onEnter(AbstractConnection connection)
// Default: requesting fill interest is illegal in this state.
420 State fillInterested()
422 throw new IllegalStateException(this.toString());
// NOTE(review): the three throws below belong to methods whose signatures are not visible
// in this view — presumably onFillable(), onFilled() and onFailed(), which are called on
// State elsewhere in this file.
427 throw new IllegalStateException(this.toString());
432 throw new IllegalStateException(this.toString());
437 throw new IllegalStateException(this.toString());
// Shared singleton instances of the stateless states; compared by identity
// (e.g. nested==FILLING in FillingInterestedCallback).
// Initial state of every connection's _state.
442 public static final State IDLE=new IdleState();
// Entered from IDLE on fillInterested(); its onEnter registers the read callback with the endpoint.
444 public static final State FILL_INTERESTED=new FillInterestedState();
// Its onEnter runs or dispatches the connection's _runOnFillable task.
446 public static final State FILLING=new FillingState();
// Substituted for FILLING when a FillingInterestedCallback wraps a nested state.
448 public static final State REFILLING=new RefillingState();
// Returned by fillInterested() from FILLING and REFILLING.
450 public static final State FILLING_FILL_INTERESTED=new FillingFillInterestedState("FILLING_FILL_INTERESTED");
// A state that wraps another state: each visible transition delegates to the wrapped state
// and re-wraps the result in a new NestedState.
452 public class NestedState extends State
454 private final State _nested;
// Names the state "NESTED(<nested>)".
456 NestedState(State nested)
458 super("NESTED("+nested+")");
// Names the state "<name>(<nested>)".
461 NestedState(String name,State nested)
463 super(name+"("+nested+")");
468 State fillInterested()
470 return new NestedState(_nested.fillInterested());
// NOTE(review): the signatures enclosing the two returns below are not visible; from the
// delegated calls they are presumably onFillable() and onFilled().
476 return new NestedState(_nested.onFillable());
482 return new NestedState(_nested.onFilled());
// NestedState installed by fillInterested(Callback): it remembers the caller's callback and,
// on entry, registers a wrapper callback with the endpoint.
487 public class FillingInterestedCallback extends NestedState
489 private final Callback _callback;
491 FillingInterestedCallback(Callback callback,State nested)
// A nested FILLING state is replaced by REFILLING; any other state is wrapped unchanged.
493 super("FILLING_INTERESTED_CALLBACK",nested==FILLING?REFILLING:nested);
498 void onEnter(final AbstractConnection connection)
// Wrapper that unwinds the nested state around the notification of the caller's callback.
500 Callback callback=new Callback()
503 public void succeeded()
// Reads the current state; when it is a NestedState, tries to replace it with the state it wraps.
// NOTE(review): the lines around these statements (likely a retry loop) are not visible,
// so the exact conditions under which _callback.succeeded() runs should be confirmed.
507 State state = connection._state.get();
508 if (!(state instanceof NestedState))
510 State nested=((NestedState)state)._nested;
511 if (connection.next(state,nested))
514 _callback.succeeded();
518 public void failed(Throwable x)
// Same unwrapping as succeeded(); what follows it (presumably notifying _callback of x) is not visible.
522 State state = connection._state.get();
523 if (!(state instanceof NestedState))
525 State nested=((NestedState)state)._nested;
526 if (connection.next(state,nested))
// Registers the wrapper, not the caller's callback, with the endpoint.
533 connection.getEndPoint().fillInterested(callback);
// Task run (directly or via the executor) by FillingState.onEnter. The visible part reads
// the current state and attempts the transition to state.onFilled() via next().
// NOTE(review): the preceding lines (presumably the onFillable() invocation) are not visible.
537 private final Runnable _runOnFillable = new Runnable()
550 State state=_state.get();
551 if (next(state,state.onFilled()))
// Callback registered with the endpoint by FillInterestedState.onEnter; translates endpoint
// notifications into state machine transitions.
559 private class ReadCallback implements Callback
562 public void succeeded()
// Attempts the transition to state.onFillable(). NOTE(review): surrounding loop lines are not visible.
566 State state=_state.get();
567 if (next(state,state.onFillable()))
573 public void failed(final Throwable x)
// Failure handling is handed to the executor rather than run inline. The submitted task
// attempts the transition to state.onFailed() and calls onFillInterestedFailed(x).
575 _executor.execute(new Runnable()
582 State state=_state.get();
583 if (next(state,state.onFailed()))
586 onFillInterestedFailed(x);
592 public String toString()
// Includes the enclosing connection's hash code (hex) and its toString().
594 return String.format("AC.ReadCB@%x{%s}", AbstractConnection.this.hashCode(),AbstractConnection.this);