2 // ========================================================================
3 // Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
19 package org.eclipse.jetty.server;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.ReadableByteChannel;
25 import java.nio.channels.WritePendingException;
26 import java.util.concurrent.atomic.AtomicReference;
27 import javax.servlet.RequestDispatcher;
28 import javax.servlet.ServletOutputStream;
29 import javax.servlet.ServletRequest;
30 import javax.servlet.ServletResponse;
31 import javax.servlet.WriteListener;
33 import org.eclipse.jetty.http.HttpContent;
34 import org.eclipse.jetty.io.EofException;
35 import org.eclipse.jetty.util.BufferUtil;
36 import org.eclipse.jetty.util.Callback;
37 import org.eclipse.jetty.util.IteratingCallback;
38 import org.eclipse.jetty.util.IteratingNestedCallback;
39 import org.eclipse.jetty.util.SharedBlockingCallback;
40 import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
41 import org.eclipse.jetty.util.log.Log;
42 import org.eclipse.jetty.util.log.Logger;
45 * <p>{@link HttpOutput} implements {@link ServletOutputStream}
46 * as required by the Servlet specification.</p>
47 * <p>{@link HttpOutput} buffers content written by the application until a
48 * further write will overflow the buffer, at which point it triggers a commit
49 * of the response.</p>
50 * <p>{@link HttpOutput} can be closed and reopened, to allow requests included
51 * via {@link RequestDispatcher#include(ServletRequest, ServletResponse)} to
52 * close the stream, to be reopened after the inclusion ends.</p>
54 public class HttpOutput extends ServletOutputStream implements Runnable
// Logger for this class.
// NOTE(review): static but not final — consider making it final; confirm nothing reassigns it.
56 private static Logger LOG = Log.getLogger(HttpOutput.class);
// The channel that all writes are passed to.
57 private final HttpChannel<?> _channel;
// Blocking callback shared by the blocking write/sendContent paths; its idle timeout
// is taken from the channel.
58 private final SharedBlockingCallback _writeblock=new SharedBlockingCallback()
61 protected long getIdleTimeout()
63 return _channel.getIdleTimeout();
// Count of bytes written; passed to the response's isAllContentWritten() check.
66 private long _written;
// Aggregation buffer, acquired from the channel's ByteBufferPool the first time it is needed.
67 private ByteBuffer _aggregate;
// Initialised in the constructor from HttpConfiguration.getOutputBufferSize().
68 private int _bufferSize;
// Writes of at most this many bytes are aggregated; initialised from
// HttpConfiguration.getOutputAggregationSize() and capped at _bufferSize.
69 private int _commitSize;
// Listener registered through setWriteListener().
70 private WriteListener _writeListener;
// Failure recorded for delivery to the WriteListener or as the cause of an EofException.
71 private volatile Throwable _onError;
74 ACTION OPEN ASYNC READY PENDING UNREADY CLOSED
75 -----------------------------------------------------------------------------------------------------
76 setWriteListener() READY->owp ise ise ise ise ise
77 write() OPEN ise PENDING wpe wpe eof
78 flush() OPEN ise PENDING wpe wpe eof
79 close() CLOSED CLOSED CLOSED CLOSED wpe CLOSED
80 isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true
81 write completed - - - ASYNC READY->owp -
// The states of this output. Note that ERROR has no column in the state table above.
84 enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED }
// Current state, initially OPEN; the methods of this class change it with compareAndSet/set.
85 private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN);
// Creates the output for a channel, taking the buffer size and aggregation size from the
// channel's HttpConfiguration.
// NOTE(review): the assignment of _channel is not visible in this extract.
87 public HttpOutput(HttpChannel<?> channel)
90 HttpConfiguration config = channel.getHttpConfiguration();
91 _bufferSize = config.getOutputBufferSize();
92 _commitSize = config.getOutputAggregationSize();
// The aggregation size may not exceed the buffer size: warn and clamp it.
93 if (_commitSize>_bufferSize)
95 LOG.warn("OutputAggregationSize {} exceeds bufferSize {}",_commitSize,_bufferSize);
96 _commitSize=_bufferSize;
// Accessor returning an HttpChannel.
// NOTE(review): the body is not visible in this extract — presumably returns _channel; confirm.
100 public HttpChannel<?> getHttpChannel()
// NOTE(review): the body is not visible in this extract — presumably reports whether
// _written is greater than zero; confirm.
105 public boolean isWritten()
// NOTE(review): the body is not visible in this extract — presumably returns the
// _written byte count; confirm.
110 public long getWritten()
// Unconditionally puts the output state back to OPEN (a plain set, not a compare-and-set).
// NOTE(review): the signature of the enclosing method is not visible in this extract.
123 _state.set(OutputState.OPEN);
// Delegates to the response's isAllContentWritten() check, passing the current _written count.
126 public boolean isAllContentWritten()
128 return _channel.getResponse().isAllContentWritten(_written);
// Acquires the shared blocking callback (_writeblock) and returns the resulting Blocker.
131 protected Blocker acquireWriteBlockingCallback() throws IOException
133 return _writeblock.acquire();
// Blocking write: acquires the shared Blocker (closed again by try-with-resources) and
// uses it as the callback of the callback-based write.
// NOTE(review): the call that waits on the blocker is not visible in this extract.
136 protected void write(ByteBuffer content, boolean complete) throws IOException
138 try (Blocker blocker=_writeblock.acquire())
140 write(content,complete,blocker);
// Callback-based write: passes the content, the complete flag and the callback straight
// to the channel.
145 protected void write(ByteBuffer content, boolean complete, Callback callback)
147 _channel.write(content,complete,callback);
// NOTE(review): the method signature and the branch/case labels around these statements
// are not visible in this extract; judging by the state table this is close() — confirm.
155 OutputState state=_state.get();
// One branch moves to ERROR and reports _onError (or an EofException("Async close") when
// no error was recorded) to the write listener.
162 if (_state.compareAndSet(state,OutputState.ERROR))
163 _writeListener.onError(_onError==null?new EofException("Async close"):_onError);
// Another branch moves to CLOSED ...
167 if (_state.compareAndSet(state,OutputState.CLOSED))
// ... and writes the aggregate (or an empty buffer); the write is marked complete unless
// the response is currently being included.
171 write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER,!_channel.getResponse().isIncluding());
185 /* Called to indicate that the output is already closed (write with last==true performed) and the state needs to be updated to match */
// NOTE(review): the method signature and the branch/case labels are not visible in this extract.
190 OutputState state=_state.get();
// One branch moves to ERROR and reports _onError (or an EofException("Async closed") when
// no error was recorded) to the write listener.
197 if (_state.compareAndSet(state,OutputState.ERROR))
198 _writeListener.onError(_onError==null?new EofException("Async closed"):_onError);
// Another branch moves to CLOSED and tells the response to close its output; no content
// is written here.
202 if (_state.compareAndSet(state,OutputState.CLOSED))
206 _channel.getResponse().closeOutput();
// Hands the aggregate buffer, if there is one, back to the connector's ByteBufferPool.
// NOTE(review): whether _aggregate is cleared afterwards is not visible in this extract.
220 private void releaseBuffer()
222 if (_aggregate != null)
224 _channel.getConnector().getByteBufferPool().release(_aggregate);
// True only when the current state is exactly CLOSED (ERROR does not count as closed here).
229 public boolean isClosed()
231 return _state.get()==OutputState.CLOSED;
// Flushes any aggregated content through the channel without completing the response.
// NOTE(review): the case labels selecting the branches below are not visible in this
// extract; the state table earlier in this class lists the intended behaviour per state.
235 public void flush() throws IOException
// Blocking path: write the aggregate, or an empty buffer when nothing is aggregated.
242 write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, false);
// Async mode, but isReady() has not been called since the last write.
246 throw new IllegalStateException("isReady() not called");
// Async path: claim the write (READY -> PENDING) and flush from an iterating callback.
249 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
251 new AsyncFlush().iterate();
// A previously started async write is still pending.
256 throw new WritePendingException();
// The recorded error becomes the cause of the EofException.
259 throw new EofException(_onError);
// Writes len bytes of b starting at off. Small writes are copied into the aggregate
// buffer; otherwise content is written through the channel, either blocking or via an
// AsyncWrite callback, depending on the output state.
// NOTE(review): this extract omits short lines (braces, case labels, returns and short
// assignments), so the exact branch structure is not visible. Unlike write(ByteBuffer),
// no update of _written is visible here — confirm against the full file.
270 public void write(byte[] b, int off, int len) throws IOException
273 boolean complete=_channel.getResponse().isAllContentWritten(_written);
275 // Async or Blocking ?
281 // process blocking below
285 throw new IllegalStateException("isReady() not called");
// Async path: claim the write, READY -> PENDING.
288 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
291 // Should we aggregate?
292 if (!complete && len<=_commitSize)
294 if (_aggregate == null)
295 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
297 // YES - fill the aggregate with content from the buffer
298 int filled = BufferUtil.fill(_aggregate, b, off, len);
300 // return if we are not complete, not full and filled all the content
301 if (filled==len && !BufferUtil.isFull(_aggregate))
// Everything fitted in the aggregate, so nothing is in flight: PENDING -> ASYNC.
303 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
304 throw new IllegalStateException();
308 // adjust offset/length
313 // Do the asynchronous writing from the callback
314 new AsyncWrite(b,off,len,complete).iterate();
319 throw new WritePendingException();
322 throw new EofException(_onError);
325 throw new EofException("Closed");
331 // handle blocking write
333 // Should we aggregate?
334 int capacity = getBufferSize();
335 if (!complete && len<=_commitSize)
337 if (_aggregate == null)
338 _aggregate = _channel.getByteBufferPool().acquire(capacity, false);
340 // YES - fill the aggregate with content from the buffer
341 int filled = BufferUtil.fill(_aggregate, b, off, len);
343 // return if we are not complete, not full and filled all the content
344 if (filled==len && !BufferUtil.isFull(_aggregate))
347 // adjust offset/length
352 // flush any content from the aggregate
353 if (BufferUtil.hasContent(_aggregate))
// The flush is marked complete only when it is also the last of the content.
355 write(_aggregate, complete && len==0);
357 // should we fill aggregate again from the buffer?
358 if (len>0 && !complete && len<=_commitSize && len<=BufferUtil.space(_aggregate))
360 BufferUtil.append(_aggregate, b, off, len);
365 // write any remaining content in the buffer directly
368 ByteBuffer wrap = ByteBuffer.wrap(b, off, len);
369 ByteBuffer view = wrap.duplicate();
371 // write a buffer capacity at a time to avoid JVM pooling large direct buffers
372 // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541
373 while (len>getBufferSize())
// Limit the view to one buffer-size window starting at its current position.
// NOTE(review): l already holds p+getBufferSize(); the write of the window and the
// advance of the view's position are not visible in this extract.
375 int p=view.position();
376 int l=p+getBufferSize();
377 view.limit(p+getBufferSize());
379 len-=getBufferSize();
// Extend the limit to cover the next window, or the shorter remainder.
380 view.limit(l+Math.min(len,getBufferSize()));
383 write(view,complete);
// NOTE(review): the condition guarding this empty write is not visible in this extract.
386 write(BufferUtil.EMPTY_BUFFER,complete);
// Writes the remaining bytes of buffer. This overload does not copy into the aggregate:
// async writes go through an AsyncWrite callback, blocking writes first flush any
// aggregated content and then write the buffer directly.
// NOTE(review): case labels, braces and returns are not visible in this extract.
393 public void write(ByteBuffer buffer) throws IOException
// Count the bytes first so that 'complete' reflects this write.
395 _written+=buffer.remaining();
396 boolean complete=_channel.getResponse().isAllContentWritten(_written);
398 // Async or Blocking ?
404 // process blocking below
408 throw new IllegalStateException("isReady() not called");
// Async path: claim the write, READY -> PENDING.
411 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
414 // Do the asynchronous writing from the callback
415 new AsyncWrite(buffer,complete).iterate();
420 throw new WritePendingException();
423 throw new EofException(_onError);
426 throw new EofException("Closed");
432 // handle blocking write
433 int len=BufferUtil.length(buffer);
435 // flush any content from the aggregate
436 if (BufferUtil.hasContent(_aggregate))
// The flush is marked complete only when the buffer itself is empty.
437 write(_aggregate, complete && len==0);
439 // write any remaining content in the buffer directly
441 write(buffer, complete);
// NOTE(review): the conditions choosing between the write above and this empty write
// are not visible in this extract.
443 write(BufferUtil.EMPTY_BUFFER,complete);
// Writes a single byte by appending it to the aggregate buffer, flushing the aggregate
// when it is full or when the response content is complete.
// NOTE(review): case labels, braces and returns are not visible in this extract, and no
// update of _written is visible here — confirm against the full file.
450 public void write(int b) throws IOException
453 boolean complete=_channel.getResponse().isAllContentWritten(_written);
455 // Async or Blocking ?
// Blocking path: aggregate the byte, acquiring the buffer on first use.
461 if (_aggregate == null)
462 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
463 BufferUtil.append(_aggregate, (byte)b);
465 // Check if all written or full
466 if (complete || BufferUtil.isFull(_aggregate))
// Flush with the shared Blocker as callback.
// NOTE(review): the call that waits on the blocker is not visible in this extract.
468 try(Blocker blocker=_writeblock.acquire())
470 write(_aggregate, complete, blocker);
479 throw new IllegalStateException("isReady() not called");
// Async path: claim the write, READY -> PENDING.
482 if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
485 if (_aggregate == null)
486 _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
487 BufferUtil.append(_aggregate, (byte)b);
489 // Check if all written or full
490 if (!complete && !BufferUtil.isFull(_aggregate))
// Nothing needs flushing, so nothing is in flight: PENDING -> ASYNC.
492 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
493 throw new IllegalStateException();
497 // Do the asynchronous writing from the callback
498 new AsyncFlush().iterate();
503 throw new WritePendingException();
506 throw new EofException(_onError);
509 throw new EofException("Closed");
// Encodes the string with the response's character encoding and writes the bytes.
// NOTE(review): the condition guarding the "Closed" IOException is not visible in this extract.
516 public void print(String s) throws IOException
519 throw new IOException("Closed");
521 write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
// Blocking overload for a ByteBuffer: the buffer is written as the final content
// (complete=true) with the shared Blocker as the callback.
// NOTE(review): the call that waits on the blocker is not visible in this extract.
524 /* ------------------------------------------------------------ */
525 /** Blocking send of content.
526 * @param content The content to send.
527 * @throws IOException
529 public void sendContent(ByteBuffer content) throws IOException
531 try(Blocker blocker=_writeblock.acquire())
533 write(content,true,blocker);
// Blocking overload for an InputStream: runs an InputStreamWritingCB whose completion
// callback is the shared Blocker.
// NOTE(review): the call that waits on the blocker is not visible in this extract.
538 /* ------------------------------------------------------------ */
539 /** Blocking send of content.
540 * @param in The content to send
541 * @throws IOException
543 public void sendContent(InputStream in) throws IOException
545 try(Blocker blocker=_writeblock.acquire())
547 new InputStreamWritingCB(in,blocker).iterate();
// Blocking overload for a ReadableByteChannel: runs a ReadableByteChannelWritingCB whose
// completion callback is the shared Blocker.
// NOTE(review): the call that waits on the blocker is not visible in this extract.
552 /* ------------------------------------------------------------ */
553 /** Blocking send of content.
554 * @param in The content to send
555 * @throws IOException
557 public void sendContent(ReadableByteChannel in) throws IOException
559 try(Blocker blocker=_writeblock.acquire())
561 new ReadableByteChannelWritingCB(in,blocker).iterate();
// Blocking overload for HttpContent: delegates to the asynchronous HttpContent overload
// with the shared Blocker as the callback.
// NOTE(review): the call that waits on the blocker is not visible in this extract.
567 /* ------------------------------------------------------------ */
568 /** Blocking send of content.
569 * @param content The content to send
570 * @throws IOException
572 public void sendContent(HttpContent content) throws IOException
574 try(Blocker blocker=_writeblock.acquire())
576 sendContent(content,blocker);
// Asynchronous overload for a ByteBuffer: writes the buffer as the final content
// (complete=true) and wraps the caller's callback in an anonymous Callback.
// NOTE(review): only the forwarding of succeeded() is visible in this extract; any other
// work done in succeeded() and the whole body of failed() are not visible.
581 /* ------------------------------------------------------------ */
582 /** Asynchronous send of content.
583 * @param content The content to send
584 * @param callback The callback to use to notify success or failure
586 public void sendContent(ByteBuffer content, final Callback callback)
588 write(content,true,new Callback()
591 public void succeeded()
594 callback.succeeded();
598 public void failed(Throwable x)
// Asynchronous overload for an InputStream: starts an InputStreamWritingCB that reports
// to the given callback.
605 /* ------------------------------------------------------------ */
606 /** Asynchronous send of content.
607 * @param in The content to send as a stream. The stream will be closed
608 * after reading all content.
609 * @param callback The callback to use to notify success or failure
611 public void sendContent(InputStream in, Callback callback)
613 new InputStreamWritingCB(in,callback).iterate();
// Asynchronous overload for a ReadableByteChannel: starts a ReadableByteChannelWritingCB
// that reports to the given callback.
616 /* ------------------------------------------------------------ */
617 /** Asynchronous send of content.
618 * @param in The content to send as a channel. The channel will be closed
619 * after reading all content.
620 * @param callback The callback to use to notify success or failure
622 public void sendContent(ReadableByteChannel in, Callback callback)
624 new ReadableByteChannelWritingCB(in,callback).iterate();
// Asynchronous overload for HttpContent. Problems are reported through callback.failed()
// rather than thrown, except for the IllegalStateException below. The content is sent
// from the first representation tried that is usable: a ByteBuffer, then a
// ReadableByteChannel, then an InputStream.
// NOTE(review): null checks, returns, case labels and try/catch lines are not visible in
// this extract, so the exact conditions between the steps cannot be confirmed here.
627 /* ------------------------------------------------------------ */
628 /** Asynchronous send of content.
629 * @param httpContent The content to send
630 * @param callback The callback to use to notify success or failure
632 public void sendContent(HttpContent httpContent, Callback callback)
// Refuse if content has already been aggregated by an earlier write().
634 if (BufferUtil.hasContent(_aggregate))
636 callback.failed(new IOException("cannot sendContent() after write()"));
// Refuse if the response is already committed.
639 if (_channel.isCommitted())
641 callback.failed(new IOException("committed"));
// Claim the output directly from OPEN; when that fails, the outcome depends on the state.
650 if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
654 callback.failed(new EofException(_onError));
658 callback.failed(new EofException("Closed"));
661 throw new IllegalStateException();
// Ask for the direct buffer only when the channel uses direct buffers; the indirect
// buffer is the alternative.
665 ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null;
667 buffer = httpContent.getIndirectBuffer();
671 if (LOG.isDebugEnabled())
672 LOG.debug("sendContent({}=={},{},direct={})",httpContent,BufferUtil.toDetailString(buffer),callback,_channel.useDirectBuffers());
674 sendContent(buffer,callback);
680 ReadableByteChannel rbc=httpContent.getReadableByteChannel();
683 if (LOG.isDebugEnabled())
684 LOG.debug("sendContent({}=={},{},direct={})",httpContent,rbc,callback,_channel.useDirectBuffers());
685 // Close of the rbc is done by the async sendContent
686 sendContent(rbc,callback);
690 InputStream in = httpContent.getInputStream();
693 if (LOG.isDebugEnabled())
694 LOG.debug("sendContent({}=={},{},direct={})",httpContent,in,callback,_channel.useDirectBuffers());
695 sendContent(in,callback);
// No usable representation was found.
705 callback.failed(new IllegalArgumentException("unknown content for "+httpContent));
// Size used when acquiring the aggregate and transfer buffers in this class.
// NOTE(review): the body is not visible in this extract — presumably returns _bufferSize; confirm.
708 public int getBufferSize()
// NOTE(review): the body is not visible in this extract — presumably updates _bufferSize
// (and possibly _commitSize); confirm.
713 public void setBufferSize(int size)
// Discards any content currently held in the aggregate buffer.
// NOTE(review): any further statements in this method are not visible in this extract.
719 public void resetBuffer()
721 if (BufferUtil.hasContent(_aggregate))
722 BufferUtil.clear(_aggregate);
// Registers the listener used for asynchronous writes. This requires the channel state to
// be async and the output to be OPEN; otherwise an IllegalStateException is thrown.
726 public void setWriteListener(WriteListener writeListener)
728 if (!_channel.getState().isAsync())
729 throw new IllegalStateException("!ASYNC");
// OPEN -> READY, store the listener, then notify the channel state via onWritePossible().
731 if (_state.compareAndSet(OutputState.OPEN, OutputState.READY))
733 _writeListener = writeListener;
734 _channel.getState().onWritePossible();
737 throw new IllegalStateException();
741 * @see javax.servlet.ServletOutputStream#isReady()
// NOTE(review): the case labels and return statements are not visible in this extract;
// the expected result per state is listed in the isReady() row of the state table
// earlier in this class.
744 public boolean isReady()
// ASYNC -> READY: per the state table, isReady() answers true here.
753 if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY))
// PENDING -> UNREADY: a write is still pending; per the state table the answer is false.
759 if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY))
// NOTE(review): the enclosing signature is not visible in this extract; since the class
// implements Runnable this is presumably run() — confirm. The conditions and case labels
// between the statements below are not visible either.
779 OutputState state = _state.get();
// One branch moves to ERROR and hands the recorded _onError to the write listener.
791 if (_state.compareAndSet(state, OutputState.ERROR))
793 Throwable th=_onError;
795 if (LOG.isDebugEnabled())
796 LOG.debug("onError",th);
797 _writeListener.onError(th);
810 // even though a write is not possible, because a close has
811 // occurred, we need to call onWritePossible to tell async
812 // producer that the last write completed.
817 _writeListener.onWritePossible();
// Another branch records an IllegalStateException naming the current state as the error.
827 _onError=new IllegalStateException("state="+_state.get());
// Renders as SimpleClassName@hexHashCode{STATE}.
833 public String toString()
835 return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get());
// Base class of the async write/flush callbacks: when the iteration ends it updates the
// output state and, where needed, notifies the channel state via onWritePossible().
// NOTE(review): the loop/switch structure of onCompleteSuccess() is not visible in this extract.
838 private abstract class AsyncICB extends IteratingCallback
841 protected void onCompleteSuccess()
845 OutputState last=_state.get();
// PENDING -> ASYNC: the write finished while the state was still PENDING.
849 if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
// UNREADY -> READY: isReady() moved the state to UNREADY meanwhile, so notify that a
// write is possible again.
854 if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY))
856 _channel.getState().onWritePossible();
863 throw new IllegalStateException();
// On failure: record the cause (an IOException when none was given) and notify the
// channel state via onWritePossible().
870 public void onCompleteFailure(Throwable e)
872 _onError=e==null?new IOException():e;
873 _channel.getState().onWritePossible();
// Iterating callback that writes out the aggregate buffer asynchronously.
878 private class AsyncFlush extends AsyncICB
// NOTE(review): the assignments to this flag are not visible in this extract; presumably
// it records that a write has been issued — confirm.
880 protected volatile boolean _flushed;
887 protected Action process()
// Aggregated content exists: write it (not complete) with this callback, then wait for it.
889 if (BufferUtil.hasContent(_aggregate))
892 write(_aggregate, false, this);
893 return Action.SCHEDULED;
// NOTE(review): the condition guarding this empty write is not visible in this extract
// (presumably "nothing flushed yet"); confirm.
899 write(BufferUtil.EMPTY_BUFFER,false,this);
900 return Action.SCHEDULED;
903 return Action.SUCCEEDED;
// Iterating callback that performs one application write asynchronously: it first flushes
// the aggregate, then either aggregates the remainder or writes it to the channel,
// slicing large buffers into getBufferSize() pieces.
// NOTE(review): short lines (field assignments in the constructors, the declaration of
// 'pl', the slice limit/position updates, conditions) are not visible in this extract.
909 private class AsyncWrite extends AsyncICB
911 private final ByteBuffer _buffer;
// View onto _buffer used for piecewise writes; null when no slicing is needed.
912 private final ByteBuffer _slice;
913 private final boolean _complete;
914 private final int _len;
// Set once the write carrying the last of the data has been issued.
915 protected volatile boolean _completed;
917 public AsyncWrite(byte[] b, int off, int len, boolean complete)
919 _buffer=ByteBuffer.wrap(b, off, len);
921 // always use a view for large byte arrays to avoid JVM pooling large direct buffers
922 _slice=_len<getBufferSize()?null:_buffer.duplicate();
926 public AsyncWrite(ByteBuffer buffer, boolean complete)
929 _len=buffer.remaining();
930 // Use a slice buffer for large indirect to avoid JVM pooling large direct buffers
931 _slice=_buffer.isDirect()||_len<getBufferSize()?null:_buffer.duplicate();
936 protected Action process()
938 // flush any content from the aggregate
939 if (BufferUtil.hasContent(_aggregate))
942 write(_aggregate, _complete && _completed, this);
943 return Action.SCHEDULED;
946 // Can we just aggregate the remainder?
947 if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
// Copy the buffer into the aggregate, switching it to fill mode and back.
949 int position = BufferUtil.flipToFill(_aggregate);
950 BufferUtil.put(_buffer,_aggregate);
951 BufferUtil.flipToFlush(_aggregate, position);
952 return Action.SUCCEEDED;
955 // Is there data left to write?
956 if (_buffer.hasRemaining())
958 // if there is no slice, just write it
962 write(_buffer, _complete, this);
963 return Action.SCHEDULED;
966 // otherwise take a slice
967 int p=_buffer.position();
// The slice covers at most one buffer size of the remaining data.
968 int l=Math.min(getBufferSize(),_buffer.remaining());
// NOTE(review): 'pl' is declared on a line not visible in this extract (presumably p+l).
971 _buffer.position(pl);
973 _completed=!_buffer.hasRemaining();
974 write(_slice, _complete && _completed, this);
975 return Action.SCHEDULED;
978 // all content written, but if we have not yet signalled completion, we
980 if (_complete && !_completed)
983 write(BufferUtil.EMPTY_BUFFER, _complete, this);
984 return Action.SCHEDULED;
987 return Action.SUCCEEDED;
991 protected void onCompleteSuccess()
993 super.onCompleteSuccess();
1002 /* ------------------------------------------------------------ */
1003 /** An iterating callback that will take content from an
1004 * InputStream and write it to the associated {@link HttpChannel}.
1005 * A non direct buffer of size {@link HttpOutput#getBufferSize()} is used.
1006 * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
1007 * be notified as each buffer is written and only once all the input is consumed will the
1008 * wrapped {@link Callback#succeeded()} method be called.
// NOTE(review): short lines (super(callback), field assignments, the EOF check, stream
// close and the limit update before the write) are not visible in this extract.
1010 private class InputStreamWritingCB extends IteratingNestedCallback
1012 private final InputStream _in;
// Transfer buffer taken from the channel's pool; released on success and on failure.
1013 private final ByteBuffer _buffer;
1014 private boolean _eof;
1016 public InputStreamWritingCB(InputStream in, Callback callback)
// Non-direct buffer: process() reads straight into its backing array.
1020 _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
1024 protected Action process() throws Exception
1026 // Only return if EOF has previously been read and thus
1027 // a write done with EOF=true
1033 _channel.getByteBufferPool().release(_buffer);
1034 return Action.SUCCEEDED;
1037 // Read until buffer full or EOF
1039 while (len<_buffer.capacity() && !_eof)
// Read into the backing array after the bytes already read in this pass.
1041 int r=_in.read(_buffer.array(),_buffer.arrayOffset()+len,_buffer.capacity()-len);
1048 // write what we have
1049 _buffer.position(0);
// The write is marked as the last one when EOF was reached while filling the buffer.
1051 write(_buffer,_eof,this);
1052 return Action.SCHEDULED;
1056 public void onCompleteFailure(Throwable x)
1058 super.onCompleteFailure(x);
1059 _channel.getByteBufferPool().release(_buffer);
// NOTE(review): the try body that this catch belongs to is not visible in this extract.
1064 catch (IOException e)
1072 /* ------------------------------------------------------------ */
1073 /** An iterating callback that will take content from a
1074 * ReadableByteChannel and write it to the {@link HttpChannel}.
1075 * A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if
1076 * {@link HttpChannel#useDirectBuffers()} is true.
1077 * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
1078 * be notified as each buffer is written and only once all the input is consumed will the
1079 * wrapped {@link Callback#succeeded()} method be called.
1081 private class ReadableByteChannelWritingCB extends IteratingNestedCallback
1083 private final ReadableByteChannel _in;
1084 private final ByteBuffer _buffer;
1085 private boolean _eof;
1087 public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
1091 _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
1095 protected Action process() throws Exception
1097 // Only return if EOF has previously been read and thus
1098 // a write done with EOF=true
1103 _channel.getByteBufferPool().release(_buffer);
1104 return Action.SUCCEEDED;
1107 // Read from stream until buffer full or EOF
1109 while (_buffer.hasRemaining() && !_eof)
1110 _eof = (_in.read(_buffer)) < 0;
1112 // write what we have
1114 write(_buffer,_eof,this);
1116 return Action.SCHEDULED;
1120 public void onCompleteFailure(Throwable x)
1122 super.onCompleteFailure(x);
1123 _channel.getByteBufferPool().release(_buffer);
1128 catch (IOException e)