X-Git-Url: https://code.wpia.club/?p=gigi.git;a=blobdiff_plain;f=lib%2Fjetty%2Forg%2Feclipse%2Fjetty%2Futil%2FIteratingCallback.java;fp=lib%2Fjetty%2Forg%2Feclipse%2Fjetty%2Futil%2FIteratingCallback.java;h=8d24dd9d11cd498f192454f7bf97fc1cf58fec64;hp=1c74d5eaf6535e9d15503686c63ce07cc71dc126;hb=065ca60170f2471227dc25784e1a4c3b7912d367;hpb=ad7a401ad98da5a8a33e60d39789e941aa8ccfc4 diff --git a/lib/jetty/org/eclipse/jetty/util/IteratingCallback.java b/lib/jetty/org/eclipse/jetty/util/IteratingCallback.java index 1c74d5ea..8d24dd9d 100644 --- a/lib/jetty/org/eclipse/jetty/util/IteratingCallback.java +++ b/lib/jetty/org/eclipse/jetty/util/IteratingCallback.java @@ -1,6 +1,6 @@ // // ======================================================================== -// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. // ------------------------------------------------------------------------ // All rights reserved. This program and the accompanying materials // are made available under the terms of the Eclipse Public License v1.0 @@ -18,6 +18,7 @@ package org.eclipse.jetty.util; +import java.nio.channels.ClosedChannelException; import java.util.concurrent.atomic.AtomicReference; /** @@ -50,6 +51,55 @@ import java.util.concurrent.atomic.AtomicReference; */ public abstract class IteratingCallback implements Callback { + /** + * The internal states of this callback + */ + private enum State + { + /** + * This callback is IDLE, ready to iterate. + */ + IDLE, + + /** + * This callback is iterating calls to {@link #process()} and is dealing with + * the returns. To get into processing state, it much of held the lock state + * and set iterating to true. + */ + PROCESSING, + + /** + * Waiting for a schedule callback + */ + PENDING, + + /** + * Called by a schedule callback + */ + CALLED, + + /** + * The overall job has succeeded as indicated by a {@link Action#SUCCEEDED} return + * from {@link IteratingCallback#process()} + */ + SUCCEEDED, + + /** + * The overall job has failed as indicated by a call to {@link IteratingCallback#failed(Throwable)} + */ + FAILED, + + /** + * This callback has been closed and cannot be reset. + */ + CLOSED, + + /** + * State is locked while leaving processing state to check the iterate boolean + */ + LOCKED + } + /** * The indication of the overall progress of the overall job that * implementations of {@link #process()} must return. @@ -68,18 +118,27 @@ public abstract class IteratingCallback implements Callback * may have not yet been invoked. */ SCHEDULED, + /** * Indicates that {@link #process()} has completed the overall job. */ - SUCCEEDED, - /** - * Indicates that {@link #process()} has failed the overall job. - */ - FAILED + SUCCEEDED } - private final AtomicReference _state = new AtomicReference<>(State.INACTIVE); - + private final AtomicReference _state; + private boolean _iterate; + + + protected IteratingCallback() + { + _state = new AtomicReference<>(State.IDLE); + } + + protected IteratingCallback(boolean needReset) + { + _state = new AtomicReference<>(needReset ? State.SUCCEEDED : State.IDLE); + } + /** * Method called by {@link #iterate()} to process the sub task. *

