2 // ========================================================================
3 // Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
19 package org.eclipse.jetty.server;
21 import java.io.IOException;
22 import java.util.Objects;
24 import javax.servlet.ReadListener;
25 import javax.servlet.ServletInputStream;
27 import org.eclipse.jetty.io.EofException;
28 import org.eclipse.jetty.io.RuntimeIOException;
29 import org.eclipse.jetty.util.log.Log;
30 import org.eclipse.jetty.util.log.Logger;
33 * {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.
35 * Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class
36 * maintains two states: the content state that tells whether there is content to consume and the EOF
37 * state that tells whether an EOF has arrived.
38 * Only once the content has been consumed the content state is moved to the EOF state.
40 public abstract class HttpInput<T> extends ServletInputStream implements Runnable
42 private final static Logger LOG = Log.getLogger(HttpInput.class);
44 private final byte[] _oneByteBuffer = new byte[1];
45 private final Object _lock;
46 private HttpChannelState _channelState;
47 private ReadListener _listener;
48 private Throwable _onError;
49 private boolean _notReady;
50 private State _contentState = STREAM;
51 private State _eofState;
52 private long _contentRead;
59 protected HttpInput(Object lock)
61 _lock = lock == null ? this : lock;
64 public void init(HttpChannelState state)
68 _channelState = state;
72 public final Object lock()
84 _contentState = STREAM;
91 public int available()
97 T item = getNextContent();
98 return item == null ? 0 : remaining(item);
101 catch (IOException e)
103 throw new RuntimeIOException(e);
108 public int read() throws IOException
110 int read = read(_oneByteBuffer, 0, 1);
111 return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
115 public int read(byte[] b, int off, int len) throws IOException
117 synchronized (lock())
119 T item = getNextContent();
122 _contentState.waitForContent(this);
123 item = getNextContent();
125 return _contentState.noContent();
127 int l = get(item, b, off, len);
134 * A convenience method to call nextContent and to check the return value, which if null then the
135 * a check is made for EOF and the state changed accordingly.
137 * @return Content or null if none available.
138 * @throws IOException
139 * @see #nextContent()
141 protected T getNextContent() throws IOException
143 T content = nextContent();
146 synchronized (lock())
148 if (_eofState != null)
150 if (LOG.isDebugEnabled())
151 LOG.debug("{} eof {}", this, _eofState);
152 _contentState = _eofState;
160 * Access the next content to be consumed from. Returning the next item does not consume it
161 * and it may be returned multiple times until it is consumed.
163 * Calls to {@link #get(Object, byte[], int, int)}
164 * or {@link #consume(Object, int)} are required to consume data from the content.
166 * @return the content or null if none available.
167 * @throws IOException if retrieving the content fails
169 protected abstract T nextContent() throws IOException;
172 * @param item the content
173 * @return how many bytes remain in the given content
175 protected abstract int remaining(T item);
178 * Copies the given content into the given byte buffer.
180 * @param item the content to copy from
181 * @param buffer the buffer to copy into
182 * @param offset the buffer offset to start copying from
183 * @param length the space available in the buffer
184 * @return the number of bytes actually copied
186 protected abstract int get(T item, byte[] buffer, int offset, int length);
189 * Consumes the given content.
191 * @param item the content to consume
192 * @param length the number of bytes to consume
194 protected abstract void consume(T item, int length);
197 * Blocks until some content or some end-of-file event arrives.
199 * @throws IOException if the wait is interrupted
201 protected abstract void blockForContent() throws IOException;
204 * Adds some content to this input stream.
206 * @param item the content to add
208 public abstract void content(T item);
210 protected boolean onAsyncRead()
212 synchronized (lock())
214 if (_listener == null)
217 _channelState.onReadPossible();
221 public long getContentRead()
223 synchronized (lock())
230 * This method should be called to signal that an EOF has been
231 * detected before all the expected content arrived.
233 * Typically this will result in an EOFException being thrown
234 * from a subsequent read rather than a -1 return.
236 public void earlyEOF()
238 synchronized (lock())
242 if (LOG.isDebugEnabled())
243 LOG.debug("{} early EOF", this);
244 _eofState = EARLY_EOF;
245 if (_listener == null)
249 _channelState.onReadPossible();
253 public boolean isEarlyEOF()
255 synchronized (lock())
257 return _contentState==EARLY_EOF;
262 * This method should be called to signal that all the expected
265 public void messageComplete()
267 synchronized (lock())
271 if (LOG.isDebugEnabled())
272 LOG.debug("{} EOF", this);
274 if (_listener == null)
278 _channelState.onReadPossible();
281 public boolean consumeAll()
283 synchronized (lock())
285 // Don't bother reading if we already know there was an error.
286 if (_onError != null)
291 while (!isFinished())
293 T item = getNextContent();
295 _contentState.waitForContent(this);
297 consume(item, remaining(item));
301 catch (IOException e)
309 public boolean isAsync()
311 synchronized (lock())
313 return _contentState==ASYNC;
318 * @return whether an EOF has been detected, even though there may be content to consume.
320 public boolean isEOF()
322 synchronized (lock())
324 return _eofState != null && _eofState.isEOF();
329 public boolean isFinished()
331 synchronized (lock())
333 return _contentState.isEOF();
339 public boolean isReady()
342 synchronized (lock())
344 if (_contentState.isEOF())
346 if (_listener == null )
353 finished = isFinished();
356 _channelState.onReadPossible();
362 protected void unready()
367 public void setReadListener(ReadListener readListener)
371 readListener = Objects.requireNonNull(readListener);
373 synchronized (lock())
375 if (_contentState != STREAM)
376 throw new IllegalStateException("state=" + _contentState);
377 _contentState = ASYNC;
378 _listener = readListener;
381 content = getNextContent()!=null || isEOF();
385 _channelState.onReadPossible();
391 throw new RuntimeIOException(e);
395 public void failed(Throwable x)
397 synchronized (lock())
399 if (_onError != null)
409 final Throwable error;
410 final ReadListener listener;
411 boolean available = false;
414 synchronized (lock())
416 if (!_notReady || _listener == null)
420 listener = _listener;
424 T item = getNextContent();
425 available = item != null && remaining(item) > 0;
432 eof = !available && isFinished();
433 _notReady = !available && !eof;
439 listener.onError(error);
441 listener.onDataAvailable();
443 listener.onAllDataRead();
449 LOG.warn(e.toString());
456 public String toString()
458 return String.format("%s@%x[r=%d,s=%s,e=%s,f=%s]",
459 getClass().getSimpleName(),
467 protected static abstract class State
469 public void waitForContent(HttpInput<?> in) throws IOException
473 public int noContent() throws IOException
478 public boolean isEOF()
484 protected static final State STREAM = new State()
487 public void waitForContent(HttpInput<?> input) throws IOException
489 input.blockForContent();
493 public String toString()
499 protected static final State ASYNC = new State()
502 public int noContent() throws IOException
508 public String toString()
514 protected static final State EARLY_EOF = new State()
517 public int noContent() throws IOException
519 throw new EofException("Early EOF");
523 public boolean isEOF()
529 public String toString()
535 protected static final State EOF = new State()
538 public boolean isEOF()
544 public String toString()