2 // ========================================================================
3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
19 package org.eclipse.jetty.util;
21 import java.util.concurrent.atomic.AtomicReference;
24 * This specialized callback implements a pattern that allows
25 * a large job to be broken into smaller tasks using iteration
26 * rather than recursion.
28 * A typical example is the write of a large content to a socket,
29 * divided in chunks. Chunk C1 is written by thread T1, which
30 * also invokes the callback, which writes chunk C2, which invokes
31 * the callback again, which writes chunk C3, and so forth.
33 * The problem with the example is that if the callback thread
34 * is the same that performs the I/O operation, then the process
35 * is recursive and may result in a stack overflow.
36 * To avoid the stack overflow, a thread dispatch must be performed,
37 * causing context switching and cache misses, affecting performance.
39 * To avoid this issue, this callback uses an AtomicReference to
40 * record whether the success callback has been called during the processing
41 * of a sub task, and if so then the processing iterates rather than recursing.
44 * Subclasses must implement method {@link #process()} where the sub
45 * task is executed and a suitable {@link IteratingCallback.Action} is
46 * returned to this callback to indicate the overall progress of the job.
47 * This callback is passed to the asynchronous execution of each sub
48 * task and a call to {@link #succeeded()} on this callback represents
49 * the completion of the sub task.
51 public abstract class IteratingCallback implements Callback
54 * The indication of the progress of the overall job that
55 * implementations of {@link #process()} must return.
60 * Indicates that {@link #process()} has no more work to do,
61 * but the overall job is not completed yet, probably waiting
62 * for additional events to trigger more work.
66 * Indicates that {@link #process()} is executing asynchronously
67 * a sub task, where the execution has started but the callback
68 * may not have been invoked yet.
72 * Indicates that {@link #process()} has completed the overall job.
76 * Indicates that {@link #process()} has failed the overall job.
81 private final AtomicReference<State> _state = new AtomicReference<>(State.INACTIVE);
84 * Method called by {@link #iterate()} to process the sub task.
86 * Implementations must start the asynchronous execution of the sub task
87 * (if any) and return an appropriate action:
89 * <li>{@link Action#IDLE} when no sub tasks are available for execution
90 * but the overall job is not completed yet</li>
91 * <li>{@link Action#SCHEDULED} when the sub task asynchronous execution
92 * has been started</li>
93 * <li>{@link Action#SUCCEEDED} when the overall job is completed</li>
94 * <li>{@link Action#FAILED} when the overall job cannot be completed</li>
97 * @throws Exception if the sub task processing throws
99 protected abstract Action process() throws Exception;
102 * Invoked when the overall task has completed successfully.
104 protected abstract void completed();
107 * This method must be invoked by applications to start the processing
110 * If {@link #process()} returns {@link Action#IDLE}, then this method
111 * should be called again to restart processing.
112 * It is safe to call {@code iterate()} multiple times from multiple threads, since only
113 * the first thread to move the state out of INACTIVE will actually do any iteration.
116 public void iterate()
122 switch (_state.get())
126 if (processIterations())
132 if (_state.compareAndSet(State.ITERATING, State.ITERATE_AGAIN))
149 private boolean processIterations() throws Exception
151 // Keeps iterating as long as succeeded() is called during process().
152 // If we are in INACTIVE state, either this is the first iteration or
153 // succeeded()/failed() were called already.
154 while (_state.compareAndSet(State.INACTIVE, State.ITERATING))
156 // Method process() can only be called by one thread at a time because
157 // it is guarded by the CaS above. However, the case blocks below may
158 // be executed concurrently in this case: T1 calls process() which
159 // executes the asynchronous sub task, which calls succeeded(), which
160 // moves the state into INACTIVE, then returns SCHEDULED; T2 calls
161 // iterate(), state is now INACTIVE and process() is called again and
162 // returns another action. Now we have 2 threads that may execute the
163 // action case blocks below concurrently; therefore each case block
164 // has to be prepared to fail the CaS it's doing.
166 Action action = process();
171 // No more progress can be made.
172 if (_state.compareAndSet(State.ITERATING, State.INACTIVE))
175 // Was iterate() called again after we decided to go INACTIVE?
176 // If so, try another iteration as more work may have been added
177 // while the previous call to process() was returning.
178 if (_state.compareAndSet(State.ITERATE_AGAIN, State.INACTIVE))
181 // State may have changed concurrently, try again.
186 // The sub task is executing, and the callback for it may or
187 // may not have already been called yet, which we figure out below.
188 // Can double CaS here because state never changes directly ITERATE_AGAIN --> ITERATING.
189 if (_state.compareAndSet(State.ITERATING, State.ACTIVE) ||
190 _state.compareAndSet(State.ITERATE_AGAIN, State.ACTIVE))
191 // Not called back yet, so wait.
193 // Callback must have happened, so iterate.
198 // The overall job has completed.
199 if (completeSuccess())
210 throw new IllegalStateException(toString());
218 * Invoked when the sub task succeeds.
219 * Subclasses that override this method must always remember to call
220 * {@code super.succeeded()}.
223 public void succeeded()
227 State current = _state.get();
233 if (_state.compareAndSet(current, State.INACTIVE))
239 // If we can move from ACTIVE to INACTIVE
240 // then we are responsible for calling iterate().
241 if (_state.compareAndSet(current, State.INACTIVE))
243 // If we can't CaS, then failed() must have been
244 // called, and we just return.
249 // Support the case where the callback is scheduled
250 // externally without a call to iterate().
256 throw new IllegalStateException(toString());
263 * Invoked when the sub task fails.
264 * Subclasses that override this method must always remember to call
265 * {@code super.failed(Throwable)}.
268 public void failed(Throwable x)
273 private boolean completeSuccess()
277 State current = _state.get();
278 if (current == State.FAILED)
280 // Success arrived too late, sorry.
285 if (_state.compareAndSet(current, State.SUCCEEDED))
291 private void completeFailure()
295 State current = _state.get();
296 if (current == State.SUCCEEDED)
298 // Failure arrived too late, sorry.
303 if (_state.compareAndSet(current, State.FAILED))
310 * @return whether this callback is idle and {@link #iterate()} needs to be called
312 public boolean isIdle()
314 return _state.get() == State.INACTIVE;
318 * @return whether this callback has failed
320 public boolean isFailed()
322 return _state.get() == State.FAILED;
326 * @return whether this callback has succeeded
328 public boolean isSucceeded()
330 return _state.get() == State.SUCCEEDED;
334 public String toString()
336 return String.format("%s[%s]", super.toString(), _state);
340 * The internal states of this callback
345 * This callback is inactive, ready to iterate.
349 * This callback is iterating and {@link #process()} has scheduled an
350 * asynchronous operation by returning {@link Action#SCHEDULED}, but
351 * the operation is still in progress.
355 * This callback is iterating and {@link #process()} has been called
356 * but not returned yet.
360 * While this callback was iterating, another request for iteration
361 * has been issued, so the iteration must continue even if a previous
362 * call to {@link #process()} returned {@link Action#IDLE}.
366 * The overall job has succeeded.
370 * The overall job has failed.