]> WPIA git - gigi.git/blob - lib/jetty/org/eclipse/jetty/util/IteratingCallback.java
updating jetty to jetty-9.2.16.v2016040
[gigi.git] / lib / jetty / org / eclipse / jetty / util / IteratingCallback.java
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18
19 package org.eclipse.jetty.util;
20
21 import java.nio.channels.ClosedChannelException;
22 import java.util.concurrent.atomic.AtomicReference;
23
24 /**
25  * This specialized callback implements a pattern that allows
26  * a large job to be broken into smaller tasks using iteration
27  * rather than recursion.
28  * <p/>
29  * A typical example is the write of a large content to a socket,
30  * divided in chunks. Chunk C1 is written by thread T1, which
31  * also invokes the callback, which writes chunk C2, which invokes
32  * the callback again, which writes chunk C3, and so forth.
33  * <p/>
34  * The problem with the example is that if the callback thread
35  * is the same that performs the I/O operation, then the process
36  * is recursive and may result in a stack overflow.
37  * To avoid the stack overflow, a thread dispatch must be performed,
38  * causing context switching and cache misses, affecting performance.
39  * <p/>
40  * To avoid this issue, this callback uses an AtomicReference to
41  * record whether success callback has been called during the processing
42  * of a sub task, and if so then the processing iterates rather than
43  * recurring.
44  * <p/>
45  * Subclasses must implement method {@link #process()} where the sub
46  * task is executed and a suitable {@link IteratingCallback.Action} is
47  * returned to this callback to indicate the overall progress of the job.
48  * This callback is passed to the asynchronous execution of each sub
49  * task and a call the {@link #succeeded()} on this callback represents
50  * the completion of the sub task.
51  */
52 public abstract class IteratingCallback implements Callback
53 {
54     /**
55      * The internal states of this callback
56      */
57     private enum State
58     {
59         /**
60          * This callback is IDLE, ready to iterate.
61          */
62         IDLE,
63
64         /**
65          * This callback is iterating calls to {@link #process()} and is dealing with
66          * the returns.  To get into processing state, it much of held the lock state
67          * and set iterating to true.
68          */
69         PROCESSING,
70         
71         /**
72          * Waiting for a schedule callback
73          */
74         PENDING,
75         
76         /**
77          * Called by a schedule callback
78          */
79         CALLED,
80         
81         /**
82          * The overall job has succeeded as indicated by a {@link Action#SUCCEEDED} return 
83          * from {@link IteratingCallback#process()}
84          */
85         SUCCEEDED,
86         
87         /**
88          * The overall job has failed as indicated by a call to {@link IteratingCallback#failed(Throwable)}
89          */
90         FAILED,
91         
92         /**
93          * This callback has been closed and cannot be reset.
94          */ 
95         CLOSED,
96         
97         /**
98          * State is locked while leaving processing state to check the iterate boolean
99          */
100         LOCKED
101     }
102
103     /**
104      * The indication of the overall progress of the overall job that
105      * implementations of {@link #process()} must return.
106      */
107     protected enum Action
108     {
109         /**
110          * Indicates that {@link #process()} has no more work to do,
111          * but the overall job is not completed yet, probably waiting
112          * for additional events to trigger more work.
113          */
114         IDLE,
115         /**
116          * Indicates that {@link #process()} is executing asynchronously
117          * a sub task, where the execution has started but the callback
118          * may have not yet been invoked.
119          */
120         SCHEDULED,
121         
122         /**
123          * Indicates that {@link #process()} has completed the overall job.
124          */
125         SUCCEEDED
126     }
127
128     private final AtomicReference<State> _state;
129     private boolean _iterate;
130     
131     
132     protected IteratingCallback()
133     {
134         _state = new AtomicReference<>(State.IDLE);
135     }
136     
137     protected IteratingCallback(boolean needReset)
138     {
139         _state = new AtomicReference<>(needReset ? State.SUCCEEDED : State.IDLE);
140     }
141     
142     /**
143      * Method called by {@link #iterate()} to process the sub task.
144      * <p/>
145      * Implementations must start the asynchronous execution of the sub task
146      * (if any) and return an appropriate action:
147      * <ul>
148      * <li>{@link Action#IDLE} when no sub tasks are available for execution
149      * but the overall job is not completed yet</li>
150      * <li>{@link Action#SCHEDULED} when the sub task asynchronous execution
151      * has been started</li>
152      * <li>{@link Action#SUCCEEDED} when the overall job is completed</li>
153      * </ul>
154      *
155      * @throws Exception if the sub task processing throws
156      */
157     protected abstract Action process() throws Exception;
158
159     /**
160      * @deprecated Use {@link #onCompleteSuccess()} instead.
161      */
162     @Deprecated
163     protected void completed()
164     {
165     }
166
167     /**
168      * Invoked when the overall task has completed successfully.
169      *
170      * @see #onCompleteFailure(Throwable)
171      */
172     protected void onCompleteSuccess()
173     {
174         completed();
175     }
176     
177     /**
178      * Invoked when the overall task has completed with a failure.
179      *
180      * @see #onCompleteSuccess()
181      */
182     protected void onCompleteFailure(Throwable x)
183     {
184     }
185
186     /**
187      * This method must be invoked by applications to start the processing
188      * of sub tasks.  It can be called at any time by any thread, and it's 
189      * contract is that when called, then the {@link #process()} method will
190      * be called during or soon after, either by the calling thread or by 
191      * another thread.
192      */
193     public void iterate()
194     {
195         loop: while (true)
196         {
197             State state=_state.get();
198             switch (state)
199             {
200                 case PENDING:
201                 case CALLED:
202                     // process will be called when callback is handled
203                     break loop;
204                     
205                 case IDLE:
206                     if (!_state.compareAndSet(state,State.PROCESSING))
207                         continue;
208                     processing();
209                     break loop;
210                     
211                 case PROCESSING:
212                     if (!_state.compareAndSet(state,State.LOCKED))
213                         continue;
214                     // Tell the thread that is processing that it must iterate again
215                     _iterate=true;
216                     _state.set(State.PROCESSING);
217                     break loop;
218                     
219                 case LOCKED:
220                     Thread.yield();
221                     continue loop;
222
223                 case FAILED:
224                 case SUCCEEDED:
225                     break loop;
226
227                 case CLOSED:
228                 default:
229                     throw new IllegalStateException("state="+state);
230             }
231         }
232     }
233
234     private void processing() 
235     {
236         // This should only ever be called when in processing state, however a failed or close call
237         // may happen concurrently, so state is not assumed.
238         
239         // While we are processing
240         processing: while (true)
241         {
242             // Call process to get the action that we have to take.
243             Action action;
244             try
245             {
246                 action = process();
247             }
248             catch (Throwable x)
249             {
250                 failed(x);
251                 break processing;
252             }
253
254             // loop until we have successfully acted on the action we have just received
255             acting: while(true)
256             {
257                 // action handling needs to know the state
258                 State state=_state.get();
259                 
260                 switch (state)
261                 {
262                     case PROCESSING:
263                     {
264                         switch (action)
265                         {
266                             case IDLE:
267                             {
268                                 // lock the state
269                                 if (!_state.compareAndSet(state,State.LOCKED))
270                                     continue acting;
271
272                                 // Has iterate been called while we were processing?
273                                 if (_iterate)
274                                 {
275                                     // yes, so skip idle and keep processing
276                                     _iterate=false;
277                                     _state.set(State.PROCESSING);
278                                     continue processing;
279                                 }
280
281                                 // No, so we can go idle
282                                 _state.set(State.IDLE);
283                                 break processing;
284                             }
285                             
286                             case SCHEDULED:
287                             {
288                                 if (!_state.compareAndSet(state, State.PENDING))
289                                     continue acting;
290                                 // we won the race against the callback, so the callback has to process and we can break processing
291                                 break processing;
292                             }
293                             
294                             case SUCCEEDED:
295                             {
296                                 if (!_state.compareAndSet(state, State.LOCKED))
297                                     continue acting;
298                                 _iterate=false;
299                                 _state.set(State.SUCCEEDED);
300                                 onCompleteSuccess();
301                                 break processing;
302                             }
303
304                             default:
305                                 throw new IllegalStateException("state="+state+" action="+action); 
306                         }
307                     }
308                     
309                     case CALLED:
310                     {
311                         switch (action)
312                         {
313                             case SCHEDULED:
314                             {
315                                 if (!_state.compareAndSet(state, State.PROCESSING))
316                                     continue acting;
317                                 // we lost the race, so we have to keep processing
318                                 continue processing;
319                             }
320
321                             default:
322                                 throw new IllegalStateException("state="+state+" action="+action); 
323                         }
324                     }
325                         
326                     case LOCKED:
327                         Thread.yield();
328                         continue acting;
329
330                     case SUCCEEDED:
331                     case FAILED:
332                     case CLOSED:
333                         break processing;
334
335                     case IDLE:
336                     case PENDING:
337                     default:
338                         throw new IllegalStateException("state="+state+" action="+action); 
339                 }
340             }
341         }
342     }
343     
344     /**
345      * Invoked when the sub task succeeds.
346      * Subclasses that override this method must always remember to call
347      * {@code super.succeeded()}.
348      */
349     @Override
350     public void succeeded()
351     {
352         loop: while (true)
353         {
354             State state = _state.get();
355             switch (state)
356             {
357                 case PROCESSING:
358                 {
359                     if (!_state.compareAndSet(state, State.CALLED))
360                         continue loop;
361                     break loop;
362                 }
363                 case PENDING:
364                 {
365                     if (!_state.compareAndSet(state, State.PROCESSING))
366                         continue loop;
367                     processing();
368                     break loop;
369                 }
370                 case CLOSED:
371                 case FAILED:
372                 {
373                     // Too late!
374                     break loop;
375                 }
376                 case LOCKED:
377                 {
378                     Thread.yield();
379                     continue loop;
380                 }       
381                 default:
382                 {
383                     throw new IllegalStateException("state="+state);
384                 }
385             }
386         }
387     }
388
389     /**
390      * Invoked when the sub task fails.
391      * Subclasses that override this method must always remember to call
392      * {@code super.failed(Throwable)}.
393      */
394     @Override
395     public void failed(Throwable x)
396     {
397         loop: while (true)
398         {
399             State state = _state.get();
400             switch (state)
401             {
402                 case SUCCEEDED:
403                 case FAILED:
404                 case IDLE:
405                 case CLOSED:
406                 case CALLED:
407                 {
408                     // too late!.
409                     break loop;
410                 }
411                 case LOCKED:
412                 {
413                     Thread.yield();
414                     continue loop;
415                 }  
416                 case PENDING: 
417                 case PROCESSING: 
418                 {
419                     if (!_state.compareAndSet(state, State.FAILED))
420                         continue loop;
421
422                     onCompleteFailure(x);
423                     break loop;
424                 }
425                 default:
426                     throw new IllegalStateException("state="+state);
427             }
428         }
429     }
430
431     public void close()
432     {
433         loop: while (true)
434         {
435             State state = _state.get();
436             switch (state)
437             {
438                 case IDLE:
439                 case SUCCEEDED:
440                 case FAILED:
441                 {
442                     if (!_state.compareAndSet(state, State.CLOSED))
443                         continue loop;
444                     break loop;
445                 }
446                 case CLOSED:
447                 {
448                     break loop;
449                 }
450                 case LOCKED:
451                 {
452                     Thread.yield();
453                     continue loop;
454                 }    
455                 default:
456                 {
457                     if (!_state.compareAndSet(state, State.CLOSED))
458                         continue loop;
459                     onCompleteFailure(new ClosedChannelException());
460                     break loop;
461                 }
462             }
463         }
464     }
465
466     /*
467      * only for testing
468      * @return whether this callback is idle and {@link #iterate()} needs to be called
469      */
470     boolean isIdle()
471     {
472         return _state.get() == State.IDLE;
473     }
474
475     public boolean isClosed()
476     {
477         return _state.get() == State.CLOSED;
478     }
479     
480     /**
481      * @return whether this callback has failed
482      */
483     public boolean isFailed()
484     {
485         return _state.get() == State.FAILED;
486     }
487
488     /**
489      * @return whether this callback has succeeded
490      */
491     public boolean isSucceeded()
492     {
493         return _state.get() == State.SUCCEEDED;
494     }
495
496     /**
497      * Resets this callback.
498      * <p/>
499      * A callback can only be reset to IDLE from the
500      * SUCCEEDED or FAILED states or if it is already IDLE.
501      *
502      * @return true if the reset was successful
503      */
504     public boolean reset()
505     {
506         while (true)
507         {
508             State state=_state.get();
509             switch(state)
510             {
511                 case IDLE:
512                     return true;
513                     
514                 case SUCCEEDED:
515                     if (!_state.compareAndSet(state, State.LOCKED))
516                         continue;
517                     _iterate=false;
518                     _state.set(State.IDLE);
519                     return true;
520                     
521                 case FAILED:
522                     if (!_state.compareAndSet(state, State.LOCKED))
523                         continue;
524                     _iterate=false;
525                     _state.set(State.IDLE);
526                     return true;
527
528                 case LOCKED:
529                     Thread.yield();
530                     continue;
531                     
532                 default:
533                     return false;
534             }
535         }
536     }
537     
538     @Override
539     public String toString()
540     {
541         return String.format("%s[%s]", super.toString(), _state);
542     }
543 }