//
// ========================================================================
-// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
package org.eclipse.jetty.util;
+import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public abstract class IteratingCallback implements Callback
{
+ /**
+ * The internal states of this callback
+ */
+ private enum State
+ {
+ /**
+ * This callback is IDLE, ready to iterate.
+ */
+ IDLE,
+
+ /**
+ * This callback is iterating calls to {@link #process()} and is dealing with
+ * the returns. To get into processing state, it much of held the lock state
+ * and set iterating to true.
+ */
+ PROCESSING,
+
+ /**
+ * Waiting for a schedule callback
+ */
+ PENDING,
+
+ /**
+ * Called by a schedule callback
+ */
+ CALLED,
+
+ /**
+ * The overall job has succeeded as indicated by a {@link Action#SUCCEEDED} return
+ * from {@link IteratingCallback#process()}
+ */
+ SUCCEEDED,
+
+ /**
+ * The overall job has failed as indicated by a call to {@link IteratingCallback#failed(Throwable)}
+ */
+ FAILED,
+
+ /**
+ * This callback has been closed and cannot be reset.
+ */
+ CLOSED,
+
+ /**
+ * State is locked while leaving processing state to check the iterate boolean
+ */
+ LOCKED
+ }
+
/**
* The indication of the overall progress of the overall job that
* implementations of {@link #process()} must return.
* may have not yet been invoked.
*/
SCHEDULED,
+
/**
* Indicates that {@link #process()} has completed the overall job.
*/
- SUCCEEDED,
- /**
- * Indicates that {@link #process()} has failed the overall job.
- */
- FAILED
+ SUCCEEDED
}
- private final AtomicReference<State> _state = new AtomicReference<>(State.INACTIVE);
-
+ private final AtomicReference<State> _state;
+ private boolean _iterate;
+
+
+ protected IteratingCallback()
+ {
+ _state = new AtomicReference<>(State.IDLE);
+ }
+
+ protected IteratingCallback(boolean needReset)
+ {
+ _state = new AtomicReference<>(needReset ? State.SUCCEEDED : State.IDLE);
+ }
+
/**
* Method called by {@link #iterate()} to process the sub task.
* <p/>
* <li>{@link Action#SCHEDULED} when the sub task asynchronous execution
* has been started</li>
* <li>{@link Action#SUCCEEDED} when the overall job is completed</li>
- * <li>{@link Action#FAILED} when the overall job cannot be completed</li>
* </ul>
*
* @throws Exception if the sub task processing throws
*/
protected abstract Action process() throws Exception;
+ /**
+ * @deprecated Use {@link #onCompleteSuccess()} instead.
+ */
+ @Deprecated
+ protected void completed()
+ {
+ }
+
/**
* Invoked when the overall task has completed successfully.
+ *
+ * @see #onCompleteFailure(Throwable)
*/
- protected abstract void completed();
+ protected void onCompleteSuccess()
+ {
+ completed();
+ }
+
+ /**
+ * Invoked when the overall task has completed with a failure.
+ *
+ * @see #onCompleteSuccess()
+ */
+ protected void onCompleteFailure(Throwable x)
+ {
+ }
/**
* This method must be invoked by applications to start the processing
- * of sub tasks.
- * <p/>
- * If {@link #process()} returns {@link Action#IDLE}, then this method
- * should be called again to restart processing.
- * It is safe to call iterate multiple times from multiple threads since only
- * the first thread to move the state out of INACTIVE will actually do any iteration
- * and processing.
+ * of sub tasks. It can be called at any time by any thread, and it's
+ * contract is that when called, then the {@link #process()} method will
+ * be called during or soon after, either by the calling thread or by
+ * another thread.
*/
public void iterate()
{
- try
+ loop: while (true)
{
- while (true)
+ State state=_state.get();
+ switch (state)
{
- switch (_state.get())
- {
- case INACTIVE:
- {
- if (processIterations())
- return;
- break;
- }
- case ITERATING:
- {
- if (_state.compareAndSet(State.ITERATING, State.ITERATE_AGAIN))
- return;
- break;
- }
- default:
- {
- return;
- }
- }
+ case PENDING:
+ case CALLED:
+ // process will be called when callback is handled
+ break loop;
+
+ case IDLE:
+ if (!_state.compareAndSet(state,State.PROCESSING))
+ continue;
+ processing();
+ break loop;
+
+ case PROCESSING:
+ if (!_state.compareAndSet(state,State.LOCKED))
+ continue;
+ // Tell the thread that is processing that it must iterate again
+ _iterate=true;
+ _state.set(State.PROCESSING);
+ break loop;
+
+ case LOCKED:
+ Thread.yield();
+ continue loop;
+
+ case FAILED:
+ case SUCCEEDED:
+ break loop;
+
+ case CLOSED:
+ default:
+ throw new IllegalStateException("state="+state);
}
}
- catch (Throwable x)
- {
- failed(x);
- }
}
- private boolean processIterations() throws Exception
+ private void processing()
{
- // Keeps iterating as long as succeeded() is called during process().
- // If we are in INACTIVE state, either this is the first iteration or
- // succeeded()/failed() were called already.
- while (_state.compareAndSet(State.INACTIVE, State.ITERATING))
+ // This should only ever be called when in processing state, however a failed or close call
+ // may happen concurrently, so state is not assumed.
+
+ // While we are processing
+ processing: while (true)
{
- // Method process() can only be called by one thread at a time because
- // it is guarded by the CaS above. However, the case blocks below may
- // be executed concurrently in this case: T1 calls process() which
- // executes the asynchronous sub task, which calls succeeded(), which
- // moves the state into INACTIVE, then returns SCHEDULED; T2 calls
- // iterate(), state is now INACTIVE and process() is called again and
- // returns another action. Now we have 2 threads that may execute the
- // action case blocks below concurrently; therefore each case block
- // has to be prepared to fail the CaS it's doing.
+ // Call process to get the action that we have to take.
+ Action action;
+ try
+ {
+ action = process();
+ }
+ catch (Throwable x)
+ {
+ failed(x);
+ break processing;
+ }
- Action action = process();
- switch (action)
+ // loop until we have successfully acted on the action we have just received
+ acting: while(true)
{
- case IDLE:
+ // action handling needs to know the state
+ State state=_state.get();
+
+ switch (state)
{
- // No more progress can be made.
- if (_state.compareAndSet(State.ITERATING, State.INACTIVE))
- return true;
+ case PROCESSING:
+ {
+ switch (action)
+ {
+ case IDLE:
+ {
+ // lock the state
+ if (!_state.compareAndSet(state,State.LOCKED))
+ continue acting;
- // Was iterate() called again since we already decided to go INACTIVE ?
- // If so, try another iteration as more work may have been added
- // while the previous call to process() was returning.
- if (_state.compareAndSet(State.ITERATE_AGAIN, State.INACTIVE))
- continue;
+ // Has iterate been called while we were processing?
+ if (_iterate)
+ {
+ // yes, so skip idle and keep processing
+ _iterate=false;
+ _state.set(State.PROCESSING);
+ continue processing;
+ }
- // State may have changed concurrently, try again.
- continue;
- }
- case SCHEDULED:
- {
- // The sub task is executing, and the callback for it may or
- // may not have already been called yet, which we figure out below.
- // Can double CaS here because state never changes directly ITERATING_AGAIN --> ITERATE.
- if (_state.compareAndSet(State.ITERATING, State.ACTIVE) ||
- _state.compareAndSet(State.ITERATE_AGAIN, State.ACTIVE))
- // Not called back yet, so wait.
- return true;
- // Call back must have happened, so iterate.
- continue;
- }
- case SUCCEEDED:
- {
- // The overall job has completed.
- if (completeSuccess())
- completed();
- return true;
- }
- case FAILED:
- {
- completeFailure();
- return true;
- }
- default:
- {
- throw new IllegalStateException(toString());
+ // No, so we can go idle
+ _state.set(State.IDLE);
+ break processing;
+ }
+
+ case SCHEDULED:
+ {
+ if (!_state.compareAndSet(state, State.PENDING))
+ continue acting;
+ // we won the race against the callback, so the callback has to process and we can break processing
+ break processing;
+ }
+
+ case SUCCEEDED:
+ {
+ if (!_state.compareAndSet(state, State.LOCKED))
+ continue acting;
+ _iterate=false;
+ _state.set(State.SUCCEEDED);
+ onCompleteSuccess();
+ break processing;
+ }
+
+ default:
+ throw new IllegalStateException("state="+state+" action="+action);
+ }
+ }
+
+ case CALLED:
+ {
+ switch (action)
+ {
+ case SCHEDULED:
+ {
+ if (!_state.compareAndSet(state, State.PROCESSING))
+ continue acting;
+ // we lost the race, so we have to keep processing
+ continue processing;
+ }
+
+ default:
+ throw new IllegalStateException("state="+state+" action="+action);
+ }
+ }
+
+ case LOCKED:
+ Thread.yield();
+ continue acting;
+
+ case SUCCEEDED:
+ case FAILED:
+ case CLOSED:
+ break processing;
+
+ case IDLE:
+ case PENDING:
+ default:
+ throw new IllegalStateException("state="+state+" action="+action);
}
}
}
- return false;
}
-
+
/**
* Invoked when the sub task succeeds.
* Subclasses that override this method must always remember to call
@Override
public void succeeded()
{
- while (true)
+ loop: while (true)
{
- State current = _state.get();
- switch (current)
+ State state = _state.get();
+ switch (state)
{
- case ITERATE_AGAIN:
- case ITERATING:
+ case PROCESSING:
{
- if (_state.compareAndSet(current, State.INACTIVE))
- return;
- continue;
+ if (!_state.compareAndSet(state, State.CALLED))
+ continue loop;
+ break loop;
}
- case ACTIVE:
+ case PENDING:
{
- // If we can move from ACTIVE to INACTIVE
- // then we are responsible to call iterate().
- if (_state.compareAndSet(current, State.INACTIVE))
- iterate();
- // If we can't CaS, then failed() must have been
- // called, and we just return.
- return;
+ if (!_state.compareAndSet(state, State.PROCESSING))
+ continue loop;
+ processing();
+ break loop;
}
- case INACTIVE:
+ case CLOSED:
+ case FAILED:
{
- // Support the case where the callback is scheduled
- // externally without a call to iterate().
- iterate();
- return;
+ // Too late!
+ break loop;
}
+ case LOCKED:
+ {
+ Thread.yield();
+ continue loop;
+ }
default:
{
- throw new IllegalStateException(toString());
+ throw new IllegalStateException("state="+state);
}
}
}
@Override
public void failed(Throwable x)
{
- completeFailure();
- }
-
- private boolean completeSuccess()
- {
- while (true)
+ loop: while (true)
{
- State current = _state.get();
- if (current == State.FAILED)
+ State state = _state.get();
+ switch (state)
{
- // Success arrived too late, sorry.
- return false;
- }
- else
- {
- if (_state.compareAndSet(current, State.SUCCEEDED))
- return true;
+ case SUCCEEDED:
+ case FAILED:
+ case IDLE:
+ case CLOSED:
+ case CALLED:
+ {
+ // too late!.
+ break loop;
+ }
+ case LOCKED:
+ {
+ Thread.yield();
+ continue loop;
+ }
+ case PENDING:
+ case PROCESSING:
+ {
+ if (!_state.compareAndSet(state, State.FAILED))
+ continue loop;
+
+ onCompleteFailure(x);
+ break loop;
+ }
+ default:
+ throw new IllegalStateException("state="+state);
}
}
}
- private void completeFailure()
+ public void close()
{
- while (true)
+ loop: while (true)
{
- State current = _state.get();
- if (current == State.SUCCEEDED)
+ State state = _state.get();
+ switch (state)
{
- // Failed arrived too late, sorry.
- return;
- }
- else
- {
- if (_state.compareAndSet(current, State.FAILED))
- break;
+ case IDLE:
+ case SUCCEEDED:
+ case FAILED:
+ {
+ if (!_state.compareAndSet(state, State.CLOSED))
+ continue loop;
+ break loop;
+ }
+ case CLOSED:
+ {
+ break loop;
+ }
+ case LOCKED:
+ {
+ Thread.yield();
+ continue loop;
+ }
+ default:
+ {
+ if (!_state.compareAndSet(state, State.CLOSED))
+ continue loop;
+ onCompleteFailure(new ClosedChannelException());
+ break loop;
+ }
}
}
}
- /**
+ /*
+ * only for testing
* @return whether this callback is idle and {@link #iterate()} needs to be called
*/
- public boolean isIdle()
+ boolean isIdle()
{
- return _state.get() == State.INACTIVE;
+ return _state.get() == State.IDLE;
}
+ public boolean isClosed()
+ {
+ return _state.get() == State.CLOSED;
+ }
+
/**
* @return whether this callback has failed
*/
return _state.get() == State.SUCCEEDED;
}
+ /**
+ * Resets this callback.
+ * <p/>
+ * A callback can only be reset to IDLE from the
+ * SUCCEEDED or FAILED states or if it is already IDLE.
+ *
+ * @return true if the reset was successful
+ */
+ public boolean reset()
+ {
+ while (true)
+ {
+ State state=_state.get();
+ switch(state)
+ {
+ case IDLE:
+ return true;
+
+ case SUCCEEDED:
+ if (!_state.compareAndSet(state, State.LOCKED))
+ continue;
+ _iterate=false;
+ _state.set(State.IDLE);
+ return true;
+
+ case FAILED:
+ if (!_state.compareAndSet(state, State.LOCKED))
+ continue;
+ _iterate=false;
+ _state.set(State.IDLE);
+ return true;
+
+ case LOCKED:
+ Thread.yield();
+ continue;
+
+ default:
+ return false;
+ }
+ }
+ }
+
@Override
public String toString()
{
return String.format("%s[%s]", super.toString(), _state);
}
-
- /**
- * The internal states of this callback
- */
- private enum State
- {
- /**
- * This callback is inactive, ready to iterate.
- */
- INACTIVE,
- /**
- * This callback is iterating and {@link #process()} has scheduled an
- * asynchronous operation by returning {@link Action#SCHEDULED}, but
- * the operation is still undergoing.
- */
- ACTIVE,
- /**
- * This callback is iterating and {@link #process()} has been called
- * but not returned yet.
- */
- ITERATING,
- /**
- * While this callback was iterating, another request for iteration
- * has been issued, so the iteration must continue even if a previous
- * call to {@link #process()} returned {@link Action#IDLE}.
- */
- ITERATE_AGAIN,
- /**
- * The overall job has succeeded.
- */
- SUCCEEDED,
- /**
- * The overall job has failed.
- */
- FAILED
- }
}