]> WPIA git - gigi.git/blobdiff - lib/jetty/org/eclipse/jetty/util/IteratingCallback.java
updating jetty to jetty-9.2.16.v2016040
[gigi.git] / lib / jetty / org / eclipse / jetty / util / IteratingCallback.java
index 1c74d5eaf6535e9d15503686c63ce07cc71dc126..8d24dd9d11cd498f192454f7bf97fc1cf58fec64 100644 (file)
@@ -1,6 +1,6 @@
 //
 //  ========================================================================
-//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+//  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
 //  ------------------------------------------------------------------------
 //  All rights reserved. This program and the accompanying materials
 //  are made available under the terms of the Eclipse Public License v1.0
@@ -18,6 +18,7 @@
 
 package org.eclipse.jetty.util;
 
+import java.nio.channels.ClosedChannelException;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -50,6 +51,55 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public abstract class IteratingCallback implements Callback
 {
+    /**
+     * The internal states of this callback
+     */
+    private enum State
+    {
+        /**
+         * This callback is IDLE, ready to iterate.
+         */
+        IDLE,
+
+        /**
+         * This callback is iterating calls to {@link #process()} and is dealing with
+         * the returns.  To get into processing state, it much of held the lock state
+         * and set iterating to true.
+         */
+        PROCESSING,
+        
+        /**
+         * Waiting for a schedule callback
+         */
+        PENDING,
+        
+        /**
+         * Called by a schedule callback
+         */
+        CALLED,
+        
+        /**
+         * The overall job has succeeded as indicated by a {@link Action#SUCCEEDED} return 
+         * from {@link IteratingCallback#process()}
+         */
+        SUCCEEDED,
+        
+        /**
+         * The overall job has failed as indicated by a call to {@link IteratingCallback#failed(Throwable)}
+         */
+        FAILED,
+        
+        /**
+         * This callback has been closed and cannot be reset.
+         */ 
+        CLOSED,
+        
+        /**
+         * State is locked while leaving processing state to check the iterate boolean
+         */
+        LOCKED
+    }
+
     /**
      * The indication of the overall progress of the overall job that
      * implementations of {@link #process()} must return.
@@ -68,18 +118,27 @@ public abstract class IteratingCallback implements Callback
          * may have not yet been invoked.
          */
         SCHEDULED,
+        
         /**
          * Indicates that {@link #process()} has completed the overall job.
          */
-        SUCCEEDED,
-        /**
-         * Indicates that {@link #process()} has failed the overall job.
-         */
-        FAILED
+        SUCCEEDED
     }
 
-    private final AtomicReference<State> _state = new AtomicReference<>(State.INACTIVE);
-
+    private final AtomicReference<State> _state;
+    private boolean _iterate;
+    
+    
+    protected IteratingCallback()
+    {
+        _state = new AtomicReference<>(State.IDLE);
+    }
+    
+    protected IteratingCallback(boolean needReset)
+    {
+        _state = new AtomicReference<>(needReset ? State.SUCCEEDED : State.IDLE);
+    }
+    
     /**
      * Method called by {@link #iterate()} to process the sub task.
      * <p/>
@@ -91,129 +150,197 @@ public abstract class IteratingCallback implements Callback
      * <li>{@link Action#SCHEDULED} when the sub task asynchronous execution
      * has been started</li>
      * <li>{@link Action#SUCCEEDED} when the overall job is completed</li>
-     * <li>{@link Action#FAILED} when the overall job cannot be completed</li>
      * </ul>
      *
      * @throws Exception if the sub task processing throws
      */
     protected abstract Action process() throws Exception;
 
+    /**
+     * @deprecated Use {@link #onCompleteSuccess()} instead.
+     */
+    @Deprecated
+    protected void completed()
+    {
+    }
+
     /**
      * Invoked when the overall task has completed successfully.
+     *
+     * @see #onCompleteFailure(Throwable)
      */
