//
// ========================================================================
-// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
{
private static Logger LOG = Log.getLogger(HttpOutput.class);
private final HttpChannel<?> _channel;
- private final SharedBlockingCallback _writeblock=new SharedBlockingCallback();
+ private final SharedBlockingCallback _writeblock=new SharedBlockingCallback()
+ {
+ @Override
+ protected long getIdleTimeout()
+ {
+ return _channel.getIdleTimeout();
+ }
+ };
private long _written;
private ByteBuffer _aggregate;
private int _bufferSize;
public HttpOutput(HttpChannel<?> channel)
{
_channel = channel;
- _bufferSize = _channel.getHttpConfiguration().getOutputBufferSize();
- _commitSize=_bufferSize/4;
+ HttpConfiguration config = channel.getHttpConfiguration();
+ _bufferSize = config.getOutputBufferSize();
+ _commitSize = config.getOutputAggregationSize();
+ if (_commitSize>_bufferSize)
+ {
+ LOG.warn("OutputAggregationSize {} exceeds bufferSize {}",_commitSize,_bufferSize);
+ _commitSize=_bufferSize;
+ }
}
public HttpChannel<?> getHttpChannel()
catch(IOException e)
{
LOG.debug(e);
- _channel.failed();
+ _channel.abort();
}
releaseBuffer();
return;
catch(IOException e)
{
LOG.debug(e);
- _channel.failed();
+ _channel.abort();
}
releaseBuffer();
return;
write(_aggregate, complete && len==0);
// should we fill aggregate again from the buffer?
- if (len>0 && !complete && len<=_commitSize)
+ if (len>0 && !complete && len<=_commitSize && len<=BufferUtil.space(_aggregate))
{
BufferUtil.append(_aggregate, b, off, len);
return;
* @param httpContent The content to send
* @param callback The callback to use to notify success or failure
*/
- public void sendContent(HttpContent httpContent, Callback callback) throws IOException
+ public void sendContent(HttpContent httpContent, Callback callback)
{
if (BufferUtil.hasContent(_aggregate))
- throw new IOException("written");
+ {
+ callback.failed(new IOException("cannot sendContent() after write()"));
+ return;
+ }
if (_channel.isCommitted())
- throw new IOException("committed");
+ {
+ callback.failed(new IOException("committed"));
+ return;
+ }
while (true)
{
continue;
break;
case ERROR:
- throw new EofException(_onError);
+ callback.failed(new EofException(_onError));
+ return;
+
case CLOSED:
- throw new EofException("Closed");
+ callback.failed(new EofException("Closed"));
+ return;
default:
throw new IllegalStateException();
}
if (buffer!=null)
{
+ if (LOG.isDebugEnabled())
+ LOG.debug("sendContent({}=={},{},direct={})",httpContent,BufferUtil.toDetailString(buffer),callback,_channel.useDirectBuffers());
+
sendContent(buffer,callback);
return;
}
- ReadableByteChannel rbc=httpContent.getReadableByteChannel();
- if (rbc!=null)
+ try
{
- // Close of the rbc is done by the async sendContent
- sendContent(rbc,callback);
- return;
- }
+ ReadableByteChannel rbc=httpContent.getReadableByteChannel();
+ if (rbc!=null)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("sendContent({}=={},{},direct={})",httpContent,rbc,callback,_channel.useDirectBuffers());
+ // Close of the rbc is done by the async sendContent
+ sendContent(rbc,callback);
+ return;
+ }
- InputStream in = httpContent.getInputStream();
- if ( in!=null )
+ InputStream in = httpContent.getInputStream();
+ if ( in!=null )
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("sendContent({}=={},{},direct={})",httpContent,in,callback,_channel.useDirectBuffers());
+ sendContent(in,callback);
+ return;
+ }
+ }
+ catch(Throwable th)
{
- sendContent(in,callback);
+ callback.failed(th);
return;
}
{
Throwable th=_onError;
_onError=null;
- LOG.debug("onError",th);
+ if (LOG.isDebugEnabled())
+ LOG.debug("onError",th);
_writeListener.onError(th);
close();
}
}
- continue loop;
+ continue;
}
-
+
switch(_state.get())
{
- case READY:
case CLOSED:
// even though a write is not possible, because a close has
// occurred, we need to call onWritePossible to tell async
// producer that the last write completed.
+ // so fall through
+ case READY:
try
{
_writeListener.onWritePossible();
_onError=e;
}
break;
+
default:
-
+ _onError=new IllegalStateException("state="+_state.get());
}
}
}
private abstract class AsyncICB extends IteratingCallback
{
@Override
- protected void completed()
+ protected void onCompleteSuccess()
{
while(true)
{
}
@Override
- public void failed(Throwable e)
+ public void onCompleteFailure(Throwable e)
{
- super.failed(e);
- _onError=e;
+ _onError=e==null?new IOException():e;
_channel.getState().onWritePossible();
}
}
}
@Override
- protected void completed()
+ protected void onCompleteSuccess()
{
- super.completed();
+ super.onCompleteSuccess();
if (_complete)
closed();
}
}
@Override
- public void failed(Throwable x)
+ public void onCompleteFailure(Throwable x)
{
- super.failed(x);
+ super.onCompleteFailure(x);
_channel.getByteBufferPool().release(_buffer);
try
{
}
@Override
- public void failed(Throwable x)
+ public void onCompleteFailure(Throwable x)
{
- super.failed(x);
+ super.onCompleteFailure(x);
_channel.getByteBufferPool().release(_buffer);
try
{
}
}
}
-
}