]> WPIA git - gigi.git/blob - lib/jetty/org/eclipse/jetty/util/IteratingCallback.java
Importing upstream Jetty jetty-9.2.1.v20140609
[gigi.git] / lib / jetty / org / eclipse / jetty / util / IteratingCallback.java
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18
19 package org.eclipse.jetty.util;
20
21 import java.util.concurrent.atomic.AtomicReference;
22
23 /**
24  * This specialized callback implements a pattern that allows
25  * a large job to be broken into smaller tasks using iteration
26  * rather than recursion.
27  * <p/>
28  * A typical example is the write of a large content to a socket,
29  * divided in chunks. Chunk C1 is written by thread T1, which
30  * also invokes the callback, which writes chunk C2, which invokes
31  * the callback again, which writes chunk C3, and so forth.
32  * <p/>
33  * The problem with the example is that if the callback thread
34  * is the same that performs the I/O operation, then the process
35  * is recursive and may result in a stack overflow.
36  * To avoid the stack overflow, a thread dispatch must be performed,
37  * causing context switching and cache misses, affecting performance.
38  * <p/>
39  * To avoid this issue, this callback uses an AtomicReference to
40  * record whether success callback has been called during the processing
41  * of a sub task, and if so then the processing iterates rather than
42  * recurring.
43  * <p/>
44  * Subclasses must implement method {@link #process()} where the sub
45  * task is executed and a suitable {@link IteratingCallback.Action} is
46  * returned to this callback to indicate the overall progress of the job.
47  * This callback is passed to the asynchronous execution of each sub
48  * task and a call the {@link #succeeded()} on this callback represents
49  * the completion of the sub task.
50  */
51 public abstract class IteratingCallback implements Callback
52 {
53     /**
54      * The indication of the overall progress of the overall job that
55      * implementations of {@link #process()} must return.
56      */
57     protected enum Action
58     {
59         /**
60          * Indicates that {@link #process()} has no more work to do,
61          * but the overall job is not completed yet, probably waiting
62          * for additional events to trigger more work.
63          */
64         IDLE,
65         /**
66          * Indicates that {@link #process()} is executing asynchronously
67          * a sub task, where the execution has started but the callback
68          * may have not yet been invoked.
69          */
70         SCHEDULED,
71         /**
72          * Indicates that {@link #process()} has completed the overall job.
73          */
74         SUCCEEDED,
75         /**
76          * Indicates that {@link #process()} has failed the overall job.
77          */
78         FAILED
79     }
80
81     private final AtomicReference<State> _state = new AtomicReference<>(State.INACTIVE);
82
83     /**
84      * Method called by {@link #iterate()} to process the sub task.
85      * <p/>
86      * Implementations must start the asynchronous execution of the sub task
87      * (if any) and return an appropriate action:
88      * <ul>
89      * <li>{@link Action#IDLE} when no sub tasks are available for execution
90      * but the overall job is not completed yet</li>
91      * <li>{@link Action#SCHEDULED} when the sub task asynchronous execution
92      * has been started</li>
93      * <li>{@link Action#SUCCEEDED} when the overall job is completed</li>
94      * <li>{@link Action#FAILED} when the overall job cannot be completed</li>
95      * </ul>
96      *
97      * @throws Exception if the sub task processing throws
98      */
99     protected abstract Action process() throws Exception;
100
101     /**
102      * Invoked when the overall task has completed successfully.
103      */
104     protected abstract void completed();
105
106     /**
107      * This method must be invoked by applications to start the processing
108      * of sub tasks.
109      * <p/>
110      * If {@link #process()} returns {@link Action#IDLE}, then this method
111      * should be called again to restart processing.
112      * It is safe to call iterate multiple times from multiple threads since only
113      * the first thread to move the state out of INACTIVE will actually do any iteration
114      * and processing.
115      */
116     public void iterate()
117     {
118         try
119         {
120             while (true)
121             {
122                 switch (_state.get())
123                 {
124                     case INACTIVE:
125                     {
126                         if (processIterations())
127                             return;
128                         break;
129                     }
130                     case ITERATING:
131                     {
132                         if (_state.compareAndSet(State.ITERATING, State.ITERATE_AGAIN))
133                             return;
134                         break;
135                     }
136                     default:
137                     {
138                         return;
139                     }
140                 }
141             }
142         }
143         catch (Throwable x)
144         {
145             failed(x);
146         }
147     }
148
149     private boolean processIterations() throws Exception
150     {
151         // Keeps iterating as long as succeeded() is called during process().
152         // If we are in INACTIVE state, either this is the first iteration or
153         // succeeded()/failed() were called already.
154         while (_state.compareAndSet(State.INACTIVE, State.ITERATING))
155         {
156             // Method process() can only be called by one thread at a time because
157             // it is guarded by the CaS above. However, the case blocks below may
158             // be executed concurrently in this case: T1 calls process() which
159             // executes the asynchronous sub task, which calls succeeded(), which
160             // moves the state into INACTIVE, then returns SCHEDULED; T2 calls
161             // iterate(), state is now INACTIVE and process() is called again and
162             // returns another action. Now we have 2 threads that may execute the
163             // action case blocks below concurrently; therefore each case block
164             // has to be prepared to fail the CaS it's doing.
165
166             Action action = process();
167             switch (action)
168             {
169                 case IDLE:
170                 {
171                     // No more progress can be made.
172                     if (_state.compareAndSet(State.ITERATING, State.INACTIVE))
173                         return true;
174
175                     // Was iterate() called again since we already decided to go INACTIVE ?
176                     // If so, try another iteration as more work may have been added
177                     // while the previous call to process() was returning.
178                     if (_state.compareAndSet(State.ITERATE_AGAIN, State.INACTIVE))
179                         continue;
180
181                     // State may have changed concurrently, try again.
182                     continue;
183                 }
184                 case SCHEDULED:
185                 {
186                     // The sub task is executing, and the callback for it may or
187                     // may not have already been called yet, which we figure out below.
188                     // Can double CaS here because state never changes directly ITERATING_AGAIN --> ITERATE.
189                     if (_state.compareAndSet(State.ITERATING, State.ACTIVE) ||
190                             _state.compareAndSet(State.ITERATE_AGAIN, State.ACTIVE))
191                         // Not called back yet, so wait.
192                         return true;
193                     // Call back must have happened, so iterate.
194                     continue;
195                 }
196                 case SUCCEEDED:
197                 {
198                     // The overall job has completed.
199                     if (completeSuccess())
200                         completed();
201                     return true;
202                 }
203                 case FAILED:
204                 {
205                     completeFailure();
206                     return true;
207                 }
208                 default:
209                 {
210                     throw new IllegalStateException(toString());
211                 }
212             }
213         }
214         return false;
215     }
216
217     /**
218      * Invoked when the sub task succeeds.
219      * Subclasses that override this method must always remember to call
220      * {@code super.succeeded()}.
221      */
222     @Override
223     public void succeeded()
224     {
225         while (true)
226         {
227             State current = _state.get();
228             switch (current)
229             {
230                 case ITERATE_AGAIN:
231                 case ITERATING:
232                 {
233                     if (_state.compareAndSet(current, State.INACTIVE))
234                         return;
235                     continue;
236                 }
237                 case ACTIVE:
238                 {
239                     // If we can move from ACTIVE to INACTIVE
240                     // then we are responsible to call iterate().
241                     if (_state.compareAndSet(current, State.INACTIVE))
242                         iterate();
243                     // If we can't CaS, then failed() must have been
244                     // called, and we just return.
245                     return;
246                 }
247                 case INACTIVE:
248                 {
249                     // Support the case where the callback is scheduled
250                     // externally without a call to iterate().
251                     iterate();
252                     return;
253                 }
254                 default:
255                 {
256                     throw new IllegalStateException(toString());
257                 }
258             }
259         }
260     }
261
262     /**
263      * Invoked when the sub task fails.
264      * Subclasses that override this method must always remember to call
265      * {@code super.failed(Throwable)}.
266      */
267     @Override
268     public void failed(Throwable x)
269     {
270         completeFailure();
271     }
272
273     private boolean completeSuccess()
274     {
275         while (true)
276         {
277             State current = _state.get();
278             if (current == State.FAILED)
279             {
280                 // Success arrived too late, sorry.
281                 return false;
282             }
283             else
284             {
285                 if (_state.compareAndSet(current, State.SUCCEEDED))
286                     return true;
287             }
288         }
289     }
290
291     private void completeFailure()
292     {
293         while (true)
294         {
295             State current = _state.get();
296             if (current == State.SUCCEEDED)
297             {
298                 // Failed arrived too late, sorry.
299                 return;
300             }
301             else
302             {
303                 if (_state.compareAndSet(current, State.FAILED))
304                     break;
305             }
306         }
307     }
308
309     /**
310      * @return whether this callback is idle and {@link #iterate()} needs to be called
311      */
312     public boolean isIdle()
313     {
314         return _state.get() == State.INACTIVE;
315     }
316
317     /**
318      * @return whether this callback has failed
319      */
320     public boolean isFailed()
321     {
322         return _state.get() == State.FAILED;
323     }
324
325     /**
326      * @return whether this callback has succeeded
327      */
328     public boolean isSucceeded()
329     {
330         return _state.get() == State.SUCCEEDED;
331     }
332
333     @Override
334     public String toString()
335     {
336         return String.format("%s[%s]", super.toString(), _state);
337     }
338
339     /**
340      * The internal states of this callback
341      */
342     private enum State
343     {
344         /**
345          * This callback is inactive, ready to iterate.
346          */
347         INACTIVE,
348         /**
349          * This callback is iterating and {@link #process()} has scheduled an
350          * asynchronous operation by returning {@link Action#SCHEDULED}, but
351          * the operation is still undergoing.
352          */
353         ACTIVE,
354         /**
355          * This callback is iterating and {@link #process()} has been called
356          * but not returned yet.
357          */
358         ITERATING,
359         /**
360          * While this callback was iterating, another request for iteration
361          * has been issued, so the iteration must continue even if a previous
362          * call to {@link #process()} returned {@link Action#IDLE}.
363          */
364         ITERATE_AGAIN,
365         /**
366          * The overall job has succeeded.
367          */
368         SUCCEEDED,
369         /**
370          * The overall job has failed.
371          */
372         FAILED
373     }
374 }