@@ -91,129 +150,197 @@ public abstract class IteratingCallback implements Callback *

  • {@link Action#SCHEDULED} when the sub task asynchronous execution * has been started
  • *
  • {@link Action#SUCCEEDED} when the overall job is completed
  • - *
  • {@link Action#FAILED} when the overall job cannot be completed
  • * * * @throws Exception if the sub task processing throws */ protected abstract Action process() throws Exception; + /** + * @deprecated Use {@link #onCompleteSuccess()} instead. + */ + @Deprecated + protected void completed() + { + } + /** * Invoked when the overall task has completed successfully. + * + * @see #onCompleteFailure(Throwable) */ - protected abstract void completed(); + protected void onCompleteSuccess() + { + completed(); + } + + /** + * Invoked when the overall task has completed with a failure. + * + * @see #onCompleteSuccess() + */ + protected void onCompleteFailure(Throwable x) + { + } /** * This method must be invoked by applications to start the processing - * of sub tasks. - *

    - * If {@link #process()} returns {@link Action#IDLE}, then this method - * should be called again to restart processing. - * It is safe to call iterate multiple times from multiple threads since only - * the first thread to move the state out of INACTIVE will actually do any iteration - * and processing. + * of sub tasks. It can be called at any time by any thread, and it's + * contract is that when called, then the {@link #process()} method will + * be called during or soon after, either by the calling thread or by + * another thread. */ public void iterate() { - try + loop: while (true) { - while (true) + State state=_state.get(); + switch (state) { - switch (_state.get()) - { - case INACTIVE: - { - if (processIterations()) - return; - break; - } - case ITERATING: - { - if (_state.compareAndSet(State.ITERATING, State.ITERATE_AGAIN)) - return; - break; - } - default: - { - return; - } - } + case PENDING: + case CALLED: + // process will be called when callback is handled + break loop; + + case IDLE: + if (!_state.compareAndSet(state,State.PROCESSING)) + continue; + processing(); + break loop; + + case PROCESSING: + if (!_state.compareAndSet(state,State.LOCKED)) + continue; + // Tell the thread that is processing that it must iterate again + _iterate=true; + _state.set(State.PROCESSING); + break loop; + + case LOCKED: + Thread.yield(); + continue loop; + + case FAILED: + case SUCCEEDED: + break loop; + + case CLOSED: + default: + throw new IllegalStateException("state="+state); } } - catch (Throwable x) - { - failed(x); - } } - private boolean processIterations() throws Exception + private void processing() { - // Keeps iterating as long as succeeded() is called during process(). - // If we are in INACTIVE state, either this is the first iteration or - // succeeded()/failed() were called already. - while (_state.compareAndSet(State.INACTIVE, State.ITERATING)) + // This should only ever be called when in processing state, however a failed or close call + // may happen concurrently, so state is not assumed. + + // While we are processing + processing: while (true) { - // Method process() can only be called by one thread at a time because - // it is guarded by the CaS above. However, the case blocks below may - // be executed concurrently in this case: T1 calls process() which - // executes the asynchronous sub task, which calls succeeded(), which - // moves the state into INACTIVE, then returns SCHEDULED; T2 calls - // iterate(), state is now INACTIVE and process() is called again and - // returns another action. Now we have 2 threads that may execute the - // action case blocks below concurrently; therefore each case block - // has to be prepared to fail the CaS it's doing. + // Call process to get the action that we have to take. + Action action; + try + { + action = process(); + } + catch (Throwable x) + { + failed(x); + break processing; + } - Action action = process(); - switch (action) + // loop until we have successfully acted on the action we have just received + acting: while(true) { - case IDLE: + // action handling needs to know the state + State state=_state.get(); + + switch (state) { - // No more progress can be made. - if (_state.compareAndSet(State.ITERATING, State.INACTIVE)) - return true; + case PROCESSING: + { + switch (action) + { + case IDLE: + { + // lock the state + if (!_state.compareAndSet(state,State.LOCKED)) + continue acting; - // Was iterate() called again since we already decided to go INACTIVE ? - // If so, try another iteration as more work may have been added - // while the previous call to process() was returning. - if (_state.compareAndSet(State.ITERATE_AGAIN, State.INACTIVE)) - continue; + // Has iterate been called while we were processing? + if (_iterate) + { + // yes, so skip idle and keep processing + _iterate=false; + _state.set(State.PROCESSING); + continue processing; + } - // State may have changed concurrently, try again. - continue; - } - case SCHEDULED: - { - // The sub task is executing, and the callback for it may or - // may not have already been called yet, which we figure out below. - // Can double CaS here because state never changes directly ITERATING_AGAIN --> ITERATE. - if (_state.compareAndSet(State.ITERATING, State.ACTIVE) || - _state.compareAndSet(State.ITERATE_AGAIN, State.ACTIVE)) - // Not called back yet, so wait. - return true; - // Call back must have happened, so iterate. - continue; - } - case SUCCEEDED: - { - // The overall job has completed. - if (completeSuccess()) - completed(); - return true; - } - case FAILED: - { - completeFailure(); - return true; - } - default: - { - throw new IllegalStateException(toString()); + // No, so we can go idle + _state.set(State.IDLE); + break processing; + } + + case SCHEDULED: + { + if (!_state.compareAndSet(state, State.PENDING)) + continue acting; + // we won the race against the callback, so the callback has to process and we can break processing + break processing; + } + + case SUCCEEDED: + { + if (!_state.compareAndSet(state, State.LOCKED)) + continue acting; + _iterate=false; + _state.set(State.SUCCEEDED); + onCompleteSuccess(); + break processing; + } + + default: + throw new IllegalStateException("state="+state+" action="+action); + } + } + + case CALLED: + { + switch (action) + { + case SCHEDULED: + { + if (!_state.compareAndSet(state, State.PROCESSING)) + continue acting; + // we lost the race, so we have to keep processing + continue processing; + } + + default: + throw new IllegalStateException("state="+state+" action="+action); + } + } + + case LOCKED: + Thread.yield(); + continue acting; + + case SUCCEEDED: + case FAILED: + case CLOSED: + break processing; + + case IDLE: + case PENDING: + default: + throw new IllegalStateException("state="+state+" action="+action); } } } - return false; } - + /** * Invoked when the sub task succeeds. * Subclasses that override this method must always remember to call @@ -222,38 +349,38 @@ public abstract class IteratingCallback implements Callback @Override public void succeeded() { - while (true) + loop: while (true) { - State current = _state.get(); - switch (current) + State state = _state.get(); + switch (state) { - case ITERATE_AGAIN: - case ITERATING: + case PROCESSING: { - if (_state.compareAndSet(current, State.INACTIVE)) - return; - continue; + if (!_state.compareAndSet(state, State.CALLED)) + continue loop; + break loop; } - case ACTIVE: + case PENDING: { - // If we can move from ACTIVE to INACTIVE - // then we are responsible to call iterate(). - if (_state.compareAndSet(current, State.INACTIVE)) - iterate(); - // If we can't CaS, then failed() must have been - // called, and we just return. - return; + if (!_state.compareAndSet(state, State.PROCESSING)) + continue loop; + processing(); + break loop; } - case INACTIVE: + case CLOSED: + case FAILED: { - // Support the case where the callback is scheduled - // externally without a call to iterate(). - iterate(); - return; + // Too late! + break loop; } + case LOCKED: + { + Thread.yield(); + continue loop; + } default: { - throw new IllegalStateException(toString()); + throw new IllegalStateException("state="+state); } } } @@ -267,53 +394,89 @@ public abstract class IteratingCallback implements Callback @Override public void failed(Throwable x) { - completeFailure(); - } - - private boolean completeSuccess() - { - while (true) + loop: while (true) { - State current = _state.get(); - if (current == State.FAILED) + State state = _state.get(); + switch (state) { - // Success arrived too late, sorry. - return false; - } - else - { - if (_state.compareAndSet(current, State.SUCCEEDED)) - return true; + case SUCCEEDED: + case FAILED: + case IDLE: + case CLOSED: + case CALLED: + { + // too late!. + break loop; + } + case LOCKED: + { + Thread.yield(); + continue loop; + } + case PENDING: + case PROCESSING: + { + if (!_state.compareAndSet(state, State.FAILED)) + continue loop; + + onCompleteFailure(x); + break loop; + } + default: + throw new IllegalStateException("state="+state); } } } - private void completeFailure() + public void close() { - while (true) + loop: while (true) { - State current = _state.get(); - if (current == State.SUCCEEDED) + State state = _state.get(); + switch (state) { - // Failed arrived too late, sorry. - return; - } - else - { - if (_state.compareAndSet(current, State.FAILED)) - break; + case IDLE: + case SUCCEEDED: + case FAILED: + { + if (!_state.compareAndSet(state, State.CLOSED)) + continue loop; + break loop; + } + case CLOSED: + { + break loop; + } + case LOCKED: + { + Thread.yield(); + continue loop; + } + default: + { + if (!_state.compareAndSet(state, State.CLOSED)) + continue loop; + onCompleteFailure(new ClosedChannelException()); + break loop; + } } } } - /** + /* + * only for testing * @return whether this callback is idle and {@link #iterate()} needs to be called */ - public boolean isIdle() + boolean isIdle() { - return _state.get() == State.INACTIVE; + return _state.get() == State.IDLE; } + public boolean isClosed() + { + return _state.get() == State.CLOSED; + } + /** * @return whether this callback has failed */ @@ -330,45 +493,51 @@ public abstract class IteratingCallback implements Callback return _state.get() == State.SUCCEEDED; } + /** + * Resets this callback. + *

    + * A callback can only be reset to IDLE from the + * SUCCEEDED or FAILED states or if it is already IDLE. + * + * @return true if the reset was successful + */ + public boolean reset() + { + while (true) + { + State state=_state.get(); + switch(state) + { + case IDLE: + return true; + + case SUCCEEDED: + if (!_state.compareAndSet(state, State.LOCKED)) + continue; + _iterate=false; + _state.set(State.IDLE); + return true; + + case FAILED: + if (!_state.compareAndSet(state, State.LOCKED)) + continue; + _iterate=false; + _state.set(State.IDLE); + return true; + + case LOCKED: + Thread.yield(); + continue; + + default: + return false; + } + } + } + @Override public String toString() { return String.format("%s[%s]", super.toString(), _state); } - - /** - * The internal states of this callback - */ - private enum State - { - /** - * This callback is inactive, ready to iterate. - */ - INACTIVE, - /** - * This callback is iterating and {@link #process()} has scheduled an - * asynchronous operation by returning {@link Action#SCHEDULED}, but - * the operation is still undergoing. - */ - ACTIVE, - /** - * This callback is iterating and {@link #process()} has been called - * but not returned yet. - */ - ITERATING, - /** - * While this callback was iterating, another request for iteration - * has been issued, so the iteration must continue even if a previous - * call to {@link #process()} returned {@link Action#IDLE}. - */ - ITERATE_AGAIN, - /** - * The overall job has succeeded. - */ - SUCCEEDED, - /** - * The overall job has failed. - */ - FAILED - } }