2 // ========================================================================
3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
19 package org.eclipse.jetty.server;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.ReadableByteChannel;
25 import java.nio.channels.WritePendingException;
26 import java.util.concurrent.atomic.AtomicReference;
27 import javax.servlet.RequestDispatcher;
28 import javax.servlet.ServletOutputStream;
29 import javax.servlet.ServletRequest;
30 import javax.servlet.ServletResponse;
31 import javax.servlet.WriteListener;
33 import org.eclipse.jetty.http.HttpContent;
34 import org.eclipse.jetty.io.EofException;
35 import org.eclipse.jetty.util.BufferUtil;
36 import org.eclipse.jetty.util.Callback;
37 import org.eclipse.jetty.util.IteratingCallback;
38 import org.eclipse.jetty.util.IteratingNestedCallback;
39 import org.eclipse.jetty.util.SharedBlockingCallback;
40 import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
41 import org.eclipse.jetty.util.log.Log;
42 import org.eclipse.jetty.util.log.Logger;
45 * <p>{@link HttpOutput} implements {@link ServletOutputStream}
46 * as required by the Servlet specification.</p>
47 * <p>{@link HttpOutput} buffers content written by the application until a
48 * further write will overflow the buffer, at which point it triggers a commit
49 * of the response.</p>
50 * <p>{@link HttpOutput} can be closed and reopened, to allow requests included
51 * via {@link RequestDispatcher#include(ServletRequest, ServletResponse)} to
52 * close the stream, to be reopened after the inclusion ends.</p>
54 public class HttpOutput extends ServletOutputStream implements Runnable
// Logger for this class.
56 private static Logger LOG = Log.getLogger(HttpOutput.class);
// Channel that performs the actual writes (see write(ByteBuffer,boolean,Callback)).
57 private final HttpChannel<?> _channel;
// Source of the Blocker callbacks that the blocking write and sendContent methods pass to the channel write.
58 private final SharedBlockingCallback _writeblock=new SharedBlockingCallback();
// Byte count handed to Response.isAllContentWritten(long); write(ByteBuffer) adds buffer.remaining() to it.
59 private long _written;
// Aggregation buffer for small writes; acquired on demand from the channel's ByteBufferPool.
60 private ByteBuffer _aggregate;
// Initialised in the constructor from HttpConfiguration.getOutputBufferSize().
61 private int _bufferSize;
// Writes no longer than this (and not completing the response) are aggregated; initialised to _bufferSize/4.
62 private int _commitSize;
// Listener registered through setWriteListener(); receives onWritePossible() and onError() calls.
63 private WriteListener _writeListener;
// Failure passed to the listener's onError() and used as the cause of EofExceptions thrown by the write methods.
64 private volatile Throwable _onError;
67 ACTION             OPEN       ASYNC      READY      PENDING       UNREADY       CLOSED
68 -----------------------------------------------------------------------------------------------------
69 setWriteListener() READY->owp ise        ise        ise           ise           ise
70 write()            OPEN       ise        PENDING    wpe           wpe           eof
71 flush()            OPEN       ise        PENDING    wpe           wpe           eof
72 close()            CLOSED     CLOSED     CLOSED     CLOSED        wpe           CLOSED
73 isReady()          OPEN:true  READY:true READY:true UNREADY:false UNREADY:false CLOSED:true
74 write completed    -          -          -          ASYNC         READY->owp    -
Legend: owp = onWritePossible() is called, ise = IllegalStateException, wpe = WritePendingException, eof = EofException.
// States of the output state machine; transitions are made with compareAndSet calls on _state.
77 enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED }
// Current state of this output; a new instance starts in OPEN.
78 private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN);
// Creates the output stream for the given channel.
// The buffer size is read from the channel's HttpConfiguration (getOutputBufferSize())
// and the commit size is set to a quarter of that buffer size.
// NOTE(review): the assignment of the final _channel field is not visible in this listing.
80 public HttpOutput(HttpChannel<?> channel)
83 _bufferSize = _channel.getHttpConfiguration().getOutputBufferSize();
84 _commitSize=_bufferSize/4;
// Accessor for the HttpChannel associated with this output.
// NOTE(review): the body is not present in this listing; presumably it returns _channel.
87 public HttpChannel<?> getHttpChannel()
// NOTE(review): the body is not present in this listing; presumably it reports whether any
// bytes have been counted in _written - confirm against the full source.
92 public boolean isWritten()
// NOTE(review): the body is not present in this listing; presumably it returns the
// _written byte count - confirm against the full source.
97 public long getWritten()
// Unconditionally puts the output state machine back into the OPEN state.
// NOTE(review): the signature of the enclosing method is not present in this listing.
110 _state.set(OutputState.OPEN);
// Asks the channel's Response whether the bytes counted so far in _written make up all of the content.
113 public boolean isAllContentWritten()
115 return _channel.getResponse().isAllContentWritten(_written);
// Acquires a Blocker from the shared blocking callback (_writeblock) and returns it to the caller.
// Any IOException raised by acquire() is propagated.
118 protected Blocker acquireWriteBlockingCallback() throws IOException
120 return _writeblock.acquire();
// Blocking form of write: acquires a Blocker in a try-with-resources statement and hands it
// to the asynchronous write(ByteBuffer,boolean,Callback) as the completion callback.
// NOTE(review): the call that waits on the blocker is not visible in this listing.
123 protected void write(ByteBuffer content, boolean complete) throws IOException
125 try (Blocker blocker=_writeblock.acquire())
127 write(content,complete,blocker);
// Asynchronous form of write: forwards the content, the complete flag and the callback
// unchanged to HttpChannel.write().
132 protected void write(ByteBuffer content, boolean complete, Callback callback)
134 _channel.write(content,complete,callback);
// NOTE(review): the method signature and the switch over the state are not present in this
// listing; judging by the state table and the "Async close" message this is the body of close().
// Snapshot of the current state, used as the expected value of the compareAndSet calls below.
142 OutputState state=_state.get();
// If the move to ERROR wins, report the stored failure to the write listener
// (or a new EofException when no failure has been recorded).
149 if (_state.compareAndSet(state,OutputState.ERROR))
150 _writeListener.onError(_onError==null?new EofException("Async close"):_onError);
// If the move to CLOSED wins, write out the aggregate (or an empty buffer when there is none);
// the write is flagged as the last one unless the response is currently being included.
154 if (_state.compareAndSet(state,OutputState.CLOSED))
158 write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER,!_channel.getResponse().isIncluding());
172 /* Called to indicate that the output is already closed (a write with last==true has been performed) and that the state needs to be updated to match */
// NOTE(review): the method signature and the switch over the state are not present in this listing.
// Snapshot of the current state, used as the expected value of the compareAndSet calls below.
177 OutputState state=_state.get();
// If the move to ERROR wins, report the stored failure to the write listener
// (or a new EofException when no failure has been recorded).
184 if (_state.compareAndSet(state,OutputState.ERROR))
185 _writeListener.onError(_onError==null?new EofException("Async closed"):_onError);
// If the move to CLOSED wins, only the Response is told to close its output;
// no content is written from the lines visible here.
189 if (_state.compareAndSet(state,OutputState.CLOSED))
193 _channel.getResponse().closeOutput();
// Hands the aggregate buffer, if one is held, back to the connector's ByteBufferPool.
// NOTE(review): any statement following the release (for example clearing the _aggregate
// reference) is not visible in this listing.
207 private void releaseBuffer()
209 if (_aggregate != null)
211 _channel.getConnector().getByteBufferPool().release(_aggregate);
// Returns true only when the output state is exactly CLOSED (the ERROR state does not count).
216 public boolean isClosed()
218 return _state.get()==OutputState.CLOSED;
// Flushes aggregated content to the channel, either blocking or asynchronously depending on the output state.
// NOTE(review): the switch over the output state is not present in this listing; the states
// named in the comments below are inferred from the state table and should be confirmed.
222 public void flush() throws IOException
// Blocking flush: write the aggregate if it has content, otherwise an empty buffer; never flagged as last.
229 write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, false);
// Presumably the ASYNC state: the application has not called isReady() since the previous write.
233 throw new IllegalStateException("isReady() not called");
// Asynchronous flush: claim the write by moving READY to PENDING, then run an AsyncFlush.
236 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
238 new AsyncFlush().iterate();
// Presumably PENDING or UNREADY: an earlier asynchronous write has not completed yet.
243 throw new WritePendingException();
// Presumably the ERROR state: the recorded failure is passed as the cause.
246 throw new EofException(_onError);
// Writes len bytes of b starting at off, asynchronously or blocking depending on the output state.
// Writes that do not complete the response and are no longer than _commitSize are copied into
// the aggregate buffer; anything else is flushed and written through to the channel.
// NOTE(review): the switch over the output state, the update of _written, the offset/length
// adjustments and several return statements are not present in this listing.
257 public void write(byte[] b, int off, int len) throws IOException
// true once the Response reports that all of the content has been written
260 boolean complete=_channel.getResponse().isAllContentWritten(_written);
262 // Async or Blocking ?
268 // process blocking below
272 throw new IllegalStateException("isReady() not called");
// an asynchronous write first claims the output by moving READY to PENDING
275 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
278 // Should we aggregate?
279 if (!complete && len<=_commitSize)
// the aggregate buffer is acquired lazily, as a non-direct buffer of getBufferSize() bytes
281 if (_aggregate == null)
282 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
284 // YES - fill the aggregate with content from the buffer
285 int filled = BufferUtil.fill(_aggregate, b, off, len);
287 // return if we are not complete, not full and filled all the content
288 if (filled==len && !BufferUtil.isFull(_aggregate))
// nothing was sent to the channel, so the state goes straight back from PENDING to ASYNC
290 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
291 throw new IllegalStateException();
295 // adjust offset/length
300 // Do the asynchronous writing from the callback
301 new AsyncWrite(b,off,len,complete).iterate();
// the three throws below reject the write - NOTE(review): the case labels that select them are not visible
306 throw new WritePendingException();
309 throw new EofException(_onError);
312 throw new EofException("Closed");
318 // handle blocking write
320 // Should we aggregate?
321 int capacity = getBufferSize();
322 if (!complete && len<=_commitSize)
324 if (_aggregate == null)
325 _aggregate = _channel.getByteBufferPool().acquire(capacity, false);
327 // YES - fill the aggregate with content from the buffer
328 int filled = BufferUtil.fill(_aggregate, b, off, len);
330 // return if we are not complete, not full and filled all the content
331 if (filled==len && !BufferUtil.isFull(_aggregate))
334 // adjust offset/length
339 // flush any content from the aggregate
340 if (BufferUtil.hasContent(_aggregate))
// this flush is flagged as last only when the response is complete and no bytes of b remain
342 write(_aggregate, complete && len==0);
344 // should we fill aggregate again from the buffer?
345 if (len>0 && !complete && len<=_commitSize)
347 BufferUtil.append(_aggregate, b, off, len);
352 // write any remaining content in the buffer directly
// view is a duplicate of the wrapper, so its position and limit can be moved chunk by chunk
355 ByteBuffer wrap = ByteBuffer.wrap(b, off, len);
356 ByteBuffer view = wrap.duplicate();
358 // write a buffer capacity at a time to avoid JVM pooling large direct buffers
359 // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541
360 while (len>getBufferSize())
// p is the start of this chunk and l its end, which becomes the start of the next chunk
362 int p=view.position();
363 int l=p+getBufferSize();
364 view.limit(p+getBufferSize());
366 len-=getBufferSize();
// widen the limit to cover the next chunk: a full buffer or whatever remains, whichever is smaller
367 view.limit(l+Math.min(len,getBufferSize()));
// the final chunk carries the complete flag
370 write(view,complete);
// an empty write that carries only the complete flag - NOTE(review): its guarding condition is not visible
373 write(BufferUtil.EMPTY_BUFFER,complete);
// Writes the remaining bytes of the given buffer, asynchronously or blocking depending on the output state.
// Unlike the byte[] overload, the visible lines never copy the buffer into the aggregate:
// existing aggregate content is flushed and the buffer is then written directly.
// NOTE(review): the switch over the output state and several return/else lines are not present in this listing.
380 public void write(ByteBuffer buffer) throws IOException
// count the bytes before asking the Response whether the content is now complete
382 _written+=buffer.remaining();
383 boolean complete=_channel.getResponse().isAllContentWritten(_written);
385 // Async or Blocking ?
391 // process blocking below
395 throw new IllegalStateException("isReady() not called");
// an asynchronous write first claims the output by moving READY to PENDING
398 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
401 // Do the asynchronous writing from the callback
402 new AsyncWrite(buffer,complete).iterate();
// the three throws below reject the write - NOTE(review): the case labels that select them are not visible
407 throw new WritePendingException();
410 throw new EofException(_onError);
413 throw new EofException("Closed");
419 // handle blocking write
420 int len=BufferUtil.length(buffer);
422 // flush any content from the aggregate
423 if (BufferUtil.hasContent(_aggregate))
// flagged as last only when the response is complete and the buffer itself is empty
424 write(_aggregate, complete && len==0);
426 // write any remaining content in the buffer directly
428 write(buffer, complete);
// an empty write that carries only the complete flag - NOTE(review): its guarding condition is not visible
430 write(BufferUtil.EMPTY_BUFFER,complete);
// Writes a single byte. The byte is always appended to the aggregate buffer; the aggregate is
// only sent to the channel when the response becomes complete or the aggregate is full.
// NOTE(review): the switch over the output state, the update of _written and several
// return/break lines are not present in this listing.
437 public void write(int b) throws IOException
440 boolean complete=_channel.getResponse().isAllContentWritten(_written);
442 // Async or Blocking ?
// blocking path: lazily acquire the aggregate and append the byte
448 if (_aggregate == null)
449 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
450 BufferUtil.append(_aggregate, (byte)b);
452 // Check if all written or full
453 if (complete || BufferUtil.isFull(_aggregate))
// write the aggregate using a Blocker as the callback
// NOTE(review): the call that waits on the blocker is not visible in this listing
455 try(Blocker blocker=_writeblock.acquire())
457 write(_aggregate, complete, blocker);
466 throw new IllegalStateException("isReady() not called");
// asynchronous path: claim the output by moving READY to PENDING
469 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
472 if (_aggregate == null)
473 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
474 BufferUtil.append(_aggregate, (byte)b);
476 // Check if all written or full
477 if (!complete && !BufferUtil.isFull(_aggregate))
// the byte was only aggregated, so the state goes straight back from PENDING to ASYNC
479 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
480 throw new IllegalStateException();
484 // Do the asynchronous writing from the callback
485 new AsyncFlush().iterate();
// the three throws below reject the write - NOTE(review): the case labels that select them are not visible
490 throw new WritePendingException();
493 throw new EofException(_onError);
496 throw new EofException("Closed");
// Encodes the string with the response's character encoding and writes the resulting bytes.
// NOTE(review): the condition guarding the "Closed" IOException is not visible in this
// listing (presumably a check that the output is closed).
503 public void print(String s) throws IOException
506 throw new IOException("Closed");
508 write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
511 /* ------------------------------------------------------------ */
512 /** Blocking send of content.
513 * @param content The content to send.
514 * @throws IOException
// Acquires a Blocker and uses it as the callback of a channel write of the content with
// complete=true, so this buffer is sent as the last content of the response.
// NOTE(review): the call that waits on the blocker is not visible in this listing.
516 public void sendContent(ByteBuffer content) throws IOException
518 try(Blocker blocker=_writeblock.acquire())
520 write(content,true,blocker);
525 /* ------------------------------------------------------------ */
526 /** Blocking send of content.
527 * @param in The content to send
528 * @throws IOException
// Acquires a Blocker and runs an InputStreamWritingCB with it as the nested callback.
// NOTE(review): the call that waits on the blocker is not visible in this listing.
530 public void sendContent(InputStream in) throws IOException
532 try(Blocker blocker=_writeblock.acquire())
534 new InputStreamWritingCB(in,blocker).iterate();
539 /* ------------------------------------------------------------ */
540 /** Blocking send of content.
541 * @param in The content to send
542 * @throws IOException
// Acquires a Blocker and runs a ReadableByteChannelWritingCB with it as the nested callback.
// NOTE(review): the call that waits on the blocker is not visible in this listing.
544 public void sendContent(ReadableByteChannel in) throws IOException
546 try(Blocker blocker=_writeblock.acquire())
548 new ReadableByteChannelWritingCB(in,blocker).iterate();
554 /* ------------------------------------------------------------ */
555 /** Blocking send of content.
556 * @param content The content to send
557 * @throws IOException
// Acquires a Blocker and passes it as the callback to the asynchronous
// sendContent(HttpContent,Callback) overload.
// NOTE(review): the call that waits on the blocker is not visible in this listing.
559 public void sendContent(HttpContent content) throws IOException
561 try(Blocker blocker=_writeblock.acquire())
563 sendContent(content,blocker);
568 /* ------------------------------------------------------------ */
569 /** Asynchronous send of content.
570 * @param content The content to send
571 * @param callback The callback to use to notify success or failure
573 public void sendContent(ByteBuffer content, final Callback callback)
// The content is written with complete=true through an anonymous wrapper callback rather
// than the caller's callback directly.
// NOTE(review): only part of the wrapper is visible in this listing - succeeded() forwards to
// callback.succeeded(), while any other work in succeeded() and the whole body of failed() is missing.
575 write(content,true,new Callback()
578 public void succeeded()
581 callback.succeeded();
585 public void failed(Throwable x)
592 /* ------------------------------------------------------------ */
593 /** Asynchronous send of content.
594 * @param in The content to send as a stream. The stream will be closed
595 * after reading all content.
596 * @param callback The callback to use to notify success or failure
598 public void sendContent(InputStream in, Callback callback)
// all of the work is done by the iterating callback, which wraps the caller's callback
600 new InputStreamWritingCB(in,callback).iterate();
603 /* ------------------------------------------------------------ */
604 /** Asynchronous send of content.
605 * @param in The content to send as a channel. The channel will be closed
606 * after reading all content.
607 * @param callback The callback to use to notify success or failure
609 public void sendContent(ReadableByteChannel in, Callback callback)
// all of the work is done by the iterating callback, which wraps the caller's callback
611 new ReadableByteChannelWritingCB(in,callback).iterate();
614 /* ------------------------------------------------------------ */
615 /** Asynchronous send of content.
616 * @param httpContent The content to send
617 * @param callback The callback to use to notify success or failure
// Sends the HttpContent using the first representation it offers: a buffer, then a
// ReadableByteChannel, then an InputStream.
// NOTE(review): the null checks and returns that separate these alternatives, and the state
// switch around the throws, are not present in this listing.
619 public void sendContent(HttpContent httpContent, Callback callback) throws IOException
// refuse when content has already been aggregated or the response is already committed
621 if (BufferUtil.hasContent(_aggregate))
622 throw new IOException("written");
623 if (_channel.isCommitted())
624 throw new IOException("committed");
// claim the output by moving OPEN to PENDING; the throws below report a failed claim
631 if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
635 throw new EofException(_onError);
637 throw new EofException("Closed");
639 throw new IllegalStateException();
// the direct buffer is only requested when the channel uses direct buffers;
// the indirect buffer is the alternative
643 ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null;
645 buffer = httpContent.getIndirectBuffer();
649 sendContent(buffer,callback);
653 ReadableByteChannel rbc=httpContent.getReadableByteChannel();
656 // Close of the rbc is done by the async sendContent
657 sendContent(rbc,callback);
661 InputStream in = httpContent.getInputStream();
664 sendContent(in,callback);
// no usable representation: the failure is reported through the callback instead of being thrown
668 callback.failed(new IllegalArgumentException("unknown content for "+httpContent));
// Size used when acquiring the aggregate and the sendContent staging buffers, and as the
// chunk size for large direct writes.
// NOTE(review): the body is not present in this listing; presumably it returns _bufferSize.
671 public int getBufferSize()
// NOTE(review): the body is not present in this listing; presumably it updates _bufferSize
// (and possibly _commitSize) - confirm against the full source.
676 public void setBufferSize(int size)
// Discards any content currently held in the aggregate buffer; the buffer itself is kept.
682 public void resetBuffer()
684 if (BufferUtil.hasContent(_aggregate))
685 BufferUtil.clear(_aggregate);
// Registers the listener for asynchronous writes.
// Throws IllegalStateException("!ASYNC") when the channel state is not asynchronous, and a
// plain IllegalStateException when the output is not in the OPEN state (the move to READY fails).
689 public void setWriteListener(WriteListener writeListener)
691 if (!_channel.getState().isAsync())
692 throw new IllegalStateException("!ASYNC");
// only an output that is still OPEN can be switched to asynchronous mode
694 if (_state.compareAndSet(OutputState.OPEN, OutputState.READY))
696 _writeListener = writeListener;
// tell the channel state that a write is possible, so that the listener gets its first callback
697 _channel.getState().onWritePossible();
700 throw new IllegalStateException();
704 * @see javax.servlet.ServletOutputStream#isReady()
// NOTE(review): the loop, the switch over the state and every return statement are missing
// from this listing; only the two state transitions below are visible.
707 public boolean isReady()
// ASYNC to READY - per the state table, isReady() then answers true
716 if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY))
// PENDING to UNREADY - per the state table, isReady() then answers false
722 if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY))
// NOTE(review): the method signature and most of the control flow are missing from this
// listing; since the class implements Runnable this is presumably the body of run().
742 OutputState state = _state.get();
// if the move to ERROR wins, the recorded failure is logged at debug level and passed to the listener
// NOTE(review): the condition that selects this branch (presumably _onError being set) is not visible
754 if (_state.compareAndSet(state, OutputState.ERROR))
756 Throwable th=_onError;
758 LOG.debug("onError",th);
759 _writeListener.onError(th);
773 // even though a write is not possible, because a close has
774 // occurred, we need to call onWritePossible to tell async
775 // producer that the last write completed.
778 _writeListener.onWritePossible();
// Renders as SimpleClassName@hexHashCode{STATE}, where STATE is the current output state.
793 public String toString()
795 return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get());
// Common base of the asynchronous write callbacks (AsyncFlush and AsyncWrite): it updates
// the output state when the iteration finishes or fails.
// NOTE(review): the loop and switch inside completed() and most of failed() are missing from this listing.
798 private abstract class AsyncICB extends IteratingCallback
// Invoked when the iteration has finished: matches the "write completed" row of the state table.
801 protected void completed()
805 OutputState last=_state.get();
// the write finished before isReady() was called: PENDING goes back to ASYNC
809 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
// isReady() had already answered false: UNREADY becomes READY and the channel state is told
// that a write is possible
814 if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY))
816 _channel.getState().onWritePossible();
// presumably reached for any other state - NOTE(review): the case label is not visible
823 throw new IllegalStateException();
830 public void failed(Throwable e)
// the channel state is notified on failure as well
// NOTE(review): the recording of e (for example into _onError) is not visible in this listing
834 _channel.getState().onWritePossible();
// Iterating callback that flushes the aggregate buffer to the channel without marking the write as last.
// NOTE(review): the constructor and the lines that read and update _flushed are missing from this listing.
839 private class AsyncFlush extends AsyncICB
841 protected volatile boolean _flushed;
848 protected Action process()
// first pass while the aggregate holds content: write it and wait for the callback
850 if (BufferUtil.hasContent(_aggregate))
853 write(_aggregate, false, this);
854 return Action.SCHEDULED;
// an empty write is issued when there is nothing aggregated
// NOTE(review): its guarding condition (presumably a check of _flushed) is not visible
860 write(BufferUtil.EMPTY_BUFFER,false,this);
861 return Action.SCHEDULED;
864 return Action.SUCCEEDED;
// Iterating callback that performs an asynchronous write of a byte array or ByteBuffer:
// it first flushes the aggregate, then either aggregates the new content or writes it to the
// channel, in getBufferSize() slices when a slice buffer is in use.
// NOTE(review): several assignments (_len and _complete in the first constructor, _buffer in
// the second, the slice limit and position updates in process()) and the body of the
// completed() override are missing from this listing.
870 private class AsyncWrite extends AsyncICB
// content being written
872 private final ByteBuffer _buffer;
// duplicate of _buffer used to write one chunk at a time; null when slicing is not used
873 private final ByteBuffer _slice;
// whether this write completes the response content
874 private final boolean _complete;
// number of bytes in this write
875 private final int _len;
// set when the final slice has been written (see process())
876 protected volatile boolean _completed;
878 public AsyncWrite(byte[] b, int off, int len, boolean complete)
880 _buffer=ByteBuffer.wrap(b, off, len);
882 // always use a view for large byte arrays to avoid JVM pooling large direct buffers
883 _slice=_len<getBufferSize()?null:_buffer.duplicate();
887 public AsyncWrite(ByteBuffer buffer, boolean complete)
890 _len=buffer.remaining();
891 // Use a slice buffer for large indirect to avoid JVM pooling large direct buffers
892 _slice=_buffer.isDirect()||_len<getBufferSize()?null:_buffer.duplicate();
897 protected Action process()
899 // flush any content from the aggregate
900 if (BufferUtil.hasContent(_aggregate))
903 write(_aggregate, _complete && _completed, this);
904 return Action.SCHEDULED;
907 // Can we just aggregate the remainder?
908 if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
// copy the content into the aggregate: flip it to fill mode, put, then flip back to flush mode
910 int position = BufferUtil.flipToFill(_aggregate);
911 BufferUtil.put(_buffer,_aggregate);
912 BufferUtil.flipToFlush(_aggregate, position);
913 return Action.SUCCEEDED;
916 // Is there data left to write?
917 if (_buffer.hasRemaining())
919 // if there is no slice, just write it
923 write(_buffer, _complete, this);
924 return Action.SCHEDULED;
927 // otherwise take a slice
// p is the start of the slice and l its length, capped at getBufferSize()
928 int p=_buffer.position();
929 int l=Math.min(getBufferSize(),_buffer.remaining());
// advance the source buffer past the slice - NOTE(review): the declaration of pl is not visible
932 _buffer.position(pl);
// the write is flagged as last only for the final slice of a completing write
934 _completed=!_buffer.hasRemaining();
935 write(_slice, _complete && _completed, this);
936 return Action.SCHEDULED;
939 // all content written, but if completion has not yet been signalled, a final empty write is needed
941 if (_complete && !_completed)
944 write(BufferUtil.EMPTY_BUFFER, _complete, this);
945 return Action.SCHEDULED;
948 return Action.SUCCEEDED;
952 protected void completed()
963 /* ------------------------------------------------------------ */
964 /** An iterating callback that will take content from an
965 * InputStream and write it to the associated {@link HttpChannel}.
966 * A non direct buffer of size {@link HttpOutput#getBufferSize()} is used.
967 * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
968 * be notified as each buffer is written and only once all the input is consumed will the
969 * wrapped {@link Callback#succeeded()} method be called.
// NOTE(review): the super(callback) call, the assignment of _in, the EOF handling inside the
// read loop, the buffer limit update and most of failed() are missing from this listing.
971 private class InputStreamWritingCB extends IteratingNestedCallback
// stream being copied to the channel
973 private final InputStream _in;
// staging buffer that holds each chunk read from the stream
974 private final ByteBuffer _buffer;
// true once the end of the stream has been seen; the write issued at that point is flagged as last
975 private boolean _eof;
977 public InputStreamWritingCB(InputStream in, Callback callback)
// non-direct buffer, so that its backing array can be handed to InputStream.read()
981 _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
985 protected Action process() throws Exception
987 // Only return if EOF has previously been read and thus
988 // a write done with EOF=true
// finished: hand the staging buffer back to the pool
994 _channel.getByteBufferPool().release(_buffer);
995 return Action.SUCCEEDED;
998 // Read until buffer full or EOF
1000 while (len<_buffer.capacity() && !_eof)
// read directly into the backing array, after the len bytes already read in this pass
1002 int r=_in.read(_buffer.array(),_buffer.arrayOffset()+len,_buffer.capacity()-len);
1009 // write what we have
1010 _buffer.position(0);
1012 write(_buffer,_eof,this);
1013 return Action.SCHEDULED;
1017 public void failed(Throwable x)
// the staging buffer is returned to the pool on failure too
1020 _channel.getByteBufferPool().release(_buffer);
// NOTE(review): the try block this catch belongs to (presumably closing _in) is not visible
1025 catch (IOException e)
1033 /* ------------------------------------------------------------ */
1034 /** An iterating callback that will take content from a
1035 * ReadableByteChannel and write it to the {@link HttpChannel}.
1036 * A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if
1037 * {@link HttpChannel#useDirectBuffers()} is true.
1038 * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
1039 * be notified as each buffer is written and only once all the input is consumed will the
1040 * wrapped {@link Callback#succeeded()} method be called.
// NOTE(review): the super(callback) call, the assignment of _in, the buffer clear and flip
// around the read loop and most of failed() are missing from this listing.
1042 private class ReadableByteChannelWritingCB extends IteratingNestedCallback
// channel being copied to the HttpChannel
1044 private final ReadableByteChannel _in;
// staging buffer that holds each chunk read from the channel
1045 private final ByteBuffer _buffer;
// true once a read has returned a negative count; the write issued at that point is flagged as last
1046 private boolean _eof;
1048 public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
1052 _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
1056 protected Action process() throws Exception
1058 // Only return if EOF has previously been read and thus
1059 // a write done with EOF=true
// finished: hand the staging buffer back to the pool
1064 _channel.getByteBufferPool().release(_buffer);
1065 return Action.SUCCEEDED;
1068 // Read from stream until buffer full or EOF
1070 while (_buffer.hasRemaining() && !_eof)
1071 _eof = (_in.read(_buffer)) < 0;
1073 // write what we have
1075 write(_buffer,_eof,this);
1077 return Action.SCHEDULED;
1081 public void failed(Throwable x)
// the staging buffer is returned to the pool on failure too
1084 _channel.getByteBufferPool().release(_buffer);
// NOTE(review): the try block this catch belongs to (presumably closing _in) is not visible
1089 catch (IOException e)