-    protected abstract void completed();
+    protected void onCompleteSuccess()
+    {
+        completed();
+    }
+    
+    /**
+     * Invoked when the overall task has completed with a failure.
+     *
+     * @see #onCompleteSuccess()
+     */
+    protected void onCompleteFailure(Throwable x)
+    {
+    }
 
     /**
      * This method must be invoked by applications to start the processing
-     * of sub tasks.
-     * <p/>
-     * If {@link #process()} returns {@link Action#IDLE}, then this method
-     * should be called again to restart processing.
-     * It is safe to call iterate multiple times from multiple threads since only
-     * the first thread to move the state out of INACTIVE will actually do any iteration
-     * and processing.
+     * of sub tasks.  It can be called at any time by any thread, and it's 
+     * contract is that when called, then the {@link #process()} method will
+     * be called during or soon after, either by the calling thread or by 
+     * another thread.
      */
     public void iterate()
     {
-        try
+        loop: while (true)
         {
-            while (true)
+            State state=_state.get();
+            switch (state)
             {
-                switch (_state.get())
-                {
-                    case INACTIVE:
-                    {
-                        if (processIterations())
-                            return;
-                        break;
-                    }
-                    case ITERATING:
-                    {
-                        if (_state.compareAndSet(State.ITERATING, State.ITERATE_AGAIN))
-                            return;
-                        break;
-                    }
-                    default:
-                    {
-                        return;
-                    }
-                }
+                case PENDING:
+                case CALLED:
+                    // process will be called when callback is handled
+                    break loop;
+                    
+                case IDLE:
+                    if (!_state.compareAndSet(state,State.PROCESSING))
+                        continue;
+                    processing();
+                    break loop;
+                    
+                case PROCESSING:
+                    if (!_state.compareAndSet(state,State.LOCKED))
+                        continue;
+                    // Tell the thread that is processing that it must iterate again
+                    _iterate=true;
+                    _state.set(State.PROCESSING);
+                    break loop;
+                    
+                case LOCKED:
+                    Thread.yield();
+                    continue loop;
+
+                case FAILED:
+                case SUCCEEDED:
+                    break loop;
+
+                case CLOSED:
+                default:
+                    throw new IllegalStateException("state="+state);
             }
         }
-        catch (Throwable x)
-        {
-            failed(x);
-        }
     }
 
-    private boolean processIterations() throws Exception
+    private void processing() 
     {
-        // Keeps iterating as long as succeeded() is called during process().
-        // If we are in INACTIVE state, either this is the first iteration or
-        // succeeded()/failed() were called already.
-        while (_state.compareAndSet(State.INACTIVE, State.ITERATING))
+        // This should only ever be called when in processing state, however a failed or close call
+        // may happen concurrently, so state is not assumed.
+        
+        // While we are processing
+        processing: while (true)
         {
-            // Method process() can only be called by one thread at a time because
-            // it is guarded by the CaS above. However, the case blocks below may
-            // be executed concurrently in this case: T1 calls process() which
-            // executes the asynchronous sub task, which calls succeeded(), which
-            // moves the state into INACTIVE, then returns SCHEDULED; T2 calls
-            // iterate(), state is now INACTIVE and process() is called again and
-            // returns another action. Now we have 2 threads that may execute the
-            // action case blocks below concurrently; therefore each case block
-            // has to be prepared to fail the CaS it's doing.
+            // Call process to get the action that we have to take.
+            Action action;
+            try
+            {
+                action = process();
+            }
+            catch (Throwable x)
+            {
+                failed(x);
+                break processing;
+            }
 
-            Action action = process();
-            switch (action)
+            // loop until we have successfully acted on the action we have just received
+            acting: while(true)
             {
-                case IDLE:
+                // action handling needs to know the state
+                State state=_state.get();
+                
+                switch (state)
                 {
-                    // No more progress can be made.
-                    if (_state.compareAndSet(State.ITERATING, State.INACTIVE))
-                        return true;
+                    case PROCESSING:
+                    {
+                        switch (action)
+                        {
+                            case IDLE:
+                            {
+                                // lock the state
+                                if (!_state.compareAndSet(state,State.LOCKED))
+                                    continue acting;
 
-                    // Was iterate() called again since we already decided to go INACTIVE ?
-                    // If so, try another iteration as more work may have been added
-                    // while the previous call to process() was returning.
-                    if (_state.compareAndSet(State.ITERATE_AGAIN, State.INACTIVE))
-                        continue;
+                                // Has iterate been called while we were processing?
+                                if (_iterate)
+                                {
+                                    // yes, so skip idle and keep processing
+                                    _iterate=false;
+                                    _state.set(State.PROCESSING);
+                                    continue processing;
+                                }
 
-                    // State may have changed concurrently, try again.
-                    continue;
-                }
-                case SCHEDULED:
-                {
-                    // The sub task is executing, and the callback for it may or
-                    // may not have already been called yet, which we figure out below.
-                    // Can double CaS here because state never changes directly ITERATING_AGAIN --> ITERATE.
-                    if (_state.compareAndSet(State.ITERATING, State.ACTIVE) ||
-                            _state.compareAndSet(State.ITERATE_AGAIN, State.ACTIVE))
-                        // Not called back yet, so wait.
-                        return true;
-                    // Call back must have happened, so iterate.
-                    continue;
-                }
-                case SUCCEEDED:
-                {
-                    // The overall job has completed.
-                    if (completeSuccess())
-                        completed();
-                    return true;
-                }
-                case FAILED:
-                {
-                    completeFailure();
-                    return true;
-                }
-                default:
-                {
-                    throw new IllegalStateException(toString());
+                                // No, so we can go idle
+                                _state.set(State.IDLE);
+                                break processing;
+                            }
+                            
+                            case SCHEDULED:
+                            {
+                                if (!_state.compareAndSet(state, State.PENDING))
+                                    continue acting;
+                                // we won the race against the callback, so the callback has to process and we can break processing
+                                break processing;
+                            }
+                            
+                            case SUCCEEDED:
+                            {
+                                if (!_state.compareAndSet(state, State.LOCKED))
+                                    continue acting;
+                                _iterate=false;
+                                _state.set(State.SUCCEEDED);
+                                onCompleteSuccess();
+                                break processing;
+                            }
+
+                            default:
+                                throw new IllegalStateException("state="+state+" action="+action); 
+                        }
+                    }
+                    
+                    case CALLED:
+                    {
+                        switch (action)
+                        {
+                            case SCHEDULED:
+                            {
+                                if (!_state.compareAndSet(state, State.PROCESSING))
+                                    continue acting;
+                                // we lost the race, so we have to keep processing
+                                continue processing;
+                            }
+
+                            default:
+                                throw new IllegalStateException("state="+state+" action="+action); 
+                        }
+                    }
+                        
+                    case LOCKED:
+                        Thread.yield();
+                        continue acting;
+
+                    case SUCCEEDED:
+                    case FAILED:
+                    case CLOSED:
+                        break processing;
+
+                    case IDLE:
+                    case PENDING:
+                    default:
+                        throw new IllegalStateException("state="+state+" action="+action); 
                 }
             }
         }
-        return false;
     }
