2 // ========================================================================
3 // Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
19 package org.eclipse.jetty.server;
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.WritePendingException;
24 import java.util.concurrent.RejectedExecutionException;
26 import org.eclipse.jetty.http.HttpGenerator;
27 import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
28 import org.eclipse.jetty.http.HttpHeader;
29 import org.eclipse.jetty.http.HttpHeaderValue;
30 import org.eclipse.jetty.http.HttpMethod;
31 import org.eclipse.jetty.http.HttpParser;
32 import org.eclipse.jetty.http.HttpStatus;
33 import org.eclipse.jetty.http.HttpVersion;
34 import org.eclipse.jetty.io.AbstractConnection;
35 import org.eclipse.jetty.io.ByteBufferPool;
36 import org.eclipse.jetty.io.Connection;
37 import org.eclipse.jetty.io.EndPoint;
38 import org.eclipse.jetty.io.EofException;
39 import org.eclipse.jetty.util.BufferUtil;
40 import org.eclipse.jetty.util.Callback;
41 import org.eclipse.jetty.util.IteratingCallback;
42 import org.eclipse.jetty.util.log.Log;
43 import org.eclipse.jetty.util.log.Logger;
46 * <p>A {@link Connection} that handles the HTTP protocol.</p>
48 public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom
50 public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
51 private static final boolean REQUEST_BUFFER_DIRECT=false;
52 private static final boolean HEADER_BUFFER_DIRECT=false;
53 private static final boolean CHUNK_BUFFER_DIRECT=false;
54 private static final Logger LOG = Log.getLogger(HttpConnection.class);
55 private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
57 private final HttpConfiguration _config;
58 private final Connector _connector;
59 private final ByteBufferPool _bufferPool;
60 private final HttpGenerator _generator;
61 private final HttpChannelOverHttp _channel;
62 private final HttpParser _parser;
63 private volatile ByteBuffer _requestBuffer = null;
64 private volatile ByteBuffer _chunk = null;
65 private final SendCallback _sendCallback = new SendCallback();
68 /* ------------------------------------------------------------ */
69 /** Get the current connection that this thread is dispatched to.
70 * Note that a thread may be processing a request asynchronously and
71 * thus not be dispatched to the connection.
72 * @see Request#getAttribute(String) for a more general way to access the HttpConnection
73 * @return the current HttpConnection or null
75 public static HttpConnection getCurrentConnection()
77 return __currentConnection.get();
80 protected static HttpConnection setCurrentConnection(HttpConnection connection)
82 HttpConnection last=__currentConnection.get();
83 __currentConnection.set(connection);
87 public HttpConfiguration getHttpConfiguration()
92 public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
94 // Tell AbstractConnector executeOnFillable==true because we want the same thread that
95 // does the HTTP parsing to handle the request so its cache is hot
96 super(endPoint, connector.getExecutor(),true);
99 _connector = connector;
100 _bufferPool = _connector.getByteBufferPool();
101 _generator = newHttpGenerator();
102 HttpInput<ByteBuffer> input = newHttpInput();
103 _channel = newHttpChannel(input);
104 _parser = newHttpParser();
105 if (LOG.isDebugEnabled())
106 LOG.debug("New HTTP Connection {}", this);
109 protected HttpGenerator newHttpGenerator()
111 return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
114 protected HttpInput<ByteBuffer> newHttpInput()
116 return new HttpInputOverHTTP(this);
119 protected HttpChannelOverHttp newHttpChannel(HttpInput<ByteBuffer> httpInput)
121 return new HttpChannelOverHttp(_connector, _config, getEndPoint(), this, httpInput);
124 protected HttpParser newHttpParser()
126 return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
129 protected HttpParser.RequestHandler<ByteBuffer> newRequestHandler()
134 public Server getServer()
136 return _connector.getServer();
139 public Connector getConnector()
144 public HttpChannel<?> getHttpChannel()
149 public HttpParser getParser()
155 public int getMessagesIn()
157 return getHttpChannel().getRequests();
161 public int getMessagesOut()
163 return getHttpChannel().getRequests();
167 public ByteBuffer onUpgradeFrom()
169 if (BufferUtil.hasContent(_requestBuffer))
171 ByteBuffer buffer = _requestBuffer;
178 void releaseRequestBuffer()
180 if (_requestBuffer != null && !_requestBuffer.hasRemaining())
182 ByteBuffer buffer=_requestBuffer;
184 _bufferPool.release(buffer);
188 public ByteBuffer getRequestBuffer()
190 if (_requestBuffer == null)
191 _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
192 return _requestBuffer;
196 * <p>Parses and handles HTTP messages.</p>
197 * <p>This method is called when this {@link Connection} is ready to read bytes from the {@link EndPoint}.
198 * However, it can also be called if there is unconsumed data in the _requestBuffer, as a result of
199 * resuming a suspended request when there is a pipelined request already read into the buffer.</p>
200 * <p>This method fills bytes and parses them until either: EOF is filled; 0 bytes are filled;
201 * the HttpChannel finishes handling; or the connection has changed.</p>
204 public void onFillable()
206 if (LOG.isDebugEnabled())
207 LOG.debug("{} onFillable {}", this, _channel.getState());
209 final HttpConnection last=setCurrentConnection(this);
210 int filled=Integer.MAX_VALUE;
211 boolean suspended=false;
214 // while not suspended and not upgraded
215 while (!suspended && getEndPoint().getConnection()==this)
217 // Do we need some data to parse
218 if (BufferUtil.isEmpty(_requestBuffer))
220 // If the previous iteration filled 0 bytes or saw a close, then break here
225 if(getEndPoint().isInputShutdown())
227 // No pretend we read -1
234 // We are not in a race here for the request buffer as we have not yet received a request,
235 // so there are not an possible legal threads calling #parseContent or #completed.
236 _requestBuffer = getRequestBuffer();
239 filled = getEndPoint().fill(_requestBuffer);
240 if (filled==0) // Do a retry on fill 0 (optimization for SSL connections)
241 filled = getEndPoint().fill(_requestBuffer);
250 if (_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
252 // The parser returned true, which indicates the channel is ready to handle a request.
253 // Call the channel and this will either handle the request/response to completion OR,
254 // if the request suspends, the request/response will be incomplete so the outer loop will exit.
255 // Not that onFillable no longer manipulates the request buffer from this point and that is
256 // left to threads calling #completed or #parseContent (which may be this thread inside handle())
257 suspended = !_channel.handle();
261 // We parsed what we could, recycle the request buffer
262 // We are not in a race here for the request buffer as we have not yet received a request,
263 // so there are not an possible legal threads calling #parseContent or #completed.
264 releaseRequestBuffer();
268 catch (EofException e)
274 if (_parser.isIdle())
277 LOG.warn(this.toString(), e);
282 setCurrentConnection(last);
283 if (!suspended && getEndPoint().isOpen() && getEndPoint().getConnection()==this)
290 /* ------------------------------------------------------------ */
291 /** Fill and parse data looking for content
292 * @throws IOException
294 protected void parseContent() throws IOException
296 // Not in a race here for the request buffer with #onFillable because an async consumer of
297 // content would only be started after onFillable has given up control.
298 // In a little bit of a race with #completed, but then not sure if it is legal to be doing
299 // async calls to IO and have a completed call at the same time.
300 ByteBuffer requestBuffer = getRequestBuffer();
302 while (_parser.inContentState())
304 // Can the parser progress (even with an empty buffer)
305 boolean parsed = _parser.parseNext(requestBuffer==null?BufferUtil.EMPTY_BUFFER:requestBuffer);
307 // No, we can we try reading some content?
308 if (BufferUtil.isEmpty(requestBuffer) && getEndPoint().isInputShutdown())
319 // OK lets read some data
320 int filled=getEndPoint().fill(requestBuffer);
321 if (LOG.isDebugEnabled()) // Avoid boxing of variable 'filled'
322 LOG.debug("{} filled {}",this,filled);
336 public void completed()
338 // Handle connection upgrades
339 if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
341 Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
342 if (connection != null)
344 _channel.getState().upgrade();
345 getEndPoint().upgrade(connection);
349 releaseRequestBuffer();
354 // Finish consuming the request
355 // If we are still expecting
356 if (_channel.isExpecting100Continue())
361 else if (_parser.inContentState() && _generator.isPersistent())
363 // If we are async, then we have problems to complete neatly
364 if (_channel.getRequest().getHttpInput().isAsync())
366 if (LOG.isDebugEnabled())
367 LOG.debug("unconsumed async input {}", this);
372 if (LOG.isDebugEnabled())
373 LOG.debug("unconsumed input {}", this);
374 // Complete reading the request
375 if (!_channel.getRequest().getHttpInput().consumeAll())
380 // Reset the channel, parsers and generator
382 if (_generator.isPersistent() && !_parser.isClosed())
387 // Not in a race here with onFillable, because it has given up control before calling handle.
388 // in a slight race with #completed, but not sure what to do with that anyway.
389 releaseRequestBuffer();
391 _bufferPool.release(_chunk);
395 // if we are not called from the onfillable thread, schedule completion
396 if (getCurrentConnection()!=this)
398 // If we are looking for the next request
399 if (_parser.isStart())
401 // if the buffer is empty
402 if (BufferUtil.isEmpty(_requestBuffer))
404 // look for more data
407 // else if we are still running
408 else if (getConnector().isRunning())
410 // Dispatched to handle a pipelined request
413 getExecutor().execute(this);
415 catch (RejectedExecutionException e)
417 if (getConnector().isRunning())
421 getEndPoint().close();
426 getEndPoint().close();
429 // else the parser must be closed, so seek the EOF if we are still open
430 else if (getEndPoint().isOpen())
436 protected void onFillInterestedFailed(Throwable cause)
439 super.onFillInterestedFailed(cause);
450 public void onClose()
452 _sendCallback.close();
463 public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
465 // If we are still expecting a 100 continues when we commit
466 if (info!=null && _channel.isExpecting100Continue())
467 // then we can't be persistent
468 _generator.setPersistent(false);
470 if(_sendCallback.reset(info,content,lastContent,callback))
471 _sendCallback.iterate();
475 public void send(ByteBuffer content, boolean lastContent, Callback callback)
477 if (!lastContent && BufferUtil.isEmpty(content))
478 callback.succeeded();
479 else if (_sendCallback.reset(null,content,lastContent,callback))
480 _sendCallback.iterate();
486 // Do a direct close of the output, as this may indicate to a client that the
487 // response is bad either with RST or by abnormal completion of chunked response.
488 getEndPoint().close();
492 public String toString()
494 return String.format("%s[p=%s,g=%s,c=%s]",
501 protected class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
503 public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
505 super(connector,config,endPoint,transport,input);
509 public void earlyEOF()
511 // If we have no request yet, just close
512 if (getRequest().getMethod()==null)
519 public boolean content(ByteBuffer item)
526 public void badMessage(int status, String reason)
528 _generator.setPersistent(false);
529 super.badMessage(status,reason);
533 public boolean headerComplete()
536 HttpVersion version = getHttpVersion();
547 persistent = getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE.asString());
549 persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
551 getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE);
556 persistent = !getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
558 persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
560 getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE);
566 badMessage(400,null);
571 throw new IllegalStateException();
576 _generator.setPersistent(false);
578 if (!super.headerComplete())
581 // Should we delay dispatch until we have some content?
582 // We should not delay if there is no content expect or client is expecting 100 or the response is already committed or the request buffer already has something in it to parse
583 if (getHttpConfiguration().isDelayDispatchUntilContent() && _parser.getContentLength() > 0 &&
584 !isExpecting100Continue() && !isCommitted() && BufferUtil.isEmpty(_requestBuffer))
591 protected void handleException(Throwable x)
593 _generator.setPersistent(false);
594 super.handleException(x);
601 _generator.setPersistent(false);
605 public boolean messageComplete()
607 super.messageComplete();
612 private class SendCallback extends IteratingCallback
614 private ResponseInfo _info;
615 private ByteBuffer _content;
616 private boolean _lastContent;
617 private Callback _callback;
618 private ByteBuffer _header;
619 private boolean _shutdownOut;
621 private SendCallback()
626 private boolean reset(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
633 _callback = callback;
635 _shutdownOut = false;
640 callback.failed(new EofException());
642 callback.failed(new WritePendingException());
647 public Action process() throws Exception
650 throw new IllegalStateException();
652 ByteBuffer chunk = _chunk;
655 HttpGenerator.Result result = _generator.generateResponse(_info, _header, chunk, _content, _lastContent);
656 if (LOG.isDebugEnabled())
657 LOG.debug("{} generate: {} ({},{},{})@{}",
660 BufferUtil.toSummaryString(_header),
661 BufferUtil.toSummaryString(_content),
663 _generator.getState());
669 _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
674 chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
679 // Don't write the chunk or the content if this is a HEAD response, or any other type of response that should have no content
680 if (_channel.getRequest().isHead() || _generator.isNoContent())
682 BufferUtil.clear(chunk);
683 BufferUtil.clear(_content);
686 // If we have a header
687 if (BufferUtil.hasContent(_header))
689 if (BufferUtil.hasContent(_content))
691 if (BufferUtil.hasContent(chunk))
692 getEndPoint().write(this, _header, chunk, _content);
694 getEndPoint().write(this, _header, _content);
697 getEndPoint().write(this, _header);
699 else if (BufferUtil.hasContent(chunk))
701 if (BufferUtil.hasContent(_content))
702 getEndPoint().write(this, chunk, _content);
704 getEndPoint().write(this, chunk);
706 else if (BufferUtil.hasContent(_content))
708 getEndPoint().write(this, _content);
712 succeeded(); // nothing to write
714 return Action.SCHEDULED;
723 return Action.SUCCEEDED;
731 throw new IllegalStateException("generateResponse="+result);
737 private void releaseHeader()
739 ByteBuffer h=_header;
742 _bufferPool.release(h);
746 protected void onCompleteSuccess()
749 _callback.succeeded();
751 getEndPoint().shutdownOutput();
755 public void onCompleteFailure(final Throwable x)
758 failedCallback(_callback,x);
760 getEndPoint().shutdownOutput();
764 public String toString()
766 return String.format("%s[i=%s,cb=%s]",super.toString(),_info,_callback);