]> WPIA git - gigi.git/blobdiff - lib/jetty/org/eclipse/jetty/io/WriteFlusher.java
Importing upstream Jetty jetty-9.2.1.v20140609
[gigi.git] / lib / jetty / org / eclipse / jetty / io / WriteFlusher.java
diff --git a/lib/jetty/org/eclipse/jetty/io/WriteFlusher.java b/lib/jetty/org/eclipse/jetty/io/WriteFlusher.java
new file mode 100644 (file)
index 0000000..fccc622
--- /dev/null
@@ -0,0 +1,504 @@
+//
+//  ========================================================================
+//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+//  ------------------------------------------------------------------------
+//  All rights reserved. This program and the accompanying materials
+//  are made available under the terms of the Eclipse Public License v1.0
+//  and Apache License v2.0 which accompanies this distribution.
+//
+//      The Eclipse Public License is available at
+//      http://www.eclipse.org/legal/epl-v10.html
+//
+//      The Apache License v2.0 is available at
+//      http://www.opensource.org/licenses/apache2.0.php
+//
+//  You may elect to redistribute this code under either of these licenses.
+//  ========================================================================
+//
+
+package org.eclipse.jetty.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.WritePendingException;
+import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+
+
+/**
+ * A Utility class to help implement {@link EndPoint#write(Callback, ByteBuffer...)} by calling
+ * {@link EndPoint#flush(ByteBuffer...)} until all content is written.
+ * The abstract method {@link #onIncompleteFlushed()} is called when not all content has been written after a call to
+ * flush and should organise for the {@link #completeWrite()} method to be called when a subsequent call to flush
+ * should  be able to make more progress.
+ * <p>
+ */
+abstract public class WriteFlusher
+{
+    private static final Logger LOG = Log.getLogger(WriteFlusher.class);
+    private static final boolean DEBUG = LOG.isDebugEnabled(); // Easy for the compiler to remove the code if DEBUG==false
+    private static final ByteBuffer[] EMPTY_BUFFERS = new ByteBuffer[0];
+    private static final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class);
+    private static final State __IDLE = new IdleState();
+    private static final State __WRITING = new WritingState();
+    private static final State __COMPLETING = new CompletingState();
+    private final EndPoint _endPoint;
+    private final AtomicReference<State> _state = new AtomicReference<>();
+
+    static
+    {
+        // fill the state machine
+        __stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING));
+        __stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
+        __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING,StateType.IDLE));
+        __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
+        __stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE));
+    }
+
+    // A write operation may either complete immediately:
+    //     IDLE-->WRITING-->IDLE
+    // Or it may not completely flush and go via the PENDING state
+    //     IDLE-->WRITING-->PENDING-->COMPLETING-->IDLE
+    // Or it may take several cycles to complete
+    //     IDLE-->WRITING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE
+    //
+    // If a failure happens while in IDLE, it is a noop since there is no operation to tell of the failure.
+    // If a failure happens while in WRITING, but the the write has finished successfully or with an IOExceptions,
+    // the callback's complete or respectively failed methods will be called.
+    // If a failure happens in PENDING state, then the fail method calls the pending callback and moves to IDLE state
+    //
+    //   IDLE--(fail)-->IDLE
+    //   IDLE-->WRITING--(fail)-->FAILED-->IDLE
+    //   IDLE-->WRITING-->PENDING--(fail)-->IDLE
+    //   IDLE-->WRITING-->PENDING-->COMPLETING--(fail)-->FAILED-->IDLE
+    //
+    // So a call to fail in the PENDING state will be directly handled and the state changed to IDLE
+    // A call to fail in the WRITING or COMPLETING states will just set the state to FAILED and the failure will be
+    // handled with the write or completeWrite methods try to move the state from what they thought it was.
+    //
+
+    protected WriteFlusher(EndPoint endPoint)
+    {
+        _state.set(__IDLE);
+        _endPoint = endPoint;
+    }
+
+    private enum StateType
+    {
+        IDLE,
+        WRITING,
+        PENDING,
+        COMPLETING,
+        FAILED
+    }
+
+    /**
+     * Tries to update the current state to the given new state.
+     * @param previous the expected current state
+     * @param next the desired new state
+     * @return the previous state or null if the state transition failed
+     * @throws WritePendingException if currentState is WRITING and new state is WRITING (api usage error)
+     */
+    private boolean updateState(State previous,State next)
+    {
+        if (!isTransitionAllowed(previous,next))
+            throw new IllegalStateException();
+
+        boolean updated = _state.compareAndSet(previous, next);
+        if (DEBUG)
+            LOG.debug("update {}:{}{}{}", this, previous, updated?"-->":"!->",next);
+        return updated;
+    }
+
+    private void fail(PendingState pending)
+    {
+        State current = _state.get();
+        if (current.getType()==StateType.FAILED)
+        {
+            FailedState failed=(FailedState)current;
+            if (updateState(failed,__IDLE))
+            {
+                pending.fail(failed.getCause());
+                return;
+            }
+        }
+        throw new IllegalStateException();
+    }
+
+    private void ignoreFail()
+    {
+        State current = _state.get();
+        while (current.getType()==StateType.FAILED)
+        {
+            if (updateState(current,__IDLE))
+                return;
+            current = _state.get();
+        }
+    }
+
+    private boolean isTransitionAllowed(State currentState, State newState)
+    {
+        Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
+        if (!allowedNewStateTypes.contains(newState.getType()))
+        {
+            LOG.warn("{}: {} -> {} not allowed", this, currentState, newState);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * State represents a State of WriteFlusher.
+     */
+    private static class State
+    {
+        private final StateType _type;
+
+        private State(StateType stateType)
+        {
+            _type = stateType;
+        }
+
+        public StateType getType()
+        {
+            return _type;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("%s", _type);
+        }
+    }
+
+    /**
+     * In IdleState WriteFlusher is idle and accepts new writes
+     */
+    private static class IdleState extends State
+    {
+        private IdleState()
+        {
+            super(StateType.IDLE);
+        }
+    }
+
+    /**
+     * In WritingState WriteFlusher is currently writing.
+     */
+    private static class WritingState extends State
+    {
+        private WritingState()
+        {
+            super(StateType.WRITING);
+        }
+    }
+
+    /**
+     * In FailedState no more operations are allowed. The current implementation will never recover from this state.
+     */
+    private static class FailedState extends State
+    {
+        private final Throwable _cause;
+        private FailedState(Throwable cause)
+        {
+            super(StateType.FAILED);
+            _cause=cause;
+        }
+
+        public Throwable getCause()
+        {
+            return _cause;
+        }
+    }
+
+    /**
+     * In CompletingState WriteFlusher is flushing buffers that have not been fully written in write(). If write()
+     * didn't flush all buffers in one go, it'll switch the State to PendingState. completeWrite() will then switch to
+     * this state and try to flush the remaining buffers.
+     */
+    private static class CompletingState extends State
+    {
+        private CompletingState()
+        {
+            super(StateType.COMPLETING);
+        }
+    }
+
+    /**
+     * In PendingState not all buffers could be written in one go. Then write() will switch to PendingState() and
+     * preserve the state by creating a new PendingState object with the given parameters.
+     */
+    private class PendingState extends State
+    {
+        private final Callback _callback;
+        private final ByteBuffer[] _buffers;
+
+        private PendingState(ByteBuffer[] buffers, Callback callback)
+        {
+            super(StateType.PENDING);
+            _buffers = compact(buffers);
+            _callback = callback;
+        }
+
+        public ByteBuffer[] getBuffers()
+        {
+            return _buffers;
+        }
+
+        protected boolean fail(Throwable cause)
+        {
+            if (_callback!=null)
+            {
+                _callback.failed(cause);
+                return true;
+            }
+            return false;
+        }
+
+        protected void complete()
+        {
+            if (_callback!=null)
+                _callback.succeeded();
+        }
+
+        /**
+         * Compacting the buffers is needed because the semantic of WriteFlusher is
+         * to write the buffers and if the caller sees that the buffer is consumed,
+         * then it can recycle it.
+         * If we do not compact, then it is possible that we store a consumed buffer,
+         * which is then recycled and refilled; when the WriteFlusher is invoked to
+         * complete the write, it will write the refilled bytes, garbling the content.
+         *
+         * @param buffers the buffers to compact
+         * @return the compacted buffers
+         */
+        private ByteBuffer[] compact(ByteBuffer[] buffers)
+        {
+            int length = buffers.length;
+
+            // Just one element, no need to compact
+            if (length < 2)
+                return buffers;
+
+            // How many still have content ?
+            int consumed = 0;
+            while (consumed < length && BufferUtil.isEmpty(buffers[consumed]))
+                ++consumed;
+
+            // All of them still have content, no need to compact
+            if (consumed == 0)
+                return buffers;
+
+            // None has content, return empty
+            if (consumed == length)
+                return EMPTY_BUFFERS;
+
+            return Arrays.copyOfRange(buffers,consumed,length);
+        }
+    }
+
+    /**
+     * Abstract call to be implemented by specific WriteFlushers. It should schedule a call to {@link #completeWrite()}
+     * or {@link #onFail(Throwable)} when appropriate.
+     */
+    abstract protected void onIncompleteFlushed();
+
+    /**
+     * Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If state transition
+     * fails it'll fail the callback.
+     *
+     * If not all buffers can be written in one go it creates a new <code>PendingState</code> object to preserve the state
+     * and then calls {@link #onIncompleteFlushed()}. The remaining buffers will be written in {@link #completeWrite()}.
+     *
+     * If all buffers have been written it calls callback.complete().
+     *
+     * @param callback the callback to call on either failed or complete
+     * @param buffers the buffers to flush to the endpoint
+     */
+    public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
+    {
+        if (DEBUG)
+            LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
+
+        if (!updateState(__IDLE,__WRITING))
+            throw new WritePendingException();
+
+        try
+        {
+            boolean flushed=_endPoint.flush(buffers);
+            if (DEBUG)
+                LOG.debug("flushed {}", flushed);
+
+            // Are we complete?
+            for (ByteBuffer b : buffers)
+            {
+                if (!flushed||BufferUtil.hasContent(b))
+                {
+                    PendingState pending=new PendingState(buffers, callback);
+                    if (updateState(__WRITING,pending))
+                        onIncompleteFlushed();
+                    else
+                        fail(pending);
+                    return;
+                }
+            }
+
+            // If updateState didn't succeed, we don't care as our buffers have been written
+            if (!updateState(__WRITING,__IDLE))
+                ignoreFail();
+            if (callback!=null)
+                callback.succeeded();
+        }
+        catch (IOException e)
+        {
+            if (DEBUG)
+                LOG.debug("write exception", e);
+            if (updateState(__WRITING,__IDLE))
+            {
+                if (callback!=null)
+                    callback.failed(e);
+            }
+            else
+                fail(new PendingState(buffers, callback));
+        }
+    }
+
+
+    /**
+     * Complete a write that has not completed and that called {@link #onIncompleteFlushed()} to request a call to this
+     * method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress.
+     *
+     * It tries to switch from PENDING to COMPLETING. If state transition fails, then it does nothing as the callback
+     * should have been already failed. That's because the only way to switch from PENDING outside this method is
+     * {@link #onFail(Throwable)} or {@link #onClose()}
+     */
+    public void completeWrite()
+    {
+        if (DEBUG)
+            LOG.debug("completeWrite: {}", this);
+
+        State previous = _state.get();
+
+        if (previous.getType()!=StateType.PENDING)
+            return; // failure already handled.
+
+        PendingState pending = (PendingState)previous;
+        if (!updateState(pending,__COMPLETING))
+            return; // failure already handled.
+
+        try
+        {
+            ByteBuffer[] buffers = pending.getBuffers();
+
+            boolean flushed=_endPoint.flush(buffers);
+            if (DEBUG)
+                LOG.debug("flushed {}", flushed);
+
+            // Are we complete?
+            for (ByteBuffer b : buffers)
+            {
+                if (!flushed || BufferUtil.hasContent(b))
+                {
+                    if (updateState(__COMPLETING,pending))
+                        onIncompleteFlushed();
+                    else
+                        fail(pending);
+                    return;
+                }
+            }
+
+            // If updateState didn't succeed, we don't care as our buffers have been written
+            if (!updateState(__COMPLETING,__IDLE))
+                ignoreFail();
+            pending.complete();
+        }
+        catch (IOException e)
+        {
+            if (DEBUG)
+                LOG.debug("completeWrite exception", e);
+            if(updateState(__COMPLETING,__IDLE))
+                pending.fail(e);
+            else
+                fail(pending);
+        }
+    }
+
+    /* ------------------------------------------------------------ */
+    /** Notify the flusher of a failure
+     * @param cause The cause of the failure
+     * @return true if the flusher passed the failure to a {@link Callback} instance
+     */
+    public boolean onFail(Throwable cause)
+    {
+        // Keep trying to handle the failure until we get to IDLE or FAILED state
+        while(true)
+        {
+            State current=_state.get();
+            switch(current.getType())
+            {
+                case IDLE:
+                case FAILED:
+                    if (DEBUG)
+                        LOG.debug("ignored: {} {}", this, cause);
+                    return false;
+
+                case PENDING:
+                    if (DEBUG)
+                        LOG.debug("failed: {} {}", this, cause);
+
+                    PendingState pending = (PendingState)current;
+                    if (updateState(pending,__IDLE))
+                        return pending.fail(cause);
+                    break;
+
+                default:
+                    if (DEBUG)
+                        LOG.debug("failed: {} {}", this, cause);
+
+                    if (updateState(current,new FailedState(cause)))
+                        return false;
+                    break;
+            }
+        }
+    }
+
+    public void onClose()
+    {
+        if (_state.get()==__IDLE)
+            return;
+        onFail(new ClosedChannelException());
+    }
+
+    boolean isIdle()
+    {
+        return _state.get().getType() == StateType.IDLE;
+    }
+
+    public boolean isInProgress()
+    {
+        switch(_state.get().getType())
+        {
+            case WRITING:
+            case PENDING:
+            case COMPLETING:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get());
+    }
+}