WPIA git - gigi.git/blob - lib/jetty/org/eclipse/jetty/io/AbstractConnection.java
Merge "upd: remove 'browser install'"
[gigi.git] / lib / jetty / org / eclipse / jetty / io / AbstractConnection.java
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18
19 package org.eclipse.jetty.io;
20
21 import java.util.List;
22 import java.util.concurrent.CopyOnWriteArrayList;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicReference;
27
28 import org.eclipse.jetty.util.Callback;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
31 import org.eclipse.jetty.util.thread.NonBlockingThread;
32
33 /**
34  * <p>A convenience base implementation of {@link Connection}.</p>
35  * <p>This class uses the capabilities of the {@link EndPoint} API to provide a
36  * more traditional style of async reading.  A call to {@link #fillInterested()}
37  * will schedule a callback to {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
38  * as appropriate.</p>
39  */
40 public abstract class AbstractConnection implements Connection
41 {
42     private static final Logger LOG = Log.getLogger(AbstractConnection.class);
43
44     public static final boolean EXECUTE_ONFILLABLE=true;
45
46     private final List<Listener> listeners = new CopyOnWriteArrayList<>();
47     private final AtomicReference<State> _state = new AtomicReference<>(IDLE);
48     private final long _created=System.currentTimeMillis();
49     private final EndPoint _endPoint;
50     private final Executor _executor;
51     private final Callback _readCallback;
52     private final boolean _executeOnfillable;
53     private int _inputBufferSize=2048;
54
55     protected AbstractConnection(EndPoint endp, Executor executor)
56     {
57         this(endp,executor,EXECUTE_ONFILLABLE);
58     }
59
60     protected AbstractConnection(EndPoint endp, Executor executor, final boolean executeOnfillable)
61     {
62         if (executor == null)
63             throw new IllegalArgumentException("Executor must not be null!");
64         _endPoint = endp;
65         _executor = executor;
66         _readCallback = new ReadCallback();
67         _executeOnfillable=executeOnfillable;
68         _state.set(IDLE);
69     }
70
71     @Override
72     public void addListener(Listener listener)
73     {
74         listeners.add(listener);
75     }
76
77     public int getInputBufferSize()
78     {
79         return _inputBufferSize;
80     }
81
82     public void setInputBufferSize(int inputBufferSize)
83     {
84         _inputBufferSize = inputBufferSize;
85     }
86
87     protected Executor getExecutor()
88     {
89         return _executor;
90     }
91
92     protected void failedCallback(final Callback callback, final Throwable x)
93     {
94         if (NonBlockingThread.isNonBlockingThread())
95         {
96             try
97             {
98                 getExecutor().execute(new Runnable()
99                 {
100                     @Override
101                     public void run()
102                     {
103                         callback.failed(x);
104                     }
105                 });
106             }
107             catch(RejectedExecutionException e)
108             {
109                 LOG.debug(e);
110                 callback.failed(x);
111             }
112         }
113         else
114         {
115             callback.failed(x);
116         }
117     }
118
119     /**
120      * <p>Utility method to be called to register read interest.</p>
121      * <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
122      * will be called back as appropriate.</p>
123      * @see #onFillable()
124      */
125     public void fillInterested()
126     {
127         if (LOG.isDebugEnabled())
128             LOG.debug("fillInterested {}",this);
129
130         while(true)
131         {
132             State state=_state.get();
133             if (next(state,state.fillInterested()))
134                 break;
135         }
136     }
137
138     public void fillInterested(Callback callback)
139     {
140         if (LOG.isDebugEnabled())
141             LOG.debug("fillInterested {}",this);
142
143         while(true)
144         {
145             State state=_state.get();
146             // TODO yuck
147             if (state instanceof FillingInterestedCallback && ((FillingInterestedCallback)state)._callback==callback)
148                 break;
149             State next=new FillingInterestedCallback(callback,state);
150             if (next(state,next))
151                 break;
152         }
153     }
154
155     /**
156      * <p>Callback method invoked when the endpoint is ready to be read.</p>
157      * @see #fillInterested()
158      */
159     public abstract void onFillable();
160
161     /**
162      * <p>Callback method invoked when the endpoint failed to be ready to be read.</p>
163      * @param cause the exception that caused the failure
164      */
165     protected void onFillInterestedFailed(Throwable cause)
166     {
167         if (LOG.isDebugEnabled())
168             LOG.debug("{} onFillInterestedFailed {}", this, cause);
169         if (_endPoint.isOpen())
170         {
171             boolean close = true;
172             if (cause instanceof TimeoutException)
173                 close = onReadTimeout();
174             if (close)
175             {
176                 if (_endPoint.isOutputShutdown())
177                     _endPoint.close();
178                 else
179                     _endPoint.shutdownOutput();
180             }
181         }
182
183         if (_endPoint.isOpen())
184             fillInterested();
185     }
186
187     /**
188      * <p>Callback method invoked when the endpoint failed to be ready to be read after a timeout</p>
189      * @return true to signal that the endpoint must be closed, false to keep the endpoint open
190      */
191     protected boolean onReadTimeout()
192     {
193         return true;
194     }
195
196     @Override
197     public void onOpen()
198     {
199         if (LOG.isDebugEnabled())
200             LOG.debug("onOpen {}", this);
201
202         for (Listener listener : listeners)
203             listener.onOpened(this);
204     }
205
206     @Override
207     public void onClose()
208     {
209         if (LOG.isDebugEnabled())
210             LOG.debug("onClose {}",this);
211
212         for (Listener listener : listeners)
213             listener.onClosed(this);
214     }
215
216     @Override
217     public EndPoint getEndPoint()
218     {
219         return _endPoint;
220     }
221
222     @Override
223     public void close()
224     {
225         getEndPoint().close();
226     }
227
228     @Override
229     public int getMessagesIn()
230     {
231         return -1;
232     }
233
234     @Override
235     public int getMessagesOut()
236     {
237         return -1;
238     }
239
240     @Override
241     public long getBytesIn()
242     {
243         return -1;
244     }
245
246     @Override
247     public long getBytesOut()
248     {
249         return -1;
250     }
251
252     @Override
253     public long getCreatedTimeStamp()
254     {
255         return _created;
256     }
257
258     @Override
259     public String toString()
260     {
261         return String.format("%s@%x[%s,%s]",
262                 getClass().getSimpleName(),
263                 hashCode(),
264                 _state.get(),
265                 _endPoint);
266     }
267
268     public boolean next(State state, State next)
269     {
270         if (next==null)
271             return true;
272         if(_state.compareAndSet(state,next))
273         {
274             if (LOG.isDebugEnabled())
275                 LOG.debug("{}-->{} {}",state,next,this);
276             if (next!=state)
277                 next.onEnter(AbstractConnection.this);
278             return true;
279         }
280         return false;
281     }
282
283     private static final class IdleState extends State
284     {
285         private IdleState()
286         {
287             super("IDLE");
288         }
289
290         @Override
291         State fillInterested()
292         {
293             return FILL_INTERESTED;
294         }
295     }
296
297
298     private static final class FillInterestedState extends State
299     {
300         private FillInterestedState()
301         {
302             super("FILL_INTERESTED");
303         }
304
305         @Override
306         public void onEnter(AbstractConnection connection)
307         {
308             connection.getEndPoint().fillInterested(connection._readCallback);
309         }
310
311         @Override
312         State fillInterested()
313         {
314             return this;
315         }
316
317         @Override
318         public State onFillable()
319         {
320             return FILLING;
321         }
322
323         @Override
324         State onFailed()
325         {
326             return IDLE;
327         }
328     }
329
330
331     private static final class RefillingState extends State
332     {
333         private RefillingState()
334         {
335             super("REFILLING");
336         }
337
338         @Override
339         State fillInterested()
340         {
341             return FILLING_FILL_INTERESTED;
342         }
343
344         @Override
345         public State onFilled()
346         {
347             return IDLE;
348         }
349     }
350
351
352     private static final class FillingFillInterestedState extends State
353     {
354         private FillingFillInterestedState(String name)
355         {
356             super(name);
357         }
358
359         @Override
360         State fillInterested()
361         {
362             return this;
363         }
364
365         State onFilled()
366         {
367             return FILL_INTERESTED;
368         }
369     }
370
371
372     private static final class FillingState extends State
373     {
374         private FillingState()
375         {
376             super("FILLING");
377         }
378
379         @Override
380         public void onEnter(AbstractConnection connection)
381         {
382             if (connection._executeOnfillable)
383                 connection.getExecutor().execute(connection._runOnFillable);
384             else
385                 connection._runOnFillable.run();
386         }
387
388         @Override
389         State fillInterested()
390         {
391             return FILLING_FILL_INTERESTED;
392         }
393
394         @Override
395         public State onFilled()
396         {
397             return IDLE;
398         }
399     }
400
401
402     public static class State
403     {
404         private final String _name;
405         State(String name)
406         {
407             _name=name;
408         }
409
410         @Override
411         public String toString()
412         {
413             return _name;
414         }
415
416         void onEnter(AbstractConnection connection)
417         {
418         }
419
420         State fillInterested()
421         {
422             throw new IllegalStateException(this.toString());
423         }
424
425         State onFillable()
426         {
427             throw new IllegalStateException(this.toString());
428         }
429
430         State onFilled()
431         {
432             throw new IllegalStateException(this.toString());
433         }
434
435         State onFailed()
436         {
437             throw new IllegalStateException(this.toString());
438         }
439     }
440
441
442     public static final State IDLE=new IdleState();
443
444     public static final State FILL_INTERESTED=new FillInterestedState();
445
446     public static final State FILLING=new FillingState();
447
448     public static final State REFILLING=new RefillingState();
449
450     public static final State FILLING_FILL_INTERESTED=new FillingFillInterestedState("FILLING_FILL_INTERESTED");
451
452     public class NestedState extends State
453     {
454         private final State _nested;
455
456         NestedState(State nested)
457         {
458             super("NESTED("+nested+")");
459             _nested=nested;
460         }
461         NestedState(String name,State nested)
462         {
463             super(name+"("+nested+")");
464             _nested=nested;
465         }
466
467         @Override
468         State fillInterested()
469         {
470             return new NestedState(_nested.fillInterested());
471         }
472
473         @Override
474         State onFillable()
475         {
476             return new NestedState(_nested.onFillable());
477         }
478
479         @Override
480         State onFilled()
481         {
482             return new NestedState(_nested.onFilled());
483         }
484     }
485
486
487     public class FillingInterestedCallback extends NestedState
488     {
489         private final Callback _callback;
490
491         FillingInterestedCallback(Callback callback,State nested)
492         {
493             super("FILLING_INTERESTED_CALLBACK",nested==FILLING?REFILLING:nested);
494             _callback=callback;
495         }
496
497         @Override
498         void onEnter(final AbstractConnection connection)
499         {
500             Callback callback=new Callback()
501             {
502                 @Override
503                 public void succeeded()
504                 {
505                     while(true)
506                     {
507                         State state = connection._state.get();
508                         if (!(state instanceof NestedState))
509                             break;
510                         State nested=((NestedState)state)._nested;
511                         if (connection.next(state,nested))
512                             break;
513                     }
514                     _callback.succeeded();
515                 }
516
517                 @Override
518                 public void failed(Throwable x)
519                 {
520                     while(true)
521                     {
522                         State state = connection._state.get();
523                         if (!(state instanceof NestedState))
524                             break;
525                         State nested=((NestedState)state)._nested;
526                         if (connection.next(state,nested))
527                             break;
528                     }
529                     _callback.failed(x);
530                 }
531             };
532
533             connection.getEndPoint().fillInterested(callback);
534         }
535     }
536
537     private final Runnable _runOnFillable = new Runnable()
538     {
539         @Override
540         public void run()
541         {
542             try
543             {
544                 onFillable();
545             }
546             finally
547             {
548                 while(true)
549                 {
550                     State state=_state.get();
551                     if (next(state,state.onFilled()))
552                         break;
553                 }
554             }
555         }
556     };
557
558
559     private class ReadCallback implements Callback
560     {
561         @Override
562         public void succeeded()
563         {
564             while(true)
565             {
566                 State state=_state.get();
567                 if (next(state,state.onFillable()))
568                     break;
569             }
570         }
571
572         @Override
573         public void failed(final Throwable x)
574         {
575             _executor.execute(new Runnable()
576             {
577                 @Override
578                 public void run()
579                 {
580                     while(true)
581                     {
582                         State state=_state.get();
583                         if (next(state,state.onFailed()))
584                             break;
585                     }
586                     onFillInterestedFailed(x);
587                 }
588             });
589         }
590
591         @Override
592         public String toString()
593         {
594             return String.format("AC.ReadCB@%x{%s}", AbstractConnection.this.hashCode(),AbstractConnection.this);
595         }
596     };
597 }