2 // ========================================================================
3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
19 package org.eclipse.jetty.io;
21 import java.util.List;
22 import java.util.concurrent.CopyOnWriteArrayList;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicReference;
28 import org.eclipse.jetty.util.Callback;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
31 import org.eclipse.jetty.util.thread.NonBlockingThread;
34 * <p>A convenience base implementation of {@link Connection}.</p>
35 * <p>This class uses the capabilities of the {@link EndPoint} API to provide a
36 * more traditional style of async reading. A call to {@link #fillInterested()}
37 * will schedule a callback to {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
40 public abstract class AbstractConnection implements Connection
42 private static final Logger LOG = Log.getLogger(AbstractConnection.class);
44 public static final boolean EXECUTE_ONFILLABLE=true;
46 private final List<Listener> listeners = new CopyOnWriteArrayList<>();
47 private final AtomicReference<State> _state = new AtomicReference<>(IDLE);
48 private final long _created=System.currentTimeMillis();
49 private final EndPoint _endPoint;
50 private final Executor _executor;
51 private final Callback _readCallback;
52 private final boolean _executeOnfillable;
53 private int _inputBufferSize=2048;
55 protected AbstractConnection(EndPoint endp, Executor executor)
57 this(endp,executor,EXECUTE_ONFILLABLE);
60 protected AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable)
63 throw new IllegalArgumentException("Executor must not be null!");
66 _readCallback = new ReadCallback();
67 _executeOnfillable=executeOnfillable;
72 public void addListener(Listener listener)
74 listeners.add(listener);
77 public int getInputBufferSize()
79 return _inputBufferSize;
82 public void setInputBufferSize(int inputBufferSize)
84 _inputBufferSize = inputBufferSize;
87 protected Executor getExecutor()
92 protected void failedCallback(final Callback callback, final Throwable x)
94 if (NonBlockingThread.isNonBlockingThread())
98 getExecutor().execute(new Runnable()
107 catch(RejectedExecutionException e)
120 * <p>Utility method to be called to register read interest.</p>
121 * <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
122 * will be called back as appropriate.</p>
125 public void fillInterested()
127 LOG.debug("fillInterested {}",this);
131 State state=_state.get();
132 if (next(state,state.fillInterested()))
137 public void fillInterested(Callback callback)
139 LOG.debug("fillInterested {}",this);
143 State state=_state.get();
145 if (state instanceof FillingInterestedCallback && ((FillingInterestedCallback)state)._callback==callback)
147 State next=new FillingInterestedCallback(callback,state);
148 if (next(state,next))
154 * <p>Callback method invoked when the endpoint is ready to be read.</p>
155 * @see #fillInterested()
157 public abstract void onFillable();
160 * <p>Callback method invoked when the endpoint failed to be ready to be read.</p>
161 * @param cause the exception that caused the failure
163 protected void onFillInterestedFailed(Throwable cause)
165 LOG.debug("{} onFillInterestedFailed {}", this, cause);
166 if (_endPoint.isOpen())
168 boolean close = true;
169 if (cause instanceof TimeoutException)
170 close = onReadTimeout();
173 if (_endPoint.isOutputShutdown())
176 _endPoint.shutdownOutput();
180 if (_endPoint.isOpen())
185 * <p>Callback method invoked when the endpoint failed to be ready to be read after a timeout</p>
186 * @return true to signal that the endpoint must be closed, false to keep the endpoint open
188 protected boolean onReadTimeout()
196 LOG.debug("onOpen {}", this);
198 for (Listener listener : listeners)
199 listener.onOpened(this);
203 public void onClose()
205 LOG.debug("onClose {}",this);
207 for (Listener listener : listeners)
208 listener.onClosed(this);
212 public EndPoint getEndPoint()
220 getEndPoint().close();
224 public int getMessagesIn()
230 public int getMessagesOut()
236 public long getBytesIn()
242 public long getBytesOut()
248 public long getCreatedTimeStamp()
254 public String toString()
256 return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), _state.get());
259 public boolean next(State state, State next)
263 if(_state.compareAndSet(state,next))
265 LOG.debug("{}-->{} {}",state,next,this);
267 next.onEnter(AbstractConnection.this);
273 private static final class IdleState extends State
281 State fillInterested()
283 return FILL_INTERESTED;
288 private static final class FillInterestedState extends State
290 private FillInterestedState()
292 super("FILL_INTERESTED");
296 public void onEnter(AbstractConnection connection)
298 connection.getEndPoint().fillInterested(connection._readCallback);
302 State fillInterested()
308 public State onFillable()
321 private static final class RefillingState extends State
323 private RefillingState()
329 State fillInterested()
331 return FILLING_FILL_INTERESTED;
335 public State onFilled()
342 private static final class FillingFillInterestedState extends State
344 private FillingFillInterestedState(String name)
350 State fillInterested()
357 return FILL_INTERESTED;
362 private static final class FillingState extends State
364 private FillingState()
370 public void onEnter(AbstractConnection connection)
372 if (connection._executeOnfillable)
373 connection.getExecutor().execute(connection._runOnFillable);
375 connection._runOnFillable.run();
379 State fillInterested()
381 return FILLING_FILL_INTERESTED;
385 public State onFilled()
392 public static class State
394 private final String _name;
401 public String toString()
406 void onEnter(AbstractConnection connection)
410 State fillInterested()
412 throw new IllegalStateException(this.toString());
417 throw new IllegalStateException(this.toString());
422 throw new IllegalStateException(this.toString());
427 throw new IllegalStateException(this.toString());
432 public static final State IDLE=new IdleState();
434 public static final State FILL_INTERESTED=new FillInterestedState();
436 public static final State FILLING=new FillingState();
438 public static final State REFILLING=new RefillingState();
440 public static final State FILLING_FILL_INTERESTED=new FillingFillInterestedState("FILLING_FILL_INTERESTED");
442 public class NestedState extends State
444 private final State _nested;
446 NestedState(State nested)
448 super("NESTED("+nested+")");
451 NestedState(String name,State nested)
453 super(name+"("+nested+")");
458 State fillInterested()
460 return new NestedState(_nested.fillInterested());
466 return new NestedState(_nested.onFillable());
472 return new NestedState(_nested.onFilled());
477 public class FillingInterestedCallback extends NestedState
479 private final Callback _callback;
481 FillingInterestedCallback(Callback callback,State nested)
483 super("FILLING_INTERESTED_CALLBACK",nested==FILLING?REFILLING:nested);
488 void onEnter(final AbstractConnection connection)
490 Callback callback=new Callback()
493 public void succeeded()
497 State state = connection._state.get();
498 if (!(state instanceof NestedState))
500 State nested=((NestedState)state)._nested;
501 if (connection.next(state,nested))
504 _callback.succeeded();
508 public void failed(Throwable x)
512 State state = connection._state.get();
513 if (!(state instanceof NestedState))
515 State nested=((NestedState)state)._nested;
516 if (connection.next(state,nested))
523 connection.getEndPoint().fillInterested(callback);
527 private final Runnable _runOnFillable = new Runnable()
540 State state=_state.get();
541 if (next(state,state.onFilled()))
549 private class ReadCallback implements Callback
552 public void succeeded()
556 State state=_state.get();
557 if (next(state,state.onFillable()))
563 public void failed(final Throwable x)
565 _executor.execute(new Runnable()
572 State state=_state.get();
573 if (next(state,state.onFailed()))
576 onFillInterestedFailed(x);
582 public String toString()
584 return String.format("AC.ReadCB@%x{%s}", AbstractConnection.this.hashCode(),AbstractConnection.this);