2 // ========================================================================
3 // Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
19 package org.eclipse.jetty.util;
21 import java.nio.channels.ClosedChannelException;
22 import java.util.concurrent.atomic.AtomicReference;
25 * This specialized callback implements a pattern that allows
26 * a large job to be broken into smaller tasks using iteration
27 * rather than recursion.
29 * A typical example is the write of a large content to a socket,
30 * divided in chunks. Chunk C1 is written by thread T1, which
31 * also invokes the callback, which writes chunk C2, which invokes
32 * the callback again, which writes chunk C3, and so forth.
34 * The problem with the example is that if the callback thread
35 * is the same that performs the I/O operation, then the process
36 * is recursive and may result in a stack overflow.
37 * To avoid the stack overflow, a thread dispatch must be performed,
38 * causing context switching and cache misses, affecting performance.
40 * To avoid this issue, this callback uses an AtomicReference to
41 * record whether the success callback has been called during the processing
42 * of a sub task, and if so then the processing iterates rather than
45 * Subclasses must implement method {@link #process()} where the sub
46 * task is executed and a suitable {@link IteratingCallback.Action} is
47 * returned to this callback to indicate the overall progress of the job.
48 * This callback is passed to the asynchronous execution of each sub
49 * task and a call to {@link #succeeded()} on this callback represents
50 * the completion of the sub task.
52 public abstract class IteratingCallback implements Callback
55 * The internal states of this callback
60 * This callback is IDLE, ready to iterate.
65 * This callback is iterating calls to {@link #process()} and is dealing with
66 * the returns. To get into processing state, it must have held the lock state
67 * and set iterating to true.
72 * Waiting for a schedule callback
77 * Called by a schedule callback
82 * The overall job has succeeded as indicated by a {@link Action#SUCCEEDED} return
83 * from {@link IteratingCallback#process()}
88 * The overall job has failed as indicated by a call to {@link IteratingCallback#failed(Throwable)}
93 * This callback has been closed and cannot be reset.
98 * State is locked while leaving processing state to check the iterate boolean
104 * The indication of the overall progress of the job that
105 * implementations of {@link #process()} must return.
107 protected enum Action
110 * Indicates that {@link #process()} has no more work to do,
111 * but the overall job is not completed yet, probably waiting
112 * for additional events to trigger more work.
116 * Indicates that {@link #process()} is executing asynchronously
117 * a sub task, where the execution has started but the callback
118 * may not yet have been invoked.
123 * Indicates that {@link #process()} has completed the overall job.
128 private final AtomicReference<State> _state;
129 private boolean _iterate;
132 protected IteratingCallback()
134 _state = new AtomicReference<>(State.IDLE);
137 protected IteratingCallback(boolean needReset)
139 _state = new AtomicReference<>(needReset ? State.SUCCEEDED : State.IDLE);
143 * Method called by {@link #iterate()} to process the sub task.
145 * Implementations must start the asynchronous execution of the sub task
146 * (if any) and return an appropriate action:
148 * <li>{@link Action#IDLE} when no sub tasks are available for execution
149 * but the overall job is not completed yet</li>
150 * <li>{@link Action#SCHEDULED} when the sub task asynchronous execution
151 * has been started</li>
152 * <li>{@link Action#SUCCEEDED} when the overall job is completed</li>
155 * @throws Exception if the sub task processing throws
157 protected abstract Action process() throws Exception;
160 * @deprecated Use {@link #onCompleteSuccess()} instead.
163 protected void completed()
168 * Invoked when the overall task has completed successfully.
170 * @see #onCompleteFailure(Throwable)
172 protected void onCompleteSuccess()
178 * Invoked when the overall task has completed with a failure.
180 * @see #onCompleteSuccess()
182 protected void onCompleteFailure(Throwable x)
187 * This method must be invoked by applications to start the processing
188 * of sub tasks. It can be called at any time by any thread, and its
189 * contract is that when called, then the {@link #process()} method will
190 * be called during or soon after, either by the calling thread or by
193 public void iterate()
197 State state=_state.get();
202 // process will be called when callback is handled
206 if (!_state.compareAndSet(state,State.PROCESSING))
212 if (!_state.compareAndSet(state,State.LOCKED))
214 // Tell the thread that is processing that it must iterate again
216 _state.set(State.PROCESSING);
229 throw new IllegalStateException("state="+state);
234 private void processing()
236 // This should only ever be called when in processing state, however a failed or close call
237 // may happen concurrently, so state is not assumed.
239 // While we are processing
240 processing: while (true)
242 // Call process to get the action that we have to take.
254 // loop until we have successfully acted on the action we have just received
257 // action handling needs to know the state
258 State state=_state.get();
269 if (!_state.compareAndSet(state,State.LOCKED))
272 // Has iterate been called while we were processing?
275 // yes, so skip idle and keep processing
277 _state.set(State.PROCESSING);
281 // No, so we can go idle
282 _state.set(State.IDLE);
288 if (!_state.compareAndSet(state, State.PENDING))
290 // we won the race against the callback, so the callback has to process and we can break processing
296 if (!_state.compareAndSet(state, State.LOCKED))
299 _state.set(State.SUCCEEDED);
305 throw new IllegalStateException("state="+state+" action="+action);
315 if (!_state.compareAndSet(state, State.PROCESSING))
317 // we lost the race, so we have to keep processing
322 throw new IllegalStateException("state="+state+" action="+action);
338 throw new IllegalStateException("state="+state+" action="+action);
345 * Invoked when the sub task succeeds.
346 * Subclasses that override this method must always remember to call
347 * {@code super.succeeded()}.
350 public void succeeded()
354 State state = _state.get();
359 if (!_state.compareAndSet(state, State.CALLED))
365 if (!_state.compareAndSet(state, State.PROCESSING))
383 throw new IllegalStateException("state="+state);
390 * Invoked when the sub task fails.
391 * Subclasses that override this method must always remember to call
392 * {@code super.failed(Throwable)}.
395 public void failed(Throwable x)
399 State state = _state.get();
419 if (!_state.compareAndSet(state, State.FAILED))
422 onCompleteFailure(x);
426 throw new IllegalStateException("state="+state);
435 State state = _state.get();
442 if (!_state.compareAndSet(state, State.CLOSED))
457 if (!_state.compareAndSet(state, State.CLOSED))
459 onCompleteFailure(new ClosedChannelException());
468 * @return whether this callback is idle and {@link #iterate()} needs to be called
472 return _state.get() == State.IDLE;
475 public boolean isClosed()
477 return _state.get() == State.CLOSED;
481 * @return whether this callback has failed
483 public boolean isFailed()
485 return _state.get() == State.FAILED;
489 * @return whether this callback has succeeded
491 public boolean isSucceeded()
493 return _state.get() == State.SUCCEEDED;
497 * Resets this callback.
499 * A callback can only be reset to IDLE from the
500 * SUCCEEDED or FAILED states or if it is already IDLE.
502 * @return true if the reset was successful
504 public boolean reset()
508 State state=_state.get();
515 if (!_state.compareAndSet(state, State.LOCKED))
518 _state.set(State.IDLE);
522 if (!_state.compareAndSet(state, State.LOCKED))
525 _state.set(State.IDLE);
539 public String toString()
541 return String.format("%s[%s]", super.toString(), _state);