-
+    
     /**
      * Invoked when the sub task succeeds.
      * Subclasses that override this method must always remember to call
@@ -222,38 +349,38 @@ public abstract class IteratingCallback implements Callback
     @Override
     public void succeeded()
     {
-        while (true)
+        loop: while (true)
         {
-            State current = _state.get();
-            switch (current)
+            State state = _state.get();
+            switch (state)
             {
-                case ITERATE_AGAIN:
-                case ITERATING:
+                case PROCESSING:
                 {
-                    if (_state.compareAndSet(current, State.INACTIVE))
-                        return;
-                    continue;
+                    if (!_state.compareAndSet(state, State.CALLED))
+                        continue loop;
+                    break loop;
                 }
-                case ACTIVE:
+                case PENDING:
                 {
-                    // If we can move from ACTIVE to INACTIVE
-                    // then we are responsible to call iterate().
-                    if (_state.compareAndSet(current, State.INACTIVE))
-                        iterate();
-                    // If we can't CaS, then failed() must have been
-                    // called, and we just return.
-                    return;
+                    if (!_state.compareAndSet(state, State.PROCESSING))
+                        continue loop;
+                    processing();
+                    break loop;
                 }
-                case INACTIVE:
+                case CLOSED:
+                case FAILED:
                 {
-                    // Support the case where the callback is scheduled
-                    // externally without a call to iterate().
-                    iterate();
-                    return;
+                    // Too late!
+                    break loop;
                 }
+                case LOCKED:
+                {
+                    Thread.yield();
+                    continue loop;
+                }       
                 default:
                 {
-                    throw new IllegalStateException(toString());
+                    throw new IllegalStateException("state="+state);
                 }
             }
         }
@@ -267,53 +394,89 @@ public abstract class IteratingCallback implements Callback
     @Override
     public void failed(Throwable x)
     {
-        completeFailure();
-    }
-
-    private boolean completeSuccess()
-    {
-        while (true)
+        loop: while (true)
         {
-            State current = _state.get();
-            if (current == State.FAILED)
+            State state = _state.get();
+            switch (state)
             {
-                // Success arrived too late, sorry.
-                return false;
-            }
-            else
-            {
-                if (_state.compareAndSet(current, State.SUCCEEDED))
-                    return true;
+                case SUCCEEDED:
+                case FAILED:
+                case IDLE:
+                case CLOSED:
+                case CALLED:
+                {
+                    // too late!.
+                    break loop;
+                }
+                case LOCKED:
+                {
+                    Thread.yield();
+                    continue loop;
+                }  
+                case PENDING: 
+                case PROCESSING: 
+                {
+                    if (!_state.compareAndSet(state, State.FAILED))
+                        continue loop;
+
+                    onCompleteFailure(x);
+                    break loop;
+                }
+                default:
+                    throw new IllegalStateException("state="+state);
             }
         }
     }
 
-    private void completeFailure()
+    public void close()
     {
-        while (true)
+        loop: while (true)
         {
-            State current = _state.get();
-            if (current == State.SUCCEEDED)
+            State state = _state.get();
+            switch (state)
             {
-                // Failed arrived too late, sorry.
-                return;
-            }
-            else
-            {
-                if (_state.compareAndSet(current, State.FAILED))
-                    break;
+                case IDLE:
+                case SUCCEEDED:
+                case FAILED:
+                {
+                    if (!_state.compareAndSet(state, State.CLOSED))
+                        continue loop;
+                    break loop;
+                }
+                case CLOSED:
+                {
+                    break loop;
+                }
+                case LOCKED:
+                {
+                    Thread.yield();
+                    continue loop;
+                }    
+                default:
+                {
+                    if (!_state.compareAndSet(state, State.CLOSED))
+                        continue loop;
+                    onCompleteFailure(new ClosedChannelException());
+                    break loop;
+                }
             }
         }
     }
 
-    /**
+    /*
+     * only for testing
      * @return whether this callback is idle and {@link #iterate()} needs to be called
      */
