]> WPIA git - gigi.git/blob - lib/jetty/org/eclipse/jetty/server/HttpInput.java
Importing upstream Jetty jetty-9.2.1.v20140609
[gigi.git] / lib / jetty / org / eclipse / jetty / server / HttpInput.java
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.util.Objects;
23 import javax.servlet.ReadListener;
24 import javax.servlet.ServletInputStream;
25
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.RuntimeIOException;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30
31 /**
32  * {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.
33  * <p/>
34  * Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class
35  * maintains two states: the content state that tells whether there is content to consume and the EOF
36  * state that tells whether an EOF has arrived.
37  * Only once the content has been consumed the content state is moved to the EOF state.
38  */
39 public abstract class HttpInput<T> extends ServletInputStream implements Runnable
40 {
41     private final static Logger LOG = Log.getLogger(HttpInput.class);
42
43     private final byte[] _oneByteBuffer = new byte[1];
44     private final Object _lock;
45     private HttpChannelState _channelState;
46     private ReadListener _listener;
47     private Throwable _onError;
48     private boolean _notReady;
49     private State _contentState = STREAM;
50     private State _eofState;
51     private long _contentRead;
52
53     protected HttpInput()
54     {
55         this(null);
56     }
57
58     protected HttpInput(Object lock)
59     {
60         _lock = lock == null ? this : lock;
61     }
62
63     public void init(HttpChannelState state)
64     {
65         synchronized (lock())
66         {
67             _channelState = state;
68         }
69     }
70
71     public final Object lock()
72     {
73         return _lock;
74     }
75
76     public void recycle()
77     {
78         synchronized (lock())
79         {
80             _listener = null;
81             _onError = null;
82             _notReady = false;
83             _contentState = STREAM;
84             _eofState = null;
85             _contentRead = 0;
86         }
87     }
88
89     @Override
90     public int available()
91     {
92         try
93         {
94             synchronized (lock())
95             {
96                 T item = getNextContent();
97                 return item == null ? 0 : remaining(item);
98             }
99         }
100         catch (IOException e)
101         {
102             throw new RuntimeIOException(e);
103         }
104     }
105
106     @Override
107     public int read() throws IOException
108     {
109         int read = read(_oneByteBuffer, 0, 1);
110         return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
111     }
112
113     @Override
114     public int read(byte[] b, int off, int len) throws IOException
115     {
116         synchronized (lock())
117         {
118             T item = getNextContent();
119             if (item == null)
120             {
121                 _contentState.waitForContent(this);
122                 item = getNextContent();
123                 if (item == null)
124                     return _contentState.noContent();
125             }
126             int l = get(item, b, off, len);
127             _contentRead += l;
128             return l;
129         }
130     }
131
132     /**
133      * A convenience method to call nextContent and to check the return value, which if null then the
134      * a check is made for EOF and the state changed accordingly.
135      *
136      * @return Content or null if none available.
137      * @throws IOException
138      * @see #nextContent()
139      */
140     protected T getNextContent() throws IOException
141     {
142         T content = nextContent();
143         if (content == null)
144         {
145             synchronized (lock())
146             {
147                 if (_eofState != null)
148                 {
149                     LOG.debug("{} eof {}", this, _eofState);
150                     _contentState = _eofState;
151                 }
152             }
153         }
154         return content;
155     }
156
157     /**
158      * Access the next content to be consumed from.   Returning the next item does not consume it
159      * and it may be returned multiple times until it is consumed.
160      * <p/>
161      * Calls to {@link #get(Object, byte[], int, int)}
162      * or {@link #consume(Object, int)} are required to consume data from the content.
163      *
164      * @return the content or null if none available.
165      * @throws IOException if retrieving the content fails
166      */
167     protected abstract T nextContent() throws IOException;
168
169     /**
170      * @param item the content
171      * @return how many bytes remain in the given content
172      */
173     protected abstract int remaining(T item);
174
175     /**
176      * Copies the given content into the given byte buffer.
177      *
178      * @param item   the content to copy from
179      * @param buffer the buffer to copy into
180      * @param offset the buffer offset to start copying from
181      * @param length the space available in the buffer
182      * @return the number of bytes actually copied
183      */
184     protected abstract int get(T item, byte[] buffer, int offset, int length);
185
186     /**
187      * Consumes the given content.
188      *
189      * @param item   the content to consume
190      * @param length the number of bytes to consume
191      */
192     protected abstract void consume(T item, int length);
193
194     /**
195      * Blocks until some content or some end-of-file event arrives.
196      *
197      * @throws IOException if the wait is interrupted
198      */
199     protected abstract void blockForContent() throws IOException;
200
201     /**
202      * Adds some content to this input stream.
203      *
204      * @param item the content to add
205      */
206     public abstract void content(T item);
207
208     protected boolean onAsyncRead()
209     {
210         synchronized (lock())
211         {
212             if (_listener == null)
213                 return false;
214         }
215         _channelState.onReadPossible();
216         return true;
217     }
218
219     public long getContentRead()
220     {
221         synchronized (lock())
222         {
223             return _contentRead;
224         }
225     }
226
227     /**
228      * This method should be called to signal that an EOF has been
229      * detected before all the expected content arrived.
230      * <p/>
231      * Typically this will result in an EOFException being thrown
232      * from a subsequent read rather than a -1 return.
233      */
234     public void earlyEOF()
235     {
236         synchronized (lock())
237         {
238             if (!isEOF())
239             {
240                 LOG.debug("{} early EOF", this);
241                 _eofState = EARLY_EOF;
242                 if (_listener == null)
243                     return;
244             }
245         }
246         _channelState.onReadPossible();
247     }
248
249     /**
250      * This method should be called to signal that all the expected
251      * content arrived.
252      */
253     public void messageComplete()
254     {
255         synchronized (lock())
256         {
257             if (!isEOF())
258             {
259                 LOG.debug("{} EOF", this);
260                 _eofState = EOF;
261                 if (_listener == null)
262                     return;
263             }
264         }
265         _channelState.onReadPossible();
266     }
267
268     public void consumeAll()
269     {
270         synchronized (lock())
271         {
272             try
273             {
274                 while (!isFinished())
275                 {
276                     T item = getNextContent();
277                     if (item == null)
278                         _contentState.waitForContent(this);
279                     else
280                         consume(item, remaining(item));
281                 }
282             }
283             catch (IOException e)
284             {
285                 LOG.debug(e);
286             }
287         }
288     }
289
290     public boolean isAsync()
291     {
292         synchronized (lock())
293         {
294             return _contentState==ASYNC;
295         }
296     }
297     
298     /**
299      * @return whether an EOF has been detected, even though there may be content to consume.
300      */
301     public boolean isEOF()
302     {
303         synchronized (lock())
304         {
305             return _eofState != null && _eofState.isEOF();
306         }
307     }
308
309     @Override
310     public boolean isFinished()
311     {
312         synchronized (lock())
313         {
314             return _contentState.isEOF();
315         }
316     }
317
318     @Override
319     public boolean isReady()
320     {
321         boolean finished;
322         synchronized (lock())
323         {
324             if (_contentState.isEOF())
325                 return true;
326             if (_listener == null )
327                 return true;
328             if (available() > 0)
329                 return true;
330             if (_notReady)
331                 return false;
332             _notReady = true;
333             finished = isFinished();
334         }
335         if (finished)
336             _channelState.onReadPossible();
337         else
338             unready();
339         return false;
340     }
341
342     protected void unready()
343     {
344     }
345
346     @Override
347     public void setReadListener(ReadListener readListener)
348     {
349         readListener = Objects.requireNonNull(readListener);
350         synchronized (lock())
351         {
352             if (_contentState != STREAM)
353                 throw new IllegalStateException("state=" + _contentState);
354             _contentState = ASYNC;
355             _listener = readListener;
356             _notReady = true;
357         }
358         _channelState.onReadPossible();
359     }
360
361     public void failed(Throwable x)
362     {
363         synchronized (lock())
364         {
365             if (_onError != null)
366                 LOG.warn(x);
367             else
368                 _onError = x;
369         }
370     }
371
372     @Override
373     public void run()
374     {
375         final Throwable error;
376         final ReadListener listener;
377         boolean available = false;
378         final boolean eof;
379
380         synchronized (lock())
381         {
382             if (!_notReady || _listener == null)
383                 return;
384
385             error = _onError;
386             listener = _listener;
387
388             try
389             {
390                 T item = getNextContent();
391                 available = item != null && remaining(item) > 0;
392             }
393             catch (Exception e)
394             {
395                 failed(e);
396             }
397
398             eof = !available && isFinished();
399             _notReady = !available && !eof;
400         }
401
402         try
403         {
404             if (error != null)
405                 listener.onError(error);
406             else if (available)
407                 listener.onDataAvailable();
408             else if (eof)
409                 listener.onAllDataRead();
410             else
411                 unready();
412         }
413         catch (Throwable e)
414         {
415             LOG.warn(e.toString());
416             LOG.debug(e);
417             listener.onError(e);
418         }
419     }
420
421     protected static abstract class State
422     {
423         public void waitForContent(HttpInput<?> in) throws IOException
424         {
425         }
426
427         public int noContent() throws IOException
428         {
429             return -1;
430         }
431
432         public boolean isEOF()
433         {
434             return false;
435         }
436     }
437
438     protected static final State STREAM = new State()
439     {
440         @Override
441         public void waitForContent(HttpInput<?> input) throws IOException
442         {
443             input.blockForContent();
444         }
445
446         @Override
447         public String toString()
448         {
449             return "STREAM";
450         }
451     };
452
453     protected static final State ASYNC = new State()
454     {
455         @Override
456         public int noContent() throws IOException
457         {
458             return 0;
459         }
460
461         @Override
462         public String toString()
463         {
464             return "ASYNC";
465         }
466     };
467
468     protected static final State EARLY_EOF = new State()
469     {
470         @Override
471         public int noContent() throws IOException
472         {
473             throw new EofException();
474         }
475
476         @Override
477         public boolean isEOF()
478         {
479             return true;
480         }
481
482         @Override
483         public String toString()
484         {
485             return "EARLY_EOF";
486         }
487     };
488
489     protected static final State EOF = new State()
490     {
491         @Override
492         public boolean isEOF()
493         {
494             return true;
495         }
496
497         @Override
498         public String toString()
499         {
500             return "EOF";
501         }
502     };
503 }