//
// ========================================================================
-// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.WritePendingException;
import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.http.HttpGenerator;
/**
* <p>A {@link Connection} that handles the HTTP protocol.</p>
*/
-public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport
+public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom
{
public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
private static final boolean REQUEST_BUFFER_DIRECT=false;
private final HttpParser _parser;
private volatile ByteBuffer _requestBuffer = null;
private volatile ByteBuffer _chunk = null;
+ private final SendCallback _sendCallback = new SendCallback();
/* ------------------------------------------------------------ */
/** Get the current connection that this thread is dispatched to.
- * Note that a thread may be processing a request asynchronously and
- * thus not be dispatched to the connection.
+ * Note that a thread may be processing a request asynchronously and
+ * thus not be dispatched to the connection.
* @see Request#getAttribute(String) for a more general way to access the HttpConnection
* @return the current HttpConnection or null
*/
protected static HttpConnection setCurrentConnection(HttpConnection connection)
{
HttpConnection last=__currentConnection.get();
- if (connection==null)
- __currentConnection.remove();
- else
- __currentConnection.set(connection);
+ __currentConnection.set(connection);
return last;
}
HttpInput<ByteBuffer> input = newHttpInput();
_channel = newHttpChannel(input);
_parser = newHttpParser();
- LOG.debug("New HTTP Connection {}", this);
+ if (LOG.isDebugEnabled())
+ LOG.debug("New HTTP Connection {}", this);
}
protected HttpGenerator newHttpGenerator()
{
return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
}
-
+
protected HttpInput<ByteBuffer> newHttpInput()
{
return new HttpInputOverHTTP(this);
}
-
+
protected HttpChannelOverHttp newHttpChannel(HttpInput<ByteBuffer> httpInput)
{
return new HttpChannelOverHttp(_connector, _config, getEndPoint(), this, httpInput);
}
-
+
protected HttpParser newHttpParser()
{
return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
return getHttpChannel().getRequests();
}
+ @Override
+ public ByteBuffer onUpgradeFrom()
+ {
+ if (BufferUtil.hasContent(_requestBuffer))
+ {
+ ByteBuffer buffer = _requestBuffer;
+ _requestBuffer=null;
+ return buffer;
+ }
+ return null;
+ }
+
void releaseRequestBuffer()
{
if (_requestBuffer != null && !_requestBuffer.hasRemaining())
_bufferPool.release(buffer);
}
}
-
+
public ByteBuffer getRequestBuffer()
{
if (_requestBuffer == null)
@Override
public void onFillable()
{
- LOG.debug("{} onFillable {}", this, _channel.getState());
+ if (LOG.isDebugEnabled())
+ LOG.debug("{} onFillable {}", this, _channel.getState());
final HttpConnection last=setCurrentConnection(this);
int filled=Integer.MAX_VALUE;
// Do we need some data to parse
if (BufferUtil.isEmpty(_requestBuffer))
{
- // If the previous iteration filled 0 bytes or saw a close, then break here
+ // If the previous iteration filled 0 bytes or saw a close, then break here
if (filled<=0)
break;
-
+
// Can we fill?
if(getEndPoint().isInputShutdown())
{
filled = getEndPoint().fill(_requestBuffer);
if (filled==0) // Do a retry on fill 0 (optimization for SSL connections)
filled = getEndPoint().fill(_requestBuffer);
-
+
// tell parser
if (filled < 0)
_parser.atEOF();
}
}
-
+
// Parse the buffer
if (_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
{
close();
}
finally
- {
+ {
setCurrentConnection(last);
if (!suspended && getEndPoint().isOpen() && getEndPoint().getConnection()==this)
{
{
// Not in a race here for the request buffer with #onFillable because an async consumer of
// content would only be started after onFillable has given up control.
- // In a little bit of a race with #completed, but then not sure if it is legal to be doing
+ // In a little bit of a race with #completed, but then not sure if it is legal to be doing
// async calls to IO and have a completed call at the same time.
ByteBuffer requestBuffer = getRequestBuffer();
if (parsed)
break;
-
+
// OK lets read some data
int filled=getEndPoint().fill(requestBuffer);
if (LOG.isDebugEnabled()) // Avoid boxing of variable 'filled'
}
}
}
-
+
@Override
public void completed()
{
Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
if (connection != null)
{
- LOG.debug("Upgrade from {} to {}", this, connection);
- onClose();
- getEndPoint().setConnection(connection);
- connection.onOpen();
+ _channel.getState().upgrade();
+ getEndPoint().upgrade(connection);
_channel.reset();
_parser.reset();
_generator.reset();
return;
}
}
-
+
// Finish consuming the request
// If we are still expecting
if (_channel.isExpecting100Continue())
+ {
// close to seek EOF
_parser.close();
+ }
else if (_parser.inContentState() && _generator.isPersistent())
- // Complete reading the request
- _channel.getRequest().getHttpInput().consumeAll();
+ {
+ // If we are async, then we have problems to complete neatly
+ if (_channel.getRequest().getHttpInput().isAsync())
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("unconsumed async input {}", this);
+ _channel.abort();
+ }
+ else
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("unconsumed input {}", this);
+ // Complete reading the request
+ if (!_channel.getRequest().getHttpInput().consumeAll())
+ _channel.abort();
+ }
+ }
// Reset the channel, parsers and generator
_channel.reset();
_parser.reset();
else
_parser.close();
-
+
// Not in a race here with onFillable, because it has given up control before calling handle.
// in a slight race with #completed, but not sure what to do with that anyway.
releaseRequestBuffer();
getEndPoint().close();
}
}
- // else the parser must be closed, so seek the EOF if we are still open
+ // else the parser must be closed, so seek the EOF if we are still open
else if (getEndPoint().isOpen())
fillInterested();
}
fillInterested();
}
+ @Override
+ public void onClose()
+ {
+ _sendCallback.close();
+ super.onClose();
+ }
+
@Override
public void run()
{
onFillable();
}
-
@Override
public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
{
- if (info==null)
- new ContentCallback(content,lastContent,callback).iterate();
- else
- {
- // If we are still expecting a 100 continues
- if (_channel.isExpecting100Continue())
- // then we can't be persistent
- _generator.setPersistent(false);
- new CommitCallback(info,content,lastContent,callback).iterate();
- }
+ // If we are still expecting a 100 continues when we commit
+ if (info!=null && _channel.isExpecting100Continue())
+ // then we can't be persistent
+ _generator.setPersistent(false);
+
+ if(_sendCallback.reset(info,content,lastContent,callback))
+ _sendCallback.iterate();
}
@Override
public void send(ByteBuffer content, boolean lastContent, Callback callback)
{
- new ContentCallback(content,lastContent,callback).iterate();
+ if (!lastContent && BufferUtil.isEmpty(content))
+ callback.succeeded();
+ else if (_sendCallback.reset(null,content,lastContent,callback))
+ _sendCallback.iterate();
+ }
+
+ @Override
+ public void abort()
+ {
+ // Do a direct close of the output, as this may indicate to a client that the
+ // response is bad either with RST or by abnormal completion of chunked response.
+ getEndPoint().close();
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("%s[p=%s,g=%s,c=%s]",
+ super.toString(),
+ _parser,
+ _generator,
+ _channel);
}
-
protected class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
{
public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
{
super(connector,config,endPoint,transport,input);
}
-
+
@Override
public void earlyEOF()
{
getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE);
break;
}
+ case HTTP_2:
+ {
+ persistent=false;
+ badMessage(400,null);
+ return true;
+ }
default:
{
throw new IllegalStateException();
if (!persistent)
_generator.setPersistent(false);
- return super.headerComplete();
+ if (!super.headerComplete())
+ return false;
+
+ // Should we delay dispatch until we have some content?
+ // We should not delay if there is no content expect or client is expecting 100 or the response is already committed or the request buffer already has something in it to parse
+ if (getHttpConfiguration().isDelayDispatchUntilContent() && _parser.getContentLength() > 0 &&
+ !isExpecting100Continue() && !isCommitted() && BufferUtil.isEmpty(_requestBuffer))
+ return false;
+
+ return true;
}
@Override
}
@Override
- public void failed()
+ public void abort()
{
- getEndPoint().shutdownOutput();
+ super.abort();
+ _generator.setPersistent(false);
}
-
@Override
public boolean messageComplete()
}
}
- private class CommitCallback extends IteratingCallback
+ private class SendCallback extends IteratingCallback
{
- final ByteBuffer _content;
- final boolean _lastContent;
- final ResponseInfo _info;
- final Callback _callback;
- ByteBuffer _header;
+ private ResponseInfo _info;
+ private ByteBuffer _content;
+ private boolean _lastContent;
+ private Callback _callback;
+ private ByteBuffer _header;
+ private boolean _shutdownOut;
+
+ private SendCallback()
+ {
+ super(true);
+ }
- CommitCallback(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
+ private boolean reset(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
{
- _info=info;
- _content=content;
- _lastContent=last;
- _callback=callback;
+ if (reset())
+ {
+ _info = info;
+ _content = content;
+ _lastContent = last;
+ _callback = callback;
+ _header = null;
+ _shutdownOut = false;
+ return true;
+ }
+
+ if (isClosed())
+ callback.failed(new EofException());
+ else
+ callback.failed(new WritePendingException());
+ return false;
}
@Override
public Action process() throws Exception
{
+ if (_callback==null)
+ throw new IllegalStateException();
+
ByteBuffer chunk = _chunk;
while (true)
{
{
case NEED_HEADER:
{
- // Look for optimisation to avoid allocating a _header buffer
- /*
- Cannot use this optimisation unless we work out how not to overwrite data in user passed arrays.
- if (_lastContent && _content!=null && !_content.isReadOnly() && _content.hasArray() && BufferUtil.space(_content)>_config.getResponseHeaderSize() )
- {
- // use spare space in content buffer for header buffer
- int p=_content.position();
- int l=_content.limit();
- _content.position(l);
- _content.limit(l+_config.getResponseHeaderSize());
- _header=_content.slice();
- _header.limit(0);
- _content.position(p);
- _content.limit(l);
- }
- else
- */
- _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
-
+ _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
continue;
}
case NEED_CHUNK:
}
case FLUSH:
{
- // Don't write the chunk or the content if this is a HEAD response
- if (_channel.getRequest().isHead())
+ // Don't write the chunk or the content if this is a HEAD response, or any other type of response that should have no content
+ if (_channel.getRequest().isHead() || _generator.isNoContent())
{
BufferUtil.clear(chunk);
BufferUtil.clear(_content);
getEndPoint().write(this, _content);
}
else
- continue;
+ {
+ succeeded(); // nothing to write
+ }
return Action.SCHEDULED;
}
case SHUTDOWN_OUT:
{
- getEndPoint().shutdownOutput();
+ _shutdownOut=true;
continue;
}
case DONE:
{
- if (_header!=null)
- {
- // don't release header in spare content buffer
- if (!_lastContent || _content==null || !_content.hasArray() || !_header.hasArray() || _content.array()!=_header.array())
- _bufferPool.release(_header);
- }
return Action.SUCCEEDED;
}
case CONTINUE:
}
}
- @Override
- protected void completed()
- {
- _callback.succeeded();
- }
-
- @Override
- public void failed(final Throwable x)
- {
- super.failed(x);
- failedCallback(_callback,x);
- }
- }
-
- private class ContentCallback extends IteratingCallback
- {
- final ByteBuffer _content;
- final boolean _lastContent;
- final Callback _callback;
-
- ContentCallback(ByteBuffer content, boolean last, Callback callback)
+ private void releaseHeader()
{
- _content=content;
- _lastContent=last;
- _callback=callback;
+ ByteBuffer h=_header;
+ _header=null;
+ if (h!=null)
+ _bufferPool.release(h);
}
@Override
- public Action process() throws Exception
+ protected void onCompleteSuccess()
{
- ByteBuffer chunk = _chunk;
- while (true)
- {
- HttpGenerator.Result result = _generator.generateResponse(null, null, chunk, _content, _lastContent);
- if (LOG.isDebugEnabled())
- LOG.debug("{} generate: {} ({},{})@{}",
- this,
- result,
- BufferUtil.toSummaryString(_content),
- _lastContent,
- _generator.getState());
-
- switch (result)
- {
- case NEED_HEADER:
- throw new IllegalStateException();
- case NEED_CHUNK:
- {
- chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
- continue;
- }
- case FLUSH:
- {
- // Don't write the chunk or the content if this is a HEAD response
- if (_channel.getRequest().isHead())
- {
- BufferUtil.clear(chunk);
- BufferUtil.clear(_content);
- continue;
- }
- else if (BufferUtil.hasContent(chunk))
- {
- if (BufferUtil.hasContent(_content))
- getEndPoint().write(this, chunk, _content);
- else
- getEndPoint().write(this, chunk);
- }
- else if (BufferUtil.hasContent(_content))
- {
- getEndPoint().write(this, _content);
- }
- else
- continue;
- return Action.SCHEDULED;
- }
- case SHUTDOWN_OUT:
- {
- getEndPoint().shutdownOutput();
- continue;
- }
- case DONE:
- {
- return Action.SUCCEEDED;
- }
- case CONTINUE:
- {
- break;
- }
- default:
- {
- throw new IllegalStateException("generateResponse="+result);
- }
- }
- }
+ releaseHeader();
+ _callback.succeeded();
+ if (_shutdownOut)
+ getEndPoint().shutdownOutput();
}
@Override
- protected void completed()
+ public void onCompleteFailure(final Throwable x)
{
- _callback.succeeded();
+ releaseHeader();
+ failedCallback(_callback,x);
+ if (_shutdownOut)
+ getEndPoint().shutdownOutput();
}
@Override
- public void failed(final Throwable x)
+ public String toString()
{
- super.failed(x);
- failedCallback(_callback,x);
+ return String.format("%s[i=%s,cb=%s]",super.toString(),_info,_callback);
}
}
-
- @Override
- public void abort()
- {
- // Do a direct close of the output, as this may indicate to a client that the
- // response is bad either with RST or by abnormal completion of chunked response.
- getEndPoint().close();
- }
-
}