2 // ========================================================================
3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
19 package org.eclipse.jetty.server;
21 import java.io.IOException;
22 import java.util.Objects;
23 import javax.servlet.ReadListener;
24 import javax.servlet.ServletInputStream;
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.RuntimeIOException;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
32 * {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.
34 * Content may arrive in patterns such as [content(), content(), messageComplete()] so that this class
35 * maintains two states: the content state that tells whether there is content to consume and the EOF
36 * state that tells whether an EOF has arrived.
37 * Only once the content has been consumed is the content state moved to the EOF state.
39 public abstract class HttpInput<T> extends ServletInputStream implements Runnable
41 private final static Logger LOG = Log.getLogger(HttpInput.class);
// One-byte scratch buffer that lets the single-byte read() delegate to read(byte[], int, int).
43 private final byte[] _oneByteBuffer = new byte[1];
// Set by the constructor: the supplied lock object, or this instance when null is passed.
44 private final Object _lock;
// Assigned in init(); its onReadPossible() is invoked when reading may be able to proceed.
45 private HttpChannelState _channelState;
// Listener installed by setReadListener(); stays null until then.
46 private ReadListener _listener;
// Consulted by failed(). NOTE(review): presumably holds the reported failure - confirm.
47 private Throwable _onError;
// Recomputed on the listener-dispatch path as "no content available and not finished".
48 private boolean _notReady;
// Current content state: starts as STREAM, becomes ASYNC in setReadListener(), and is
// replaced by _eofState in getNextContent() once an EOF state is pending.
49 private State _contentState = STREAM;
// Pending EOF state; null until an EOF is signalled (earlyEOF() stores EARLY_EOF).
50 private State _eofState;
// NOTE(review): presumably the running count of bytes read, exposed by getContentRead() - confirm.
51 private long _contentRead;
// Creates the input with the object to synchronize on; passing null selects this instance.
58 protected HttpInput(Object lock)
60 _lock = lock == null ? this : lock;
63 public void init(HttpChannelState state)
67 _channelState = state;
71 public final Object lock()
83 _contentState = STREAM;
// Reports the bytes remaining in the next content item, or 0 when no item is available.
// This method declares no checked exception, so an IOException raised by getNextContent()
// is rethrown wrapped in a RuntimeIOException.
90 public int available()
96 T item = getNextContent();
97 return item == null ? 0 : remaining(item);
100 catch (IOException e)
102 throw new RuntimeIOException(e);
// Single-byte read: delegates to read(byte[], int, int) using the one-byte scratch buffer.
// Returns -1 when the delegate returns a negative count, otherwise the byte masked to 0..255.
107 public int read() throws IOException
109 int read = read(_oneByteBuffer, 0, 1);
110 return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF;
// Reads content into b while holding the input lock. The current content state is asked to
// wait for content and the lookup is retried; the state's noContent() supplies the return
// value when nothing can be read; otherwise bytes are copied out of the item with get().
// NOTE(review): the conditions guarding the wait and the noContent() return are not visible
// in this view - presumably both test for a null item; confirm.
114 public int read(byte[] b, int off, int len) throws IOException
116 synchronized (lock())
118 T item = getNextContent();
121 _contentState.waitForContent(this);
122 item = getNextContent();
124 return _contentState.noContent();
126 int l = get(item, b, off, len);
133 * A convenience method that calls {@link #nextContent()} and checks the return value: if it is null,
134 * a check is made for EOF and the content state is changed accordingly.
136 * @return Content or null if none available.
137 * @throws IOException
138 * @see #nextContent()
// Fetches nextContent(); under the lock, a pending EOF state (non-null _eofState) is logged
// at debug level and installed as the current content state.
// NOTE(review): per the method's Javadoc the EOF check applies only when no content was
// returned; that null test is not visible in this view - confirm.
140 protected T getNextContent() throws IOException
142 T content = nextContent();
145 synchronized (lock())
147 if (_eofState != null)
149 LOG.debug("{} eof {}", this, _eofState);
150 _contentState = _eofState;
158 * Access the next content to be consumed from. Returning the next item does not consume it
159 * and it may be returned multiple times until it is consumed.
161 * Calls to {@link #get(Object, byte[], int, int)}
162 * or {@link #consume(Object, int)} are required to consume data from the content.
164 * @return the content or null if none available.
165 * @throws IOException if retrieving the content fails
167 protected abstract T nextContent() throws IOException;
170 * @param item the content
171 * @return how many bytes remain in the given content
173 protected abstract int remaining(T item);
176 * Copies the given content into the given byte buffer.
178 * @param item the content to copy from
179 * @param buffer the buffer to copy into
180 * @param offset the offset in the buffer at which to start writing the copied bytes
181 * @param length the space available in the buffer
182 * @return the number of bytes actually copied
184 protected abstract int get(T item, byte[] buffer, int offset, int length);
187 * Consumes the given content.
189 * @param item the content to consume
190 * @param length the number of bytes to consume
192 protected abstract void consume(T item, int length);
195 * Blocks until some content or some end-of-file event arrives.
197 * @throws IOException if the wait is interrupted
199 protected abstract void blockForContent() throws IOException;
202 * Adds some content to this input stream.
204 * @param item the content to add
206 public abstract void content(T item);
208 protected boolean onAsyncRead()
210 synchronized (lock())
212 if (_listener == null)
215 _channelState.onReadPossible();
219 public long getContentRead()
221 synchronized (lock())
228 * This method should be called to signal that an EOF has been
229 * detected before all the expected content arrived.
231 * Typically this will result in an {@link EofException} being thrown
232 * from a subsequent read rather than a -1 return.
// Under the lock: logs the event at debug level and records EARLY_EOF as the pending EOF state.
// The content state itself is not assigned here; getNextContent() copies _eofState into it.
// NOTE(review): the statement controlled by the listener null-check is not visible in this
// view, so it is unclear when _channelState.onReadPossible() is skipped - confirm.
234 public void earlyEOF()
236 synchronized (lock())
240 LOG.debug("{} early EOF", this);
241 _eofState = EARLY_EOF;
242 if (_listener == null)
246 _channelState.onReadPossible();
250 * This method should be called to signal that all the expected content has arrived.
253 public void messageComplete()
255 synchronized (lock())
259 LOG.debug("{} EOF", this);
261 if (_listener == null)
265 _channelState.onReadPossible();
// Drains the input under the lock: loops until isFinished(), fetching the next content item
// and consuming all of its remaining bytes.
// NOTE(review): the wait on the content state presumably happens only when no item is
// available, and the handling inside the IOException catch is not visible here - confirm both.
268 public void consumeAll()
270 synchronized (lock())
274 while (!isFinished())
276 T item = getNextContent();
278 _contentState.waitForContent(this);
280 consume(item, remaining(item));
283 catch (IOException e)
// Returns true when the current content state is the ASYNC state (identity comparison),
// read while holding the input lock.
290 public boolean isAsync()
292 synchronized (lock())
294 return _contentState==ASYNC;
299 * @return whether an EOF has been detected, even though there may be content to consume.
// Looks at the pending EOF state rather than the current content state: true only when
// _eofState has been set and that state reports isEOF().
301 public boolean isEOF()
303 synchronized (lock())
305 return _eofState != null && _eofState.isEOF();
// Returns whether the current content state reports isEOF(), read while holding the input
// lock. Unlike isEOF(), this consults _contentState and not the pending _eofState.
310 public boolean isFinished()
312 synchronized (lock())
314 return _contentState.isEOF();
319 public boolean isReady()
322 synchronized (lock())
324 if (_contentState.isEOF())
326 if (_listener == null )
333 finished = isFinished();
336 _channelState.onReadPossible();
342 protected void unready()
// Installs the listener and moves this input from the STREAM state to the ASYNC state.
// A null listener is rejected by Objects.requireNonNull. An IllegalStateException carrying
// the current state is thrown unless the content state is still STREAM. After the state and
// listener are updated under the lock, _channelState.onReadPossible() is called.
347 public void setReadListener(ReadListener readListener)
349 readListener = Objects.requireNonNull(readListener);
350 synchronized (lock())
352 if (_contentState != STREAM)
353 throw new IllegalStateException("state=" + _contentState);
354 _contentState = ASYNC;
355 _listener = readListener;
358 _channelState.onReadPossible();
361 public void failed(Throwable x)
363 synchronized (lock())
365 if (_onError != null)
375 final Throwable error;
376 final ReadListener listener;
377 boolean available = false;
380 synchronized (lock())
382 if (!_notReady || _listener == null)
386 listener = _listener;
390 T item = getNextContent();
391 available = item != null && remaining(item) > 0;
398 eof = !available && isFinished();
399 _notReady = !available && !eof;
405 listener.onError(error);
407 listener.onDataAvailable();
409 listener.onAllDataRead();
415 LOG.warn(e.toString());
421 protected static abstract class State
423 public void waitForContent(HttpInput<?> in) throws IOException
427 public int noContent() throws IOException
432 public boolean isEOF()
// Initial content state (see the _contentState field initializer). Waiting for content
// delegates to the input's blockForContent().
438 protected static final State STREAM = new State()
441 public void waitForContent(HttpInput<?> input) throws IOException
443 input.blockForContent();
447 public String toString()
453 protected static final State ASYNC = new State()
456 public int noContent() throws IOException
462 public String toString()
// State recorded by earlyEOF(). Its noContent() throws EofException instead of returning a
// value; it also overrides isEOF() and toString() (their bodies are not visible in this view).
468 protected static final State EARLY_EOF = new State()
471 public int noContent() throws IOException
473 throw new EofException();
477 public boolean isEOF()
483 public String toString()
489 protected static final State EOF = new State()
492 public boolean isEOF()
498 public String toString()