2 // ========================================================================
3 // Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
19 package org.eclipse.jetty.io;
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.ClosedChannelException;
24 import java.nio.channels.WritePendingException;
25 import java.util.Arrays;
26 import java.util.EnumMap;
27 import java.util.EnumSet;
29 import java.util.concurrent.atomic.AtomicReference;
31 import org.eclipse.jetty.util.BufferUtil;
32 import org.eclipse.jetty.util.Callback;
33 import org.eclipse.jetty.util.log.Log;
34 import org.eclipse.jetty.util.log.Logger;
38 * A Utility class to help implement {@link EndPoint#write(Callback, ByteBuffer...)} by calling
39 * {@link EndPoint#flush(ByteBuffer...)} until all content is written.
40 * The abstract method {@link #onIncompleteFlushed()} is called when not all content has been written after a call to
41 * flush and should organise for the {@link #completeWrite()} method to be called when a subsequent call to flush
42 * should be able to make more progress.
45 abstract public class WriteFlusher
47 private static final Logger LOG = Log.getLogger(WriteFlusher.class);
48 private static final boolean DEBUG = LOG.isDebugEnabled(); // Easy for the compiler to remove the code if DEBUG==false
49 private static final ByteBuffer[] EMPTY_BUFFERS = new ByteBuffer[]{BufferUtil.EMPTY_BUFFER};
50 private static final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class);
51 private static final State __IDLE = new IdleState();
52 private static final State __WRITING = new WritingState();
53 private static final State __COMPLETING = new CompletingState();
54 private final EndPoint _endPoint;
55 private final AtomicReference<State> _state = new AtomicReference<>();
// Populate the table of legal transitions: each key is a current state type and the value is the
// set of state types that isTransitionAllowed() will accept as the next state from it.
59 // fill the state machine
60 __stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING));
61 __stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
// PENDING may go straight to IDLE: onFail() fails the pending callback itself in that state.
62 __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING,StateType.IDLE));
63 __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
// FAILED can only be left by resetting to IDLE (done by fail() and ignoreFail()).
64 __stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE));
67 // A write operation may either complete immediately:
68 // IDLE-->WRITING-->IDLE
69 // Or it may not completely flush and go via the PENDING state
70 // IDLE-->WRITING-->PENDING-->COMPLETING-->IDLE
71 // Or it may take several cycles to complete
72 // IDLE-->WRITING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE
74 // If a failure happens while in IDLE, it is a noop since there is no operation to tell of the failure.
75 // If a failure happens while in WRITING, but the write has finished successfully or with an IOException,
76 // the callback's complete or respectively failed methods will be called.
77 // If a failure happens in PENDING state, then the fail method calls the pending callback and moves to IDLE state
79 // IDLE--(fail)-->IDLE
80 // IDLE-->WRITING--(fail)-->FAILED-->IDLE
81 // IDLE-->WRITING-->PENDING--(fail)-->IDLE
82 // IDLE-->WRITING-->PENDING-->COMPLETING--(fail)-->FAILED-->IDLE
84 // So a call to fail in the PENDING state will be directly handled and the state changed to IDLE
85 // A call to fail in the WRITING or COMPLETING states will just set the state to FAILED and the failure will be
86 // handled when the write or completeWrite methods try to move the state from what they thought it was.
89 protected WriteFlusher(EndPoint endPoint)
95 private enum StateType
105 * Tries to update the current state to the given new state.
106 * @param previous the expected current state
107 * @param next the desired new state
108 * @return true if the state was atomically changed from previous to next, false if the current state was no longer previous
109 * @throws IllegalStateException if the transition from previous to next is not permitted by the state machine
// Attempts one transition of the flusher state from `previous` to `next`.
// The transition is validated against the static transition table first; a transition that the
// table does not allow is treated as a programming error and raises IllegalStateException.
111 private boolean updateState(State previous,State next)
113 if (!isTransitionAllowed(previous,next))
114 throw new IllegalStateException();
// Compare-and-set on the AtomicReference: the swap only happens if the current state is still the
// very same object as `previous`, so a concurrent state change is detected instead of overwritten.
116 boolean updated = _state.compareAndSet(previous, next);
// In the log line "-->" marks a swap that happened and "!->" one that did not.
118 LOG.debug("update {}:{}{}{}", this, previous, updated?"-->":"!->",next);
// Delivers a previously recorded failure to the given pending write.
122 private void fail(PendingState pending)
124 State current = _state.get();
125 if (current.getType()==StateType.FAILED)
// The type check above makes this cast safe; the FailedState carries the recorded cause.
127 FailedState failed=(FailedState)current;
// Reset FAILED -> IDLE first, and only notify the callback if this caller performed the reset.
128 if (updateState(failed,__IDLE))
130 pending.fail(failed.getCause());
// NOTE(review): the control flow leading here is not fully visible; presumably this is reached only
// when the state was not FAILED or the reset to IDLE did not succeed — confirm.
134 throw new IllegalStateException();
// Discards a recorded failure by moving the flusher from FAILED back to IDLE without notifying anyone.
137 private void ignoreFail()
139 State current = _state.get();
// Retry for as long as a FAILED state is observed; each pass attempts the reset and re-reads the state.
140 while (current.getType()==StateType.FAILED)
// NOTE(review): the statement guarded by this check is not visible here; presumably an early return
// once the reset succeeds — confirm.
142 if (updateState(current,__IDLE))
144 current = _state.get();
// Checks a proposed transition against the static __stateTransitions table, keyed by state type.
148 private boolean isTransitionAllowed(State currentState, State newState)
// Look up the set of state types reachable from the current state's type.
150 Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
151 if (!allowedNewStateTypes.contains(newState.getType()))
// A rejected transition is logged at WARN level with both states for diagnosis.
153 LOG.warn("{}: {} -> {} not allowed", this, currentState, newState);
160 * State represents a State of WriteFlusher.
162 private static class State
164 private final StateType _type;
166 private State(StateType stateType)
171 public StateType getType()
177 public String toString()
179 return String.format("%s", _type);
184 * In IdleState WriteFlusher is idle and accepts new writes
186 private static class IdleState extends State
190 super(StateType.IDLE);
195 * In WritingState WriteFlusher is currently writing.
197 private static class WritingState extends State
199 private WritingState()
201 super(StateType.WRITING);
206 * In FailedState a failure has been recorded while a write was in progress. The cause is held until fail() or ignoreFail() moves the flusher back to IDLE.
208 private static class FailedState extends State
210 private final Throwable _cause;
211 private FailedState(Throwable cause)
213 super(StateType.FAILED);
217 public Throwable getCause()
224 * In CompletingState WriteFlusher is flushing buffers that have not been fully written in write(). If write()
225 * didn't flush all buffers in one go, it'll switch the State to PendingState. completeWrite() will then switch to
226 * this state and try to flush the remaining buffers.
228 private static class CompletingState extends State
230 private CompletingState()
232 super(StateType.COMPLETING);
237 * In PendingState not all buffers could be written in one go. Then write() will switch to PendingState() and
238 * preserve the state by creating a new PendingState object with the given parameters.
240 private class PendingState extends State
242 private final Callback _callback;
243 private final ByteBuffer[] _buffers;
245 private PendingState(ByteBuffer[] buffers, Callback callback)
247 super(StateType.PENDING);
249 _callback = callback;
252 public ByteBuffer[] getBuffers()
257 protected boolean fail(Throwable cause)
261 _callback.failed(cause);
267 protected void complete()
270 _callback.succeeded();
275 * Abstract call to be implemented by specific WriteFlushers. It should schedule a call to {@link #completeWrite()}
276 * or {@link #onFail(Throwable)} when appropriate.
278 abstract protected void onIncompleteFlushed();
281 * Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If the state
282 * transition fails, the flusher is not idle and a WritePendingException is thrown.
284 * If not all buffers can be written in one go it creates a new <code>PendingState</code> object to preserve the state
285 * and then calls {@link #onIncompleteFlushed()}. The remaining buffers will be written in {@link #completeWrite()}.
287 * If all buffers have been written it calls callback.succeeded().
289 * @param callback the callback to call on either failed or succeeded
290 * @param buffers the buffers to flush to the endpoint
// Starts a write of the given buffers and reports the outcome through the callback.
292 public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
295 LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
// Claim the flusher: only IDLE -> WRITING is attempted, so a flusher in any other state rejects the write.
297 if (!updateState(__IDLE,__WRITING))
298 throw new WritePendingException();
// flush() hands back whatever the endpoint could not take.
302 buffers=flush(buffers);
304 // if we are incomplete?
308 LOG.debug("flushed incomplete");
// Park the unwritten buffers together with the callback so that completeWrite() can resume them.
309 PendingState pending=new PendingState(buffers, callback);
// Only ask the subclass to schedule completeWrite() if WRITING -> PENDING actually took effect.
310 if (updateState(__WRITING,pending))
311 onIncompleteFlushed();
317 // If updateState didn't succeed, we don't care as our buffers have been written
318 if (!updateState(__WRITING,__IDLE))
321 callback.succeeded();
323 catch (IOException e)
326 LOG.debug("write exception", e);
// NOTE(review): the branches around the next two statements are not fully visible here; presumably the
// IOException is reported directly when WRITING -> IDLE succeeds, and fail(...) handles a failure that
// was recorded concurrently otherwise — confirm.
327 if (updateState(__WRITING,__IDLE))
333 fail(new PendingState(buffers, callback));
339 * Complete a write that has not completed and that called {@link #onIncompleteFlushed()} to request a call to this
340 * method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress.
342 * It tries to switch from PENDING to COMPLETING. If state transition fails, then it does nothing as the callback
343 * should have been already failed. That's because the only way to switch from PENDING outside this method is
344 * {@link #onFail(Throwable)} or {@link #onClose()}
// Resumes a write that was left in the PENDING state after an incomplete flush.
346 public void completeWrite()
349 LOG.debug("completeWrite: {}", this);
351 State previous = _state.get();
// Anything other than PENDING means there is no parked write to resume.
353 if (previous.getType()!=StateType.PENDING)
354 return; // failure already handled.
// Safe cast: the type was checked just above.
356 PendingState pending = (PendingState)previous;
// Claim the parked write; losing this compare-and-set means the state changed since it was read.
357 if (!updateState(pending,__COMPLETING))
358 return; // failure already handled.
362 ByteBuffer[] buffers = pending.getBuffers();
364 buffers=flush(buffers);
366 // if we are incomplete?
370 LOG.debug("flushed incomplete {}",BufferUtil.toDetailString(buffers));
// Reuse the existing PendingState unless flush() returned a different array; the callback is carried over.
371 if (buffers!=pending.getBuffers())
372 pending=new PendingState(buffers, pending._callback);
// Go back to PENDING and request another completeWrite() only if COMPLETING -> PENDING took effect.
373 if (updateState(__COMPLETING,pending))
374 onIncompleteFlushed();
380 // If updateState didn't succeed, we don't care as our buffers have been written
381 if (!updateState(__COMPLETING,__IDLE))
385 catch (IOException e)
388 LOG.debug("completeWrite exception", e);
// NOTE(review): the statements guarded by this check are not visible here; presumably they mirror the
// error handling at the end of write() — confirm.
389 if(updateState(__COMPLETING,__IDLE))
396 /* ------------------------------------------------------------ */
397 /** Flush the buffers iteratively until no progress is made
398 * @param buffers The buffers to flush
399 * @return The unflushed buffers, or null if all flushed
400 * @throws IOException
// Repeatedly hands the buffers to the EndPoint and trims the array to what is still unwritten.
402 protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
404 boolean progress=true;
// Loop for as long as the progress flag is set and there are buffers left.
405 while(progress && buffers!=null)
// Sample the first buffer's remaining bytes before and after the endpoint flush (0 for an empty array).
407 int before=buffers.length==0?0:buffers[0].remaining();
408 boolean flushed=_endPoint.flush(buffers);
409 int r=buffers.length==0?0:buffers[0].remaining();
// NOTE(review): the loop enclosing the next two statements is not visible here; presumably not_empty
// walks past leading buffers that are fully drained, and reaching buffers.length means none are left — confirm.
419 if (++not_empty==buffers.length)
426 r=buffers[not_empty].remaining();
// Drop the leading entries so the returned array starts at index not_empty.
430 buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length);
433 // If buffers is null, then flush has returned false but has consumed all the data!
434 // This is probably SSL being unable to flush the encrypted buffer, so return EMPTY_BUFFERS
435 // and that will keep this WriteFlusher pending.
436 return buffers==null?EMPTY_BUFFERS:buffers;
439 /* ------------------------------------------------------------ */
440 /** Notify the flusher of a failure
441 * @param cause The cause of the failure
442 * @return true if the flusher passed the failure to a {@link Callback} instance
// Reports an external failure to the flusher; how it is handled depends on the state observed.
444 public boolean onFail(Throwable cause)
446 // Keep trying to handle the failure until we get to IDLE or FAILED state
// Re-read the state on every attempt, since any compare-and-set below may lose to a concurrent change.
449 State current=_state.get();
450 switch(current.getType())
// NOTE(review): the case labels are not visible here; the log text suggests this branch covers states
// in which there is no operation to notify — confirm.
455 LOG.debug("ignored: {} {}", this, cause);
460 LOG.debug("failed: {} {}", this, cause);
// A parked write can be failed right here: move it to IDLE, then notify its callback with the cause.
462 PendingState pending = (PendingState)current;
463 if (updateState(pending,__IDLE))
464 return pending.fail(cause);
469 LOG.debug("failed: {} {}", this, cause);
// Otherwise record the cause in a FailedState; write()/completeWrite() discover it when their own
// state update fails.
471 if (updateState(current,new FailedState(cause)))
// Notifies the flusher that the channel was closed by reporting a ClosedChannelException via onFail().
478 public void onClose()
// NOTE(review): the statement guarded by this IDLE check is not visible here; presumably an early
// return, since there is no write to fail when idle — confirm.
480 if (_state.get()==__IDLE)
482 onFail(new ClosedChannelException());
487 return _state.get().getType() == StateType.IDLE;
490 public boolean isInProgress()
492 switch(_state.get().getType())
504 public String toString()
506 return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get());