]> WPIA git - gigi.git/blobdiff - lib/jetty/org/eclipse/jetty/server/HttpConnection.java
Importing upstream Jetty jetty-9.2.1.v20140609
[gigi.git] / lib / jetty / org / eclipse / jetty / server / HttpConnection.java
diff --git a/lib/jetty/org/eclipse/jetty/server/HttpConnection.java b/lib/jetty/org/eclipse/jetty/server/HttpConnection.java
new file mode 100644 (file)
index 0000000..16cbbf9
--- /dev/null
@@ -0,0 +1,790 @@
+//
+//  ========================================================================
+//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+//  ------------------------------------------------------------------------
+//  All rights reserved. This program and the accompanying materials
+//  are made available under the terms of the Eclipse Public License v1.0
+//  and Apache License v2.0 which accompanies this distribution.
+//
+//      The Eclipse Public License is available at
+//      http://www.eclipse.org/legal/epl-v10.html
+//
+//      The Apache License v2.0 is available at
+//      http://www.opensource.org/licenses/apache2.0.php
+//
+//  You may elect to redistribute this code under either of these licenses.
+//  ========================================================================
+//
+
+package org.eclipse.jetty.server;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.eclipse.jetty.http.HttpGenerator;
+import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpHeaderValue;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.http.HttpParser;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.io.AbstractConnection;
+import org.eclipse.jetty.io.ByteBufferPool;
+import org.eclipse.jetty.io.Connection;
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.io.EofException;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.IteratingCallback;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+
+/**
+ * <p>A {@link Connection} that handles the HTTP protocol.</p>
+ */
+public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport
+{
+    public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
+    private static final boolean REQUEST_BUFFER_DIRECT=false;
+    private static final boolean HEADER_BUFFER_DIRECT=false;
+    private static final boolean CHUNK_BUFFER_DIRECT=false;
+    private static final Logger LOG = Log.getLogger(HttpConnection.class);
+    private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
+
+    private final HttpConfiguration _config;
+    private final Connector _connector;
+    private final ByteBufferPool _bufferPool;
+    private final HttpGenerator _generator;
+    private final HttpChannelOverHttp _channel;
+    private final HttpParser _parser;
+    private volatile ByteBuffer _requestBuffer = null;
+    private volatile ByteBuffer _chunk = null;
+
+
+    /* ------------------------------------------------------------ */
+    /** Get the current connection that this thread is dispatched to.
+     * Note that a thread may be processing a request asynchronously and 
+     * thus not be dispatched to the connection.  
+     * @see Request#getAttribute(String) for a more general way to access the HttpConnection
+     * @return the current HttpConnection or null
+     */
+    public static HttpConnection getCurrentConnection()
+    {
+        return __currentConnection.get();
+    }
+
+    protected static HttpConnection setCurrentConnection(HttpConnection connection)
+    {
+        HttpConnection last=__currentConnection.get();
+        if (connection==null)
+            __currentConnection.remove();
+        else 
+            __currentConnection.set(connection);
+        return last;
+    }
+
+    public HttpConfiguration getHttpConfiguration()
+    {
+        return _config;
+    }
+
+    public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
+    {
+        // Tell AbstractConnector executeOnFillable==true because we want the same thread that
+        // does the HTTP parsing to handle the request so its cache is hot
+        super(endPoint, connector.getExecutor(),true);
+
+        _config = config;
+        _connector = connector;
+        _bufferPool = _connector.getByteBufferPool();
+        _generator = newHttpGenerator();
+        HttpInput<ByteBuffer> input = newHttpInput();
+        _channel = newHttpChannel(input);
+        _parser = newHttpParser();
+        LOG.debug("New HTTP Connection {}", this);
+    }
+
+    protected HttpGenerator newHttpGenerator()
+    {
+        return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
+    }
+    
+    protected HttpInput<ByteBuffer> newHttpInput()
+    {
+        return new HttpInputOverHTTP(this);
+    }
+    
+    protected HttpChannelOverHttp newHttpChannel(HttpInput<ByteBuffer> httpInput)
+    {
+        return new HttpChannelOverHttp(_connector, _config, getEndPoint(), this, httpInput);
+    }
+    
+    protected HttpParser newHttpParser()
+    {
+        return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
+    }
+
+    protected HttpParser.RequestHandler<ByteBuffer> newRequestHandler()
+    {
+        return _channel;
+    }
+
+    public Server getServer()
+    {
+        return _connector.getServer();
+    }
+
+    public Connector getConnector()
+    {
+        return _connector;
+    }
+
+    public HttpChannel<?> getHttpChannel()
+    {
+        return _channel;
+    }
+
+    public HttpParser getParser()
+    {
+        return _parser;
+    }
+
+    @Override
+    public int getMessagesIn()
+    {
+        return getHttpChannel().getRequests();
+    }
+
+    @Override
+    public int getMessagesOut()
+    {
+        return getHttpChannel().getRequests();
+    }
+
+    void releaseRequestBuffer()
+    {
+        if (_requestBuffer != null && !_requestBuffer.hasRemaining())
+        {
+            ByteBuffer buffer=_requestBuffer;
+            _requestBuffer=null;
+            _bufferPool.release(buffer);
+        }
+    }
+    
+    public ByteBuffer getRequestBuffer()
+    {
+        if (_requestBuffer == null)
+            _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
+        return _requestBuffer;
+    }
+
+    /**
+     * <p>Parses and handles HTTP messages.</p>
+     * <p>This method is called when this {@link Connection} is ready to read bytes from the {@link EndPoint}.
+     * However, it can also be called if there is unconsumed data in the _requestBuffer, as a result of
+     * resuming a suspended request when there is a pipelined request already read into the buffer.</p>
+     * <p>This method fills bytes and parses them until either: EOF is filled; 0 bytes are filled;
+     * the HttpChannel finishes handling; or the connection has changed.</p>
+     */
+    @Override
+    public void onFillable()
+    {
+        LOG.debug("{} onFillable {}", this, _channel.getState());
+
+        final HttpConnection last=setCurrentConnection(this);
+        int filled=Integer.MAX_VALUE;
+        boolean suspended=false;
+        try
+        {
+            // while not suspended and not upgraded
+            while (!suspended && getEndPoint().getConnection()==this)
+            {
+                // Do we need some data to parse
+                if (BufferUtil.isEmpty(_requestBuffer))
+                {
+                    // If the previous iteration filled 0 bytes or saw a close, then break here 
+                    if (filled<=0)
+                        break;
+                        
+                    // Can we fill?
+                    if(getEndPoint().isInputShutdown())
+                    {
+                        // No pretend we read -1
+                        filled=-1;
+                        _parser.atEOF();
+                    }
+                    else
+                    {
+                        // Get a buffer
+                        // We are not in a race here for the request buffer as we have not yet received a request,
+                        // so there are not an possible legal threads calling #parseContent or #completed.
+                        _requestBuffer = getRequestBuffer();
+
+                        // fill
+                        filled = getEndPoint().fill(_requestBuffer);
+                        if (filled==0) // Do a retry on fill 0 (optimization for SSL connections)
+                            filled = getEndPoint().fill(_requestBuffer);
+                        
+                        // tell parser
+                        if (filled < 0)
+                            _parser.atEOF();
+                    }
+                }
+                
+                // Parse the buffer
+                if (_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
+                {
+                    // The parser returned true, which indicates the channel is ready to handle a request.
+                    // Call the channel and this will either handle the request/response to completion OR,
+                    // if the request suspends, the request/response will be incomplete so the outer loop will exit.
+                    // Not that onFillable no longer manipulates the request buffer from this point and that is
+                    // left to threads calling #completed or #parseContent (which may be this thread inside handle())
+                    suspended = !_channel.handle();
+                }
+                else
+                {
+                    // We parsed what we could, recycle the request buffer
+                    // We are not in a race here for the request buffer as we have not yet received a request,
+                    // so there are not an possible legal threads calling #parseContent or #completed.
+                    releaseRequestBuffer();
+                }
+            }
+        }
+        catch (EofException e)
+        {
+            LOG.debug(e);
+        }
+        catch (Exception e)
+        {
+            if (_parser.isIdle())
+                LOG.debug(e);
+            else
+                LOG.warn(this.toString(), e);
+            close();
+        }
+        finally
+        {                        
+            setCurrentConnection(last);
+            if (!suspended && getEndPoint().isOpen() && getEndPoint().getConnection()==this)
+            {
+                fillInterested();
+            }
+        }
+    }
+
+    /* ------------------------------------------------------------ */
+    /** Fill and parse data looking for content
+     * @throws IOException
+     */
+    protected void parseContent() throws IOException
+    {
+        // Not in a race here for the request buffer with #onFillable because an async consumer of
+        // content would only be started after onFillable has given up control.
+        // In a little bit of a race with #completed, but then not sure if it is legal to be doing 
+        // async calls to IO and have a completed call at the same time.
+        ByteBuffer requestBuffer = getRequestBuffer();
+
+        while (_parser.inContentState())
+        {
+            // Can the parser progress (even with an empty buffer)
+            boolean parsed = _parser.parseNext(requestBuffer==null?BufferUtil.EMPTY_BUFFER:requestBuffer);
+
+            // No, we can we try reading some content?
+            if (BufferUtil.isEmpty(requestBuffer) && getEndPoint().isInputShutdown())
+            {
+                _parser.atEOF();
+                if (parsed)
+                    break;
+                continue;
+            }
+
+            if (parsed)
+                break;
+            
+            // OK lets read some data
+            int filled=getEndPoint().fill(requestBuffer);
+            if (LOG.isDebugEnabled()) // Avoid boxing of variable 'filled'
+                LOG.debug("{} filled {}",this,filled);
+            if (filled<=0)
+            {
+                if (filled<0)
+                {
+                    _parser.atEOF();
+                    continue;
+                }
+                break;
+            }
+        }
+    }
+    
+    @Override
+    public void completed()
+    {
+        // Handle connection upgrades
+        if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
+        {
+            Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
+            if (connection != null)
+            {
+                LOG.debug("Upgrade from {} to {}", this, connection);
+                onClose();
+                getEndPoint().setConnection(connection);
+                connection.onOpen();
+                _channel.reset();
+                _parser.reset();
+                _generator.reset();
+                releaseRequestBuffer();
+                return;
+            }
+        }
+        
+        // Finish consuming the request
+        // If we are still expecting
+        if (_channel.isExpecting100Continue())
+            // close to seek EOF
+            _parser.close();
+        else if (_parser.inContentState() && _generator.isPersistent())
+            // Complete reading the request
+            _channel.getRequest().getHttpInput().consumeAll();
+
+        // Reset the channel, parsers and generator
+        _channel.reset();
+        if (_generator.isPersistent() && !_parser.isClosed())
+            _parser.reset();
+        else
+            _parser.close();
+        
+        // Not in a race here with onFillable, because it has given up control before calling handle.
+        // in a slight race with #completed, but not sure what to do with that anyway.
+        releaseRequestBuffer();
+        if (_chunk!=null)
+            _bufferPool.release(_chunk);
+        _chunk=null;
+        _generator.reset();
+
+        // if we are not called from the onfillable thread, schedule completion
+        if (getCurrentConnection()!=this)
+        {
+            // If we are looking for the next request
+            if (_parser.isStart())
+            {
+                // if the buffer is empty
+                if (BufferUtil.isEmpty(_requestBuffer))
+                {
+                    // look for more data
+                    fillInterested();
+                }
+                // else if we are still running
+                else if (getConnector().isRunning())
+                {
+                    // Dispatched to handle a pipelined request
+                    try
+                    {
+                        getExecutor().execute(this);
+                    }
+                    catch (RejectedExecutionException e)
+                    {
+                        if (getConnector().isRunning())
+                            LOG.warn(e);
+                        else
+                            LOG.ignore(e);
+                        getEndPoint().close();
+                    }
+                }
+                else
+                {
+                    getEndPoint().close();
+                }
+            }
+            // else the parser must be closed, so seek the EOF if we are still open 
+            else if (getEndPoint().isOpen())
+                fillInterested();
+        }
+    }
+
+    @Override
+    protected void onFillInterestedFailed(Throwable cause)
+    {
+        _parser.close();
+        super.onFillInterestedFailed(cause);
+    }
+
+    @Override
+    public void onOpen()
+    {
+        super.onOpen();
+        fillInterested();
+    }
+
+    @Override
+    public void run()
+    {
+        onFillable();
+    }
+
+
+    @Override
+    public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
+    {
+        if (info==null)
+            new ContentCallback(content,lastContent,callback).iterate();
+        else
+        {
+            // If we are still expecting a 100 continues
+            if (_channel.isExpecting100Continue())
+                // then we can't be persistent
+                _generator.setPersistent(false);
+            new CommitCallback(info,content,lastContent,callback).iterate();
+        }
+    }
+
+    @Override
+    public void send(ByteBuffer content, boolean lastContent, Callback callback)
+    {
+        new ContentCallback(content,lastContent,callback).iterate();
+    }
+
+    
+    protected class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
+    {
+        public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
+        {
+            super(connector,config,endPoint,transport,input);
+        }
+        
+        @Override
+        public void earlyEOF()
+        {
+            // If we have no request yet, just close
+            if (getRequest().getMethod()==null)
+                close();
+            else
+                super.earlyEOF();
+        }
+
+        @Override
+        public boolean content(ByteBuffer item)
+        {
+            super.content(item);
+            return true;
+        }
+
+        @Override
+        public void badMessage(int status, String reason)
+        {
+            _generator.setPersistent(false);
+            super.badMessage(status,reason);
+        }
+
+        @Override
+        public boolean headerComplete()
+        {
+            boolean persistent;
+            HttpVersion version = getHttpVersion();
+
+            switch (version)
+            {
+                case HTTP_0_9:
+                {
+                    persistent = false;
+                    break;
+                }
+                case HTTP_1_0:
+                {
+                    persistent = getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE.asString());
+                    if (!persistent)
+                        persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
+                    if (persistent)
+                        getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE);
+                    break;
+                }
+                case HTTP_1_1:
+                {
+                    persistent = !getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
+                    if (!persistent)
+                        persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
+                    if (!persistent)
+                        getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE);
+                    break;
+                }
+                default:
+                {
+                    throw new IllegalStateException();
+                }
+            }
+
+            if (!persistent)
+                _generator.setPersistent(false);
+
+            return super.headerComplete();
+        }
+
+        @Override
+        protected void handleException(Throwable x)
+        {
+            _generator.setPersistent(false);
+            super.handleException(x);
+        }
+
+        @Override
+        public void failed()
+        {
+            getEndPoint().shutdownOutput();
+        }
+        
+
+        @Override
+        public boolean messageComplete()
+        {
+            super.messageComplete();
+            return false;
+        }
+    }
+
+    private class CommitCallback extends IteratingCallback
+    {
+        final ByteBuffer _content;
+        final boolean _lastContent;
+        final ResponseInfo _info;
+        final Callback _callback;
+        ByteBuffer _header;
+
+        CommitCallback(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
+        {
+            _info=info;
+            _content=content;
+            _lastContent=last;
+            _callback=callback;
+        }
+
+        @Override
+        public Action process() throws Exception
+        {
+            ByteBuffer chunk = _chunk;
+            while (true)
+            {
+                HttpGenerator.Result result = _generator.generateResponse(_info, _header, chunk, _content, _lastContent);
+                if (LOG.isDebugEnabled())
+                    LOG.debug("{} generate: {} ({},{},{})@{}",
+                        this,
+                        result,
+                        BufferUtil.toSummaryString(_header),
+                        BufferUtil.toSummaryString(_content),
+                        _lastContent,
+                        _generator.getState());
+
+                switch (result)
+                {
+                    case NEED_HEADER:
+                    {
+                        // Look for optimisation to avoid allocating a _header buffer
+                        /*
+                         Cannot use this optimisation unless we work out how not to overwrite data in user passed arrays.
+                        if (_lastContent && _content!=null && !_content.isReadOnly() && _content.hasArray() && BufferUtil.space(_content)>_config.getResponseHeaderSize() )
+                        {
+                            // use spare space in content buffer for header buffer
+                            int p=_content.position();
+                            int l=_content.limit();
+                            _content.position(l);
+                            _content.limit(l+_config.getResponseHeaderSize());
+                            _header=_content.slice();
+                            _header.limit(0);
+                            _content.position(p);
+                            _content.limit(l);
+                        }
+                        else
+                        */
+                            _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
+                            
+                        continue;
+                    }
+                    case NEED_CHUNK:
+                    {
+                        chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
+                        continue;
+                    }
+                    case FLUSH:
+                    {
+                        // Don't write the chunk or the content if this is a HEAD response
+                        if (_channel.getRequest().isHead())
+                        {
+                            BufferUtil.clear(chunk);
+                            BufferUtil.clear(_content);
+                        }
+
+                        // If we have a header
+                        if (BufferUtil.hasContent(_header))
+                        {
+                            if (BufferUtil.hasContent(_content))
+                            {
+                                if (BufferUtil.hasContent(chunk))
+                                    getEndPoint().write(this, _header, chunk, _content);
+                                else
+                                    getEndPoint().write(this, _header, _content);
+                            }
+                            else
+                                getEndPoint().write(this, _header);
+                        }
+                        else if (BufferUtil.hasContent(chunk))
+                        {
+                            if (BufferUtil.hasContent(_content))
+                                getEndPoint().write(this, chunk, _content);
+                            else
+                                getEndPoint().write(this, chunk);
+                        }
+                        else if (BufferUtil.hasContent(_content))
+                        {
+                            getEndPoint().write(this, _content);
+                        }
+                        else
+                            continue;
+                        return Action.SCHEDULED;
+                    }
+                    case SHUTDOWN_OUT:
+                    {
+                        getEndPoint().shutdownOutput();
+                        continue;
+                    }
+                    case DONE:
+                    {
+                        if (_header!=null)
+                        {
+                            // don't release header in spare content buffer
+                            if (!_lastContent || _content==null || !_content.hasArray() || !_header.hasArray() ||  _content.array()!=_header.array())
+                                _bufferPool.release(_header);
+                        }
+                        return Action.SUCCEEDED;
+                    }
+                    case CONTINUE:
+                    {
+                        break;
+                    }
+                    default:
+                    {
+                        throw new IllegalStateException("generateResponse="+result);
+                    }
+                }
+            }
+        }
+
+        @Override
+        protected void completed()
+        {
+            _callback.succeeded();
+        }
+
+        @Override
+        public void failed(final Throwable x)
+        {
+            super.failed(x);
+            failedCallback(_callback,x);
+        }
+    }
+
+    private class ContentCallback extends IteratingCallback
+    {
+        final ByteBuffer _content;
+        final boolean _lastContent;
+        final Callback _callback;
+
+        ContentCallback(ByteBuffer content, boolean last, Callback callback)
+        {
+            _content=content;
+            _lastContent=last;
+            _callback=callback;
+        }
+
+        @Override
+        public Action process() throws Exception
+        {
+            ByteBuffer chunk = _chunk;
+            while (true)
+            {
+                HttpGenerator.Result result = _generator.generateResponse(null, null, chunk, _content, _lastContent);
+                if (LOG.isDebugEnabled())
+                    LOG.debug("{} generate: {} ({},{})@{}",
+                        this,
+                        result,
+                        BufferUtil.toSummaryString(_content),
+                        _lastContent,
+                        _generator.getState());
+
+                switch (result)
+                {
+                    case NEED_HEADER:
+                        throw new IllegalStateException();
+                    case NEED_CHUNK:
+                    {
+                        chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
+                        continue;
+                    }
+                    case FLUSH:
+                    {
+                        // Don't write the chunk or the content if this is a HEAD response
+                        if (_channel.getRequest().isHead())
+                        {
+                            BufferUtil.clear(chunk);
+                            BufferUtil.clear(_content);
+                            continue;
+                        }
+                        else if (BufferUtil.hasContent(chunk))
+                        {
+                            if (BufferUtil.hasContent(_content))
+                                getEndPoint().write(this, chunk, _content);
+                            else
+                                getEndPoint().write(this, chunk);
+                        }
+                        else if (BufferUtil.hasContent(_content))
+                        {
+                            getEndPoint().write(this, _content);
+                        }
+                        else
+                            continue;
+                        return Action.SCHEDULED;
+                    }
+                    case SHUTDOWN_OUT:
+                    {
+                        getEndPoint().shutdownOutput();
+                        continue;
+                    }
+                    case DONE:
+                    {
+                        return Action.SUCCEEDED;
+                    }
+                    case CONTINUE:
+                    {
+                        break;
+                    }
+                    default:
+                    {
+                        throw new IllegalStateException("generateResponse="+result);
+                    }
+                }
+            }
+        }
+
+        @Override
+        protected void completed()
+        {
+            _callback.succeeded();
+        }
+
+        @Override
+        public void failed(final Throwable x)
+        {
+            super.failed(x);
+            failedCallback(_callback,x);
+        }
+    }
+
+    @Override
+    public void abort()
+    {
+        // Do a direct close of the output, as this may indicate to a client that the 
+        // response is bad either with RST or by abnormal completion of chunked response.
+        getEndPoint().close();
+    }
+
+}