]> WPIA git - gigi.git/blob - lib/jetty/org/eclipse/jetty/server/HttpConnection.java
updating jetty to jetty-9.2.16.v2016040
[gigi.git] / lib / jetty / org / eclipse / jetty / server / HttpConnection.java
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.WritePendingException;
24 import java.util.concurrent.RejectedExecutionException;
25
26 import org.eclipse.jetty.http.HttpGenerator;
27 import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
28 import org.eclipse.jetty.http.HttpHeader;
29 import org.eclipse.jetty.http.HttpHeaderValue;
30 import org.eclipse.jetty.http.HttpMethod;
31 import org.eclipse.jetty.http.HttpParser;
32 import org.eclipse.jetty.http.HttpStatus;
33 import org.eclipse.jetty.http.HttpVersion;
34 import org.eclipse.jetty.io.AbstractConnection;
35 import org.eclipse.jetty.io.ByteBufferPool;
36 import org.eclipse.jetty.io.Connection;
37 import org.eclipse.jetty.io.EndPoint;
38 import org.eclipse.jetty.io.EofException;
39 import org.eclipse.jetty.util.BufferUtil;
40 import org.eclipse.jetty.util.Callback;
41 import org.eclipse.jetty.util.IteratingCallback;
42 import org.eclipse.jetty.util.log.Log;
43 import org.eclipse.jetty.util.log.Logger;
44
45 /**
46  * <p>A {@link Connection} that handles the HTTP protocol.</p>
47  */
48 public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom
49 {
50     public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
51     private static final boolean REQUEST_BUFFER_DIRECT=false;
52     private static final boolean HEADER_BUFFER_DIRECT=false;
53     private static final boolean CHUNK_BUFFER_DIRECT=false;
54     private static final Logger LOG = Log.getLogger(HttpConnection.class);
55     private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
56
57     private final HttpConfiguration _config;
58     private final Connector _connector;
59     private final ByteBufferPool _bufferPool;
60     private final HttpGenerator _generator;
61     private final HttpChannelOverHttp _channel;
62     private final HttpParser _parser;
63     private volatile ByteBuffer _requestBuffer = null;
64     private volatile ByteBuffer _chunk = null;
65     private final SendCallback _sendCallback = new SendCallback();
66
67
68     /* ------------------------------------------------------------ */
69     /** Get the current connection that this thread is dispatched to.
70      * Note that a thread may be processing a request asynchronously and
71      * thus not be dispatched to the connection.
72      * @see Request#getAttribute(String) for a more general way to access the HttpConnection
73      * @return the current HttpConnection or null
74      */
75     public static HttpConnection getCurrentConnection()
76     {
77         return __currentConnection.get();
78     }
79
80     protected static HttpConnection setCurrentConnection(HttpConnection connection)
81     {
82         HttpConnection last=__currentConnection.get();
83         __currentConnection.set(connection);
84         return last;
85     }
86
87     public HttpConfiguration getHttpConfiguration()
88     {
89         return _config;
90     }
91
92     public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
93     {
94         // Tell AbstractConnector executeOnFillable==true because we want the same thread that
95         // does the HTTP parsing to handle the request so its cache is hot
96         super(endPoint, connector.getExecutor(),true);
97
98         _config = config;
99         _connector = connector;
100         _bufferPool = _connector.getByteBufferPool();
101         _generator = newHttpGenerator();
102         HttpInput<ByteBuffer> input = newHttpInput();
103         _channel = newHttpChannel(input);
104         _parser = newHttpParser();
105         if (LOG.isDebugEnabled())
106             LOG.debug("New HTTP Connection {}", this);
107     }
108
109     protected HttpGenerator newHttpGenerator()
110     {
111         return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
112     }
113
114     protected HttpInput<ByteBuffer> newHttpInput()
115     {
116         return new HttpInputOverHTTP(this);
117     }
118
119     protected HttpChannelOverHttp newHttpChannel(HttpInput<ByteBuffer> httpInput)
120     {
121         return new HttpChannelOverHttp(_connector, _config, getEndPoint(), this, httpInput);
122     }
123
124     protected HttpParser newHttpParser()
125     {
126         return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
127     }
128
129     protected HttpParser.RequestHandler<ByteBuffer> newRequestHandler()
130     {
131         return _channel;
132     }
133
134     public Server getServer()
135     {
136         return _connector.getServer();
137     }
138
139     public Connector getConnector()
140     {
141         return _connector;
142     }
143
144     public HttpChannel<?> getHttpChannel()
145     {
146         return _channel;
147     }
148
149     public HttpParser getParser()
150     {
151         return _parser;
152     }
153
154     @Override
155     public int getMessagesIn()
156     {
157         return getHttpChannel().getRequests();
158     }
159
160     @Override
161     public int getMessagesOut()
162     {
163         return getHttpChannel().getRequests();
164     }
165
166     @Override
167     public ByteBuffer onUpgradeFrom()
168     {
169         if (BufferUtil.hasContent(_requestBuffer))
170         {
171             ByteBuffer buffer = _requestBuffer;
172             _requestBuffer=null;
173             return buffer;
174         }
175         return null;
176     }
177
178     void releaseRequestBuffer()
179     {
180         if (_requestBuffer != null && !_requestBuffer.hasRemaining())
181         {
182             ByteBuffer buffer=_requestBuffer;
183             _requestBuffer=null;
184             _bufferPool.release(buffer);
185         }
186     }
187
188     public ByteBuffer getRequestBuffer()
189     {
190         if (_requestBuffer == null)
191             _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
192         return _requestBuffer;
193     }
194
195     /**
196      * <p>Parses and handles HTTP messages.</p>
197      * <p>This method is called when this {@link Connection} is ready to read bytes from the {@link EndPoint}.
198      * However, it can also be called if there is unconsumed data in the _requestBuffer, as a result of
199      * resuming a suspended request when there is a pipelined request already read into the buffer.</p>
200      * <p>This method fills bytes and parses them until either: EOF is filled; 0 bytes are filled;
201      * the HttpChannel finishes handling; or the connection has changed.</p>
202      */
203     @Override
204     public void onFillable()
205     {
206         if (LOG.isDebugEnabled())
207             LOG.debug("{} onFillable {}", this, _channel.getState());
208
209         final HttpConnection last=setCurrentConnection(this);
210         int filled=Integer.MAX_VALUE;
211         boolean suspended=false;
212         try
213         {
214             // while not suspended and not upgraded
215             while (!suspended && getEndPoint().getConnection()==this)
216             {
217                 // Do we need some data to parse
218                 if (BufferUtil.isEmpty(_requestBuffer))
219                 {
220                     // If the previous iteration filled 0 bytes or saw a close, then break here
221                     if (filled<=0)
222                         break;
223
224                     // Can we fill?
225                     if(getEndPoint().isInputShutdown())
226                     {
227                         // No pretend we read -1
228                         filled=-1;
229                         _parser.atEOF();
230                     }
231                     else
232                     {
233                         // Get a buffer
234                         // We are not in a race here for the request buffer as we have not yet received a request,
235                         // so there are not an possible legal threads calling #parseContent or #completed.
236                         _requestBuffer = getRequestBuffer();
237
238                         // fill
239                         filled = getEndPoint().fill(_requestBuffer);
240                         if (filled==0) // Do a retry on fill 0 (optimization for SSL connections)
241                             filled = getEndPoint().fill(_requestBuffer);
242
243                         // tell parser
244                         if (filled < 0)
245                             _parser.atEOF();
246                     }
247                 }
248
249                 // Parse the buffer
250                 if (_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
251                 {
252                     // The parser returned true, which indicates the channel is ready to handle a request.
253                     // Call the channel and this will either handle the request/response to completion OR,
254                     // if the request suspends, the request/response will be incomplete so the outer loop will exit.
255                     // Not that onFillable no longer manipulates the request buffer from this point and that is
256                     // left to threads calling #completed or #parseContent (which may be this thread inside handle())
257                     suspended = !_channel.handle();
258                 }
259                 else
260                 {
261                     // We parsed what we could, recycle the request buffer
262                     // We are not in a race here for the request buffer as we have not yet received a request,
263                     // so there are not an possible legal threads calling #parseContent or #completed.
264                     releaseRequestBuffer();
265                 }
266             }
267         }
268         catch (EofException e)
269         {
270             LOG.debug(e);
271         }
272         catch (Exception e)
273         {
274             if (_parser.isIdle())
275                 LOG.debug(e);
276             else
277                 LOG.warn(this.toString(), e);
278             close();
279         }
280         finally
281         {
282             setCurrentConnection(last);
283             if (!suspended && getEndPoint().isOpen() && getEndPoint().getConnection()==this)
284             {
285                 fillInterested();
286             }
287         }
288     }
289
290     /* ------------------------------------------------------------ */
291     /** Fill and parse data looking for content
292      * @throws IOException
293      */
294     protected void parseContent() throws IOException
295     {
296         // Not in a race here for the request buffer with #onFillable because an async consumer of
297         // content would only be started after onFillable has given up control.
298         // In a little bit of a race with #completed, but then not sure if it is legal to be doing
299         // async calls to IO and have a completed call at the same time.
300         ByteBuffer requestBuffer = getRequestBuffer();
301
302         while (_parser.inContentState())
303         {
304             // Can the parser progress (even with an empty buffer)
305             boolean parsed = _parser.parseNext(requestBuffer==null?BufferUtil.EMPTY_BUFFER:requestBuffer);
306
307             // No, we can we try reading some content?
308             if (BufferUtil.isEmpty(requestBuffer) && getEndPoint().isInputShutdown())
309             {
310                 _parser.atEOF();
311                 if (parsed)
312                     break;
313                 continue;
314             }
315
316             if (parsed)
317                 break;
318
319             // OK lets read some data
320             int filled=getEndPoint().fill(requestBuffer);
321             if (LOG.isDebugEnabled()) // Avoid boxing of variable 'filled'
322                 LOG.debug("{} filled {}",this,filled);
323             if (filled<=0)
324             {
325                 if (filled<0)
326                 {
327                     _parser.atEOF();
328                     continue;
329                 }
330                 break;
331             }
332         }
333     }
334
335     @Override
336     public void completed()
337     {
338         // Handle connection upgrades
339         if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
340         {
341             Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
342             if (connection != null)
343             {
344                 _channel.getState().upgrade();
345                 getEndPoint().upgrade(connection);
346                 _channel.reset();
347                 _parser.reset();
348                 _generator.reset();
349                 releaseRequestBuffer();
350                 return;
351             }
352         }
353
354         // Finish consuming the request
355         // If we are still expecting
356         if (_channel.isExpecting100Continue())
357         {
358             // close to seek EOF
359             _parser.close();
360         }
361         else if (_parser.inContentState() && _generator.isPersistent())
362         {
363             // If we are async, then we have problems to complete neatly
364             if (_channel.getRequest().getHttpInput().isAsync())
365             {
366                 if (LOG.isDebugEnabled())
367                     LOG.debug("unconsumed async input {}", this);
368                 _channel.abort();
369             }
370             else
371             {
372                 if (LOG.isDebugEnabled())
373                     LOG.debug("unconsumed input {}", this);
374                 // Complete reading the request
375                 if (!_channel.getRequest().getHttpInput().consumeAll())
376                     _channel.abort();
377             }
378         }
379
380         // Reset the channel, parsers and generator
381         _channel.reset();
382         if (_generator.isPersistent() && !_parser.isClosed())
383             _parser.reset();
384         else
385             _parser.close();
386
387         // Not in a race here with onFillable, because it has given up control before calling handle.
388         // in a slight race with #completed, but not sure what to do with that anyway.
389         releaseRequestBuffer();
390         if (_chunk!=null)
391             _bufferPool.release(_chunk);
392         _chunk=null;
393         _generator.reset();
394
395         // if we are not called from the onfillable thread, schedule completion
396         if (getCurrentConnection()!=this)
397         {
398             // If we are looking for the next request
399             if (_parser.isStart())
400             {
401                 // if the buffer is empty
402                 if (BufferUtil.isEmpty(_requestBuffer))
403                 {
404                     // look for more data
405                     fillInterested();
406                 }
407                 // else if we are still running
408                 else if (getConnector().isRunning())
409                 {
410                     // Dispatched to handle a pipelined request
411                     try
412                     {
413                         getExecutor().execute(this);
414                     }
415                     catch (RejectedExecutionException e)
416                     {
417                         if (getConnector().isRunning())
418                             LOG.warn(e);
419                         else
420                             LOG.ignore(e);
421                         getEndPoint().close();
422                     }
423                 }
424                 else
425                 {
426                     getEndPoint().close();
427                 }
428             }
429             // else the parser must be closed, so seek the EOF if we are still open
430             else if (getEndPoint().isOpen())
431                 fillInterested();
432         }
433     }
434
435     @Override
436     protected void onFillInterestedFailed(Throwable cause)
437     {
438         _parser.close();
439         super.onFillInterestedFailed(cause);
440     }
441
442     @Override
443     public void onOpen()
444     {
445         super.onOpen();
446         fillInterested();
447     }
448
449     @Override
450     public void onClose()
451     {
452         _sendCallback.close();
453         super.onClose();
454     }
455
456     @Override
457     public void run()
458     {
459         onFillable();
460     }
461
462     @Override
463     public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
464     {
465         // If we are still expecting a 100 continues when we commit
466         if (info!=null && _channel.isExpecting100Continue())
467             // then we can't be persistent
468             _generator.setPersistent(false);
469
470         if(_sendCallback.reset(info,content,lastContent,callback))
471             _sendCallback.iterate();
472     }
473
474     @Override
475     public void send(ByteBuffer content, boolean lastContent, Callback callback)
476     {
477         if (!lastContent && BufferUtil.isEmpty(content))
478             callback.succeeded();
479         else if (_sendCallback.reset(null,content,lastContent,callback))
480             _sendCallback.iterate();
481     }
482
483     @Override
484     public void abort()
485     {
486         // Do a direct close of the output, as this may indicate to a client that the
487         // response is bad either with RST or by abnormal completion of chunked response.
488         getEndPoint().close();
489     }
490
491     @Override
492     public String toString()
493     {
494         return String.format("%s[p=%s,g=%s,c=%s]",
495                 super.toString(),
496                 _parser,
497                 _generator,
498                 _channel);
499     }
500
501     protected class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
502     {
503         public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
504         {
505             super(connector,config,endPoint,transport,input);
506         }
507
508         @Override
509         public void earlyEOF()
510         {
511             // If we have no request yet, just close
512             if (getRequest().getMethod()==null)
513                 close();
514             else
515                 super.earlyEOF();
516         }
517
518         @Override
519         public boolean content(ByteBuffer item)
520         {
521             super.content(item);
522             return true;
523         }
524
525         @Override
526         public void badMessage(int status, String reason)
527         {
528             _generator.setPersistent(false);
529             super.badMessage(status,reason);
530         }
531
532         @Override
533         public boolean headerComplete()
534         {
535             boolean persistent;
536             HttpVersion version = getHttpVersion();
537
538             switch (version)
539             {
540                 case HTTP_0_9:
541                 {
542                     persistent = false;
543                     break;
544                 }
545                 case HTTP_1_0:
546                 {
547                     persistent = getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE.asString());
548                     if (!persistent)
549                         persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
550                     if (persistent)
551                         getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE);
552                     break;
553                 }
554                 case HTTP_1_1:
555                 {
556                     persistent = !getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
557                     if (!persistent)
558                         persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
559                     if (!persistent)
560                         getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE);
561                     break;
562                 }
563                 case HTTP_2:
564                 {
565                     persistent=false;
566                     badMessage(400,null);
567                     return true;
568                 }
569                 default:
570                 {
571                     throw new IllegalStateException();
572                 }
573             }
574
575             if (!persistent)
576                 _generator.setPersistent(false);
577
578             if (!super.headerComplete())
579                 return false;
580
581             // Should we delay dispatch until we have some content?
582             // We should not delay if there is no content expect or client is expecting 100 or the response is already committed or the request buffer already has something in it to parse
583             if (getHttpConfiguration().isDelayDispatchUntilContent() && _parser.getContentLength() > 0 &&
584                     !isExpecting100Continue() && !isCommitted() && BufferUtil.isEmpty(_requestBuffer))
585                 return false;
586
587             return true;
588         }
589
590         @Override
591         protected void handleException(Throwable x)
592         {
593             _generator.setPersistent(false);
594             super.handleException(x);
595         }
596
597         @Override
598         public void abort()
599         {
600             super.abort();
601             _generator.setPersistent(false);
602         }
603
604         @Override
605         public boolean messageComplete()
606         {
607             super.messageComplete();
608             return false;
609         }
610     }
611
612     private class SendCallback extends IteratingCallback
613     {
614         private ResponseInfo _info;
615         private ByteBuffer _content;
616         private boolean _lastContent;
617         private Callback _callback;
618         private ByteBuffer _header;
619         private boolean _shutdownOut;
620
621         private SendCallback()
622         {
623             super(true);
624         }
625
626         private boolean reset(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
627         {
628             if (reset())
629             {
630                 _info = info;
631                 _content = content;
632                 _lastContent = last;
633                 _callback = callback;
634                 _header = null;
635                 _shutdownOut = false;
636                 return true;
637             }
638
639             if (isClosed())
640                 callback.failed(new EofException());
641             else
642                 callback.failed(new WritePendingException());
643             return false;
644         }
645
646         @Override
647         public Action process() throws Exception
648         {
649             if (_callback==null)
650                 throw new IllegalStateException();
651
652             ByteBuffer chunk = _chunk;
653             while (true)
654             {
655                 HttpGenerator.Result result = _generator.generateResponse(_info, _header, chunk, _content, _lastContent);
656                 if (LOG.isDebugEnabled())
657                     LOG.debug("{} generate: {} ({},{},{})@{}",
658                         this,
659                         result,
660                         BufferUtil.toSummaryString(_header),
661                         BufferUtil.toSummaryString(_content),
662                         _lastContent,
663                         _generator.getState());
664
665                 switch (result)
666                 {
667                     case NEED_HEADER:
668                     {
669                         _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
670                         continue;
671                     }
672                     case NEED_CHUNK:
673                     {
674                         chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
675                         continue;
676                     }
677                     case FLUSH:
678                     {
679                         // Don't write the chunk or the content if this is a HEAD response, or any other type of response that should have no content
680                         if (_channel.getRequest().isHead() || _generator.isNoContent())
681                         {
682                             BufferUtil.clear(chunk);
683                             BufferUtil.clear(_content);
684                         }
685
686                         // If we have a header
687                         if (BufferUtil.hasContent(_header))
688                         {
689                             if (BufferUtil.hasContent(_content))
690                             {
691                                 if (BufferUtil.hasContent(chunk))
692                                     getEndPoint().write(this, _header, chunk, _content);
693                                 else
694                                     getEndPoint().write(this, _header, _content);
695                             }
696                             else
697                                 getEndPoint().write(this, _header);
698                         }
699                         else if (BufferUtil.hasContent(chunk))
700                         {
701                             if (BufferUtil.hasContent(_content))
702                                 getEndPoint().write(this, chunk, _content);
703                             else
704                                 getEndPoint().write(this, chunk);
705                         }
706                         else if (BufferUtil.hasContent(_content))
707                         {
708                             getEndPoint().write(this, _content);
709                         }
710                         else
711                         {
712                             succeeded(); // nothing to write
713                         }
714                         return Action.SCHEDULED;
715                     }
716                     case SHUTDOWN_OUT:
717                     {
718                         _shutdownOut=true;
719                         continue;
720                     }
721                     case DONE:
722                     {
723                         return Action.SUCCEEDED;
724                     }
725                     case CONTINUE:
726                     {
727                         break;
728                     }
729                     default:
730                     {
731                         throw new IllegalStateException("generateResponse="+result);
732                     }
733                 }
734             }
735         }
736
737         private void releaseHeader()
738         {
739             ByteBuffer h=_header;
740             _header=null;
741             if (h!=null)
742                 _bufferPool.release(h);
743         }
744
745         @Override
746         protected void onCompleteSuccess()
747         {
748             releaseHeader();
749             _callback.succeeded();
750             if (_shutdownOut)
751                 getEndPoint().shutdownOutput();
752         }
753
754         @Override
755         public void onCompleteFailure(final Throwable x)
756         {
757             releaseHeader();
758             failedCallback(_callback,x);
759             if (_shutdownOut)
760                 getEndPoint().shutdownOutput();
761         }
762
763         @Override
764         public String toString()
765         {
766             return String.format("%s[i=%s,cb=%s]",super.toString(),_info,_callback);
767         }
768     }
769 }