//
// ========================================================================
-// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
{
private static final Logger LOG = Log.getLogger(WriteFlusher.class);
private static final boolean DEBUG = LOG.isDebugEnabled(); // Easy for the compiler to remove the code if DEBUG==false
- private static final ByteBuffer[] EMPTY_BUFFERS = new ByteBuffer[0];
+ private static final ByteBuffer[] EMPTY_BUFFERS = new ByteBuffer[]{BufferUtil.EMPTY_BUFFER};
private static final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class);
private static final State __IDLE = new IdleState();
private static final State __WRITING = new WritingState();
private PendingState(ByteBuffer[] buffers, Callback callback)
{
super(StateType.PENDING);
- _buffers = compact(buffers);
+ _buffers = buffers;
_callback = callback;
}
if (_callback!=null)
_callback.succeeded();
}
-
- /**
- * Compacting the buffers is needed because the semantic of WriteFlusher is
- * to write the buffers and if the caller sees that the buffer is consumed,
- * then it can recycle it.
- * If we do not compact, then it is possible that we store a consumed buffer,
- * which is then recycled and refilled; when the WriteFlusher is invoked to
- * complete the write, it will write the refilled bytes, garbling the content.
- *
- * @param buffers the buffers to compact
- * @return the compacted buffers
- */
- private ByteBuffer[] compact(ByteBuffer[] buffers)
- {
- int length = buffers.length;
-
- // Just one element, no need to compact
- if (length < 2)
- return buffers;
-
- // How many still have content ?
- int consumed = 0;
- while (consumed < length && BufferUtil.isEmpty(buffers[consumed]))
- ++consumed;
-
- // All of them still have content, no need to compact
- if (consumed == 0)
- return buffers;
-
- // None has content, return empty
- if (consumed == length)
- return EMPTY_BUFFERS;
-
- return Arrays.copyOfRange(buffers,consumed,length);
- }
}
/**
try
{
- boolean flushed=_endPoint.flush(buffers);
- if (DEBUG)
- LOG.debug("flushed {}", flushed);
+ buffers=flush(buffers);
- // Are we complete?
- for (ByteBuffer b : buffers)
+ // if we are incomplete?
+ if (buffers!=null)
{
- if (!flushed||BufferUtil.hasContent(b))
- {
- PendingState pending=new PendingState(buffers, callback);
- if (updateState(__WRITING,pending))
- onIncompleteFlushed();
- else
- fail(pending);
- return;
- }
+ if (DEBUG)
+ LOG.debug("flushed incomplete");
+ PendingState pending=new PendingState(buffers, callback);
+ if (updateState(__WRITING,pending))
+ onIncompleteFlushed();
+ else
+ fail(pending);
+ return;
}
// If updateState didn't succeed, we don't care as our buffers have been written
* {@link #onFail(Throwable)} or {@link #onClose()}
*/
public void completeWrite()
- {
+ {
if (DEBUG)
LOG.debug("completeWrite: {}", this);
{
ByteBuffer[] buffers = pending.getBuffers();
- boolean flushed=_endPoint.flush(buffers);
- if (DEBUG)
- LOG.debug("flushed {}", flushed);
+ buffers=flush(buffers);
- // Are we complete?
- for (ByteBuffer b : buffers)
+ // if we are incomplete?
+ if (buffers!=null)
{
- if (!flushed || BufferUtil.hasContent(b))
- {
- if (updateState(__COMPLETING,pending))
- onIncompleteFlushed();
- else
- fail(pending);
- return;
- }
+ if (DEBUG)
+ LOG.debug("flushed incomplete {}",BufferUtil.toDetailString(buffers));
+ if (buffers!=pending.getBuffers())
+ pending=new PendingState(buffers, pending._callback);
+ if (updateState(__COMPLETING,pending))
+ onIncompleteFlushed();
+ else
+ fail(pending);
+ return;
}
// If updateState didn't succeed, we don't care as our buffers have been written
}
}
+ /* ------------------------------------------------------------ */
+ /** Flush the buffers iteratively until no progress is made
+ * @param buffers The buffers to flush
+ * @return The unflushed buffers, or null if all flushed
+ * @throws IOException
+ */
+ protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
+ {
+ boolean progress=true;
+ while(progress && buffers!=null)
+ {
+ int before=buffers.length==0?0:buffers[0].remaining();
+ boolean flushed=_endPoint.flush(buffers);
+ int r=buffers.length==0?0:buffers[0].remaining();
+
+ if (flushed)
+ return null;
+
+ progress=before!=r;
+
+ int not_empty=0;
+ while(r==0)
+ {
+ if (++not_empty==buffers.length)
+ {
+ buffers=null;
+ not_empty=0;
+ break;
+ }
+ progress=true;
+ r=buffers[not_empty].remaining();
+ }
+
+ if (not_empty>0)
+ buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length);
+ }
+
+ // If buffers is null, then flush has returned false but has consumed all the data!
+ // This is probably SSL being unable to flush the encrypted buffer, so return EMPTY_BUFFERS
+ // and that will keep this WriteFlusher pending.
+ return buffers==null?EMPTY_BUFFERS:buffers;
+ }
+
/* ------------------------------------------------------------ */
/** Notify the flusher of a failure
* @param cause The cause of the failure