-    public boolean isIdle()
+    boolean isIdle()
     {
-        return _state.get() == State.INACTIVE;
+        return _state.get() == State.IDLE;
     }
 
+    public boolean isClosed()
+    {
+        return _state.get() == State.CLOSED;
+    }
+    
     /**
      * @return whether this callback has failed
      */
@@ -330,45 +493,51 @@ public abstract class IteratingCallback implements Callback
         return _state.get() == State.SUCCEEDED;
     }
 
+    /**
+     * Resets this callback.
+     * <p/>
+     * A callback can only be reset to IDLE from the
+     * SUCCEEDED or FAILED states or if it is already IDLE.
+     *
+     * @return true if the reset was successful
+     */
+    public boolean reset()
+    {
+        while (true)
+        {
+            State state=_state.get();
+            switch(state)
+            {
+                case IDLE:
+                    return true;
+                    
+                case SUCCEEDED:
+                    if (!_state.compareAndSet(state, State.LOCKED))
+                        continue;
+                    _iterate=false;
+                    _state.set(State.IDLE);
+                    return true;
+                    
+                case FAILED:
+                    if (!_state.compareAndSet(state, State.LOCKED))
+                        continue;
+                    _iterate=false;
+                    _state.set(State.IDLE);
+                    return true;
+
+                case LOCKED:
+                    Thread.yield();
+                    continue;
+                    
+                default:
+                    return false;
+            }
+        }
+    }
+    
     @Override
     public String toString()
     {
         return String.format("%s[%s]", super.toString(), _state);
     }
-
-    /**
-     * The internal states of this callback
-     */
-    private enum State
-    {
-        /**
-         * This callback is inactive, ready to iterate.
-         */
-        INACTIVE,
-        /**
-         * This callback is iterating and {@link #process()} has scheduled an
-         * asynchronous operation by returning {@link Action#SCHEDULED}, but
-         * the operation is still undergoing.
-         */
-        ACTIVE,
-        /**
-         * This callback is iterating and {@link #process()} has been called
-         * but not returned yet.
-         */
-        ITERATING,
-        /**
-         * While this callback was iterating, another request for iteration
-         * has been issued, so the iteration must continue even if a previous
-         * call to {@link #process()} returned {@link Action#IDLE}.
-         */
-        ITERATE_AGAIN,
-        /**
-         * The overall job has succeeded.
-         */
-        SUCCEEDED,
-        /**
-         * The overall job has failed.
-         */
-        FAILED
-    }
 }