]> WPIA git - gigi.git/blob - lib/jetty/org/eclipse/jetty/io/AbstractConnection.java
Importing upstream Jetty jetty-9.2.1.v20140609
[gigi.git] / lib / jetty / org / eclipse / jetty / io / AbstractConnection.java
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18
19 package org.eclipse.jetty.io;
20
21 import java.util.List;
22 import java.util.concurrent.CopyOnWriteArrayList;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicReference;
27
28 import org.eclipse.jetty.util.Callback;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
31 import org.eclipse.jetty.util.thread.NonBlockingThread;
32
33 /**
34  * <p>A convenience base implementation of {@link Connection}.</p>
35  * <p>This class uses the capabilities of the {@link EndPoint} API to provide a
36  * more traditional style of async reading.  A call to {@link #fillInterested()}
37  * will schedule a callback to {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
38  * as appropriate.</p>
39  */
40 public abstract class AbstractConnection implements Connection
41 {
42     private static final Logger LOG = Log.getLogger(AbstractConnection.class);
43     
44     public static final boolean EXECUTE_ONFILLABLE=true;
45
46     private final List<Listener> listeners = new CopyOnWriteArrayList<>();
47     private final AtomicReference<State> _state = new AtomicReference<>(IDLE);
48     private final long _created=System.currentTimeMillis();
49     private final EndPoint _endPoint;
50     private final Executor _executor;
51     private final Callback _readCallback;
52     private final boolean _executeOnfillable;
53     private int _inputBufferSize=2048;
54
55     protected AbstractConnection(EndPoint endp, Executor executor)
56     {
57         this(endp,executor,EXECUTE_ONFILLABLE);
58     }
59     
60     protected AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable)
61     {
62         if (executor == null)
63             throw new IllegalArgumentException("Executor must not be null!");
64         _endPoint = endp;
65         _executor = executor;
66         _readCallback = new ReadCallback();
67         _executeOnfillable=executeOnfillable;
68         _state.set(IDLE);
69     }
70
71     @Override
72     public void addListener(Listener listener)
73     {
74         listeners.add(listener);
75     }
76
77     public int getInputBufferSize()
78     {
79         return _inputBufferSize;
80     }
81
82     public void setInputBufferSize(int inputBufferSize)
83     {
84         _inputBufferSize = inputBufferSize;
85     }
86
87     protected Executor getExecutor()
88     {
89         return _executor;
90     }
91     
92     protected void failedCallback(final Callback callback, final Throwable x)
93     {
94         if (NonBlockingThread.isNonBlockingThread())
95         {
96             try
97             {
98                 getExecutor().execute(new Runnable()
99                 {
100                     @Override
101                     public void run()
102                     {
103                         callback.failed(x);
104                     }
105                 });
106             }
107             catch(RejectedExecutionException e)
108             {
109                 LOG.debug(e);
110                 callback.failed(x);
111             }
112         }
113         else
114         {
115             callback.failed(x);
116         }
117     }
118     
119     /**
120      * <p>Utility method to be called to register read interest.</p>
121      * <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
122      * will be called back as appropriate.</p>
123      * @see #onFillable()
124      */
125     public void fillInterested()
126     {
127         LOG.debug("fillInterested {}",this);           
128         
129         while(true)
130         {
131             State state=_state.get();
132             if (next(state,state.fillInterested()))
133                 break;
134         }
135     }
136     
137     public void fillInterested(Callback callback)
138     {
139         LOG.debug("fillInterested {}",this);
140
141         while(true)
142         {
143             State state=_state.get();
144             // TODO yuck
145             if (state instanceof FillingInterestedCallback && ((FillingInterestedCallback)state)._callback==callback)
146                 break;
147             State next=new FillingInterestedCallback(callback,state);
148             if (next(state,next))
149                 break;
150         }
151     }
152     
153     /**
154      * <p>Callback method invoked when the endpoint is ready to be read.</p>
155      * @see #fillInterested()
156      */
157     public abstract void onFillable();
158
159     /**
160      * <p>Callback method invoked when the endpoint failed to be ready to be read.</p>
161      * @param cause the exception that caused the failure
162      */
163     protected void onFillInterestedFailed(Throwable cause)
164     {
165         LOG.debug("{} onFillInterestedFailed {}", this, cause);
166         if (_endPoint.isOpen())
167         {
168             boolean close = true;
169             if (cause instanceof TimeoutException)
170                 close = onReadTimeout();
171             if (close)
172             {
173                 if (_endPoint.isOutputShutdown())
174                     _endPoint.close();
175                 else
176                     _endPoint.shutdownOutput();
177             }
178         }
179
180         if (_endPoint.isOpen())
181             fillInterested();        
182     }
183
184     /**
185      * <p>Callback method invoked when the endpoint failed to be ready to be read after a timeout</p>
186      * @return true to signal that the endpoint must be closed, false to keep the endpoint open
187      */
188     protected boolean onReadTimeout()
189     {
190         return true;
191     }
192
193     @Override
194     public void onOpen()
195     {
196         LOG.debug("onOpen {}", this);
197
198         for (Listener listener : listeners)
199             listener.onOpened(this);
200     }
201
202     @Override
203     public void onClose()
204     {
205         LOG.debug("onClose {}",this);
206
207         for (Listener listener : listeners)
208             listener.onClosed(this);
209     }
210
211     @Override
212     public EndPoint getEndPoint()
213     {
214         return _endPoint;
215     }
216
217     @Override
218     public void close()
219     {
220         getEndPoint().close();
221     }
222
223     @Override
224     public int getMessagesIn()
225     {
226         return -1;
227     }
228
229     @Override
230     public int getMessagesOut()
231     {
232         return -1;
233     }
234
235     @Override
236     public long getBytesIn()
237     {
238         return -1;
239     }
240
241     @Override
242     public long getBytesOut()
243     {
244         return -1;
245     }
246
247     @Override
248     public long getCreatedTimeStamp()
249     {
250         return _created;
251     }
252
253     @Override
254     public String toString()
255     {
256         return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), _state.get());
257     }
258     
259     public boolean next(State state, State next)
260     {
261         if (next==null)
262             return true;
263         if(_state.compareAndSet(state,next))
264         {
265             LOG.debug("{}-->{} {}",state,next,this);
266             if (next!=state)
267                 next.onEnter(AbstractConnection.this);
268             return true;
269         }
270         return false;
271     }
272     
273     private static final class IdleState extends State
274     {
275         private IdleState()
276         {
277             super("IDLE");
278         }
279
280         @Override
281         State fillInterested()
282         {
283             return FILL_INTERESTED;
284         }
285     }
286
287
288     private static final class FillInterestedState extends State
289     {
290         private FillInterestedState()
291         {
292             super("FILL_INTERESTED");
293         }
294
295         @Override
296         public void onEnter(AbstractConnection connection)
297         {
298             connection.getEndPoint().fillInterested(connection._readCallback);
299         }
300
301         @Override
302         State fillInterested()
303         {
304             return this;
305         }
306
307         @Override
308         public State onFillable()
309         {
310             return FILLING;
311         }
312
313         @Override
314         State onFailed()
315         {
316             return IDLE;
317         }
318     }
319
320
321     private static final class RefillingState extends State
322     {
323         private RefillingState()
324         {
325             super("REFILLING");
326         }
327
328         @Override
329         State fillInterested()
330         {
331             return FILLING_FILL_INTERESTED;
332         }
333
334         @Override
335         public State onFilled()
336         {
337             return IDLE;
338         }
339     }
340
341
342     private static final class FillingFillInterestedState extends State
343     {
344         private FillingFillInterestedState(String name)
345         {
346             super(name);
347         }
348
349         @Override
350         State fillInterested()
351         {
352             return this;
353         }
354
355         State onFilled()
356         {
357             return FILL_INTERESTED;
358         }
359     }
360
361
362     private static final class FillingState extends State
363     {
364         private FillingState()
365         {
366             super("FILLING");
367         }
368
369         @Override
370         public void onEnter(AbstractConnection connection)
371         {
372             if (connection._executeOnfillable)
373                 connection.getExecutor().execute(connection._runOnFillable);
374             else
375                 connection._runOnFillable.run();
376         }
377
378         @Override
379         State fillInterested()
380         {
381             return FILLING_FILL_INTERESTED;
382         }
383
384         @Override
385         public State onFilled()
386         {
387             return IDLE;
388         }
389     }
390
391
392     public static class State
393     {
394         private final String _name;
395         State(String name)
396         {
397             _name=name;
398         }
399
400         @Override
401         public String toString()
402         {
403             return _name;
404         }
405         
406         void onEnter(AbstractConnection connection)
407         {
408         }
409         
410         State fillInterested()
411         {
412             throw new IllegalStateException(this.toString());
413         }
414
415         State onFillable()
416         {
417             throw new IllegalStateException(this.toString());
418         }
419
420         State onFilled()
421         {
422             throw new IllegalStateException(this.toString());
423         }
424         
425         State onFailed()
426         {
427             throw new IllegalStateException(this.toString());
428         }
429     }
430     
431
432     public static final State IDLE=new IdleState();
433     
434     public static final State FILL_INTERESTED=new FillInterestedState();
435     
436     public static final State FILLING=new FillingState();
437     
438     public static final State REFILLING=new RefillingState();
439
440     public static final State FILLING_FILL_INTERESTED=new FillingFillInterestedState("FILLING_FILL_INTERESTED");
441     
442     public class NestedState extends State
443     {
444         private final State _nested;
445         
446         NestedState(State nested)
447         {
448             super("NESTED("+nested+")");
449             _nested=nested;
450         }
451         NestedState(String name,State nested)
452         {
453             super(name+"("+nested+")");
454             _nested=nested;
455         }
456
457         @Override
458         State fillInterested()
459         {
460             return new NestedState(_nested.fillInterested());
461         }
462
463         @Override
464         State onFillable()
465         {
466             return new NestedState(_nested.onFillable());
467         }
468         
469         @Override
470         State onFilled()
471         {
472             return new NestedState(_nested.onFilled());
473         }
474     }
475     
476     
477     public class FillingInterestedCallback extends NestedState
478     {
479         private final Callback _callback;
480         
481         FillingInterestedCallback(Callback callback,State nested)
482         {
483             super("FILLING_INTERESTED_CALLBACK",nested==FILLING?REFILLING:nested);
484             _callback=callback;
485         }
486
487         @Override
488         void onEnter(final AbstractConnection connection)
489         {
490             Callback callback=new Callback()
491             {
492                 @Override
493                 public void succeeded()
494                 {
495                     while(true)
496                     {
497                         State state = connection._state.get();
498                         if (!(state instanceof NestedState))
499                             break;
500                         State nested=((NestedState)state)._nested;
501                         if (connection.next(state,nested))
502                             break;
503                     }
504                     _callback.succeeded();
505                 }
506
507                 @Override
508                 public void failed(Throwable x)
509                 {
510                     while(true)
511                     {
512                         State state = connection._state.get();
513                         if (!(state instanceof NestedState))
514                             break;
515                         State nested=((NestedState)state)._nested;
516                         if (connection.next(state,nested))
517                             break;
518                     }
519                     _callback.failed(x);
520                 }  
521             };
522             
523             connection.getEndPoint().fillInterested(callback);
524         }
525     }
526     
527     private final Runnable _runOnFillable = new Runnable()
528     {
529         @Override
530         public void run()
531         {
532             try
533             {
534                 onFillable();
535             }
536             finally
537             {
538                 while(true)
539                 {
540                     State state=_state.get();
541                     if (next(state,state.onFilled()))
542                         break;
543                 }
544             }
545         }
546     };
547     
548     
549     private class ReadCallback implements Callback
550     {   
551         @Override
552         public void succeeded()
553         {
554             while(true)
555             {
556                 State state=_state.get();
557                 if (next(state,state.onFillable()))
558                     break;
559             }
560         }
561
562         @Override
563         public void failed(final Throwable x)
564         {
565             _executor.execute(new Runnable()
566             {
567                 @Override
568                 public void run()
569                 {
570                     while(true)
571                     {
572                         State state=_state.get();
573                         if (next(state,state.onFailed()))
574                             break;
575                     }
576                     onFillInterestedFailed(x);
577                 }
578             });
579         }
580         
581         @Override
582         public String toString()
583         {
584             return String.format("AC.ReadCB@%x{%s}", AbstractConnection.this.hashCode(),AbstractConnection.this);
585         }
586     };
587 }