]> WPIA git - gigi.git/blob - lib/jetty/org/eclipse/jetty/server/HttpOutput.java
Merge "Update notes about password security"
[gigi.git] / lib / jetty / org / eclipse / jetty / server / HttpOutput.java
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.ReadableByteChannel;
25 import java.nio.channels.WritePendingException;
26 import java.util.concurrent.atomic.AtomicReference;
27 import javax.servlet.RequestDispatcher;
28 import javax.servlet.ServletOutputStream;
29 import javax.servlet.ServletRequest;
30 import javax.servlet.ServletResponse;
31 import javax.servlet.WriteListener;
32
33 import org.eclipse.jetty.http.HttpContent;
34 import org.eclipse.jetty.io.EofException;
35 import org.eclipse.jetty.util.BufferUtil;
36 import org.eclipse.jetty.util.Callback;
37 import org.eclipse.jetty.util.IteratingCallback;
38 import org.eclipse.jetty.util.IteratingNestedCallback;
39 import org.eclipse.jetty.util.SharedBlockingCallback;
40 import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
41 import org.eclipse.jetty.util.log.Log;
42 import org.eclipse.jetty.util.log.Logger;
43
44 /**
45  * <p>{@link HttpOutput} implements {@link ServletOutputStream}
46  * as required by the Servlet specification.</p>
47  * <p>{@link HttpOutput} buffers content written by the application until a
48  * further write will overflow the buffer, at which point it triggers a commit
49  * of the response.</p>
50  * <p>{@link HttpOutput} can be closed and reopened, to allow requests included
51  * via {@link RequestDispatcher#include(ServletRequest, ServletResponse)} to
52  * close the stream, to be reopened after the inclusion ends.</p>
53  */
54 public class HttpOutput extends ServletOutputStream implements Runnable
55 {
56     private static Logger LOG = Log.getLogger(HttpOutput.class);
57     private final HttpChannel<?> _channel;
58     private final SharedBlockingCallback _writeblock=new SharedBlockingCallback()
59     {
60         @Override
61         protected long getIdleTimeout()
62         {
63             return _channel.getIdleTimeout();
64         }
65     };
66     private long _written;
67     private ByteBuffer _aggregate;
68     private int _bufferSize;
69     private int _commitSize;
70     private WriteListener _writeListener;
71     private volatile Throwable _onError;
72
73     /*
74     ACTION             OPEN       ASYNC      READY      PENDING       UNREADY       CLOSED
75     -----------------------------------------------------------------------------------------------------
76     setWriteListener() READY->owp ise        ise        ise           ise           ise
77     write()            OPEN       ise        PENDING    wpe           wpe           eof
78     flush()            OPEN       ise        PENDING    wpe           wpe           eof
79     close()            CLOSED     CLOSED     CLOSED     CLOSED        wpe           CLOSED
80     isReady()          OPEN:true  READY:true READY:true UNREADY:false UNREADY:false CLOSED:true
81     write completed    -          -          -          ASYNC         READY->owp    -
82     
83     */
84     enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED }
85     private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN);
86
87     public HttpOutput(HttpChannel<?> channel)
88     {
89         _channel = channel;
90         HttpConfiguration config = channel.getHttpConfiguration();
91         _bufferSize = config.getOutputBufferSize();
92         _commitSize = config.getOutputAggregationSize();
93         if (_commitSize>_bufferSize)
94         {
95             LOG.warn("OutputAggregationSize {} exceeds bufferSize {}",_commitSize,_bufferSize);
96             _commitSize=_bufferSize;
97         }
98     }
99     
100     public HttpChannel<?> getHttpChannel()
101     {
102         return _channel;
103     }
104     
105     public boolean isWritten()
106     {
107         return _written > 0;
108     }
109
110     public long getWritten()
111     {
112         return _written;
113     }
114
115     public void reset()
116     {
117         _written = 0;
118         reopen();
119     }
120
121     public void reopen()
122     {
123         _state.set(OutputState.OPEN);
124     }
125
126     public boolean isAllContentWritten()
127     {
128         return _channel.getResponse().isAllContentWritten(_written);
129     }
130
131     protected Blocker acquireWriteBlockingCallback() throws IOException
132     {
133         return _writeblock.acquire();
134     }
135     
136     protected void write(ByteBuffer content, boolean complete) throws IOException
137     {
138         try (Blocker blocker=_writeblock.acquire())
139         {        
140             write(content,complete,blocker);
141             blocker.block();
142         }
143     }
144     
145     protected void write(ByteBuffer content, boolean complete, Callback callback)
146     {
147         _channel.write(content,complete,callback);
148     }
149     
150     @Override
151     public void close()
152     {
153         loop: while(true)
154         {
155             OutputState state=_state.get();
156             switch (state)
157             {
158                 case CLOSED:
159                     break loop;
160                     
161                 case UNREADY:
162                     if (_state.compareAndSet(state,OutputState.ERROR))
163                         _writeListener.onError(_onError==null?new EofException("Async close"):_onError);
164                     continue;
165                     
166                 default:
167                     if (_state.compareAndSet(state,OutputState.CLOSED))
168                     {
169                         try
170                         {
171                             write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER,!_channel.getResponse().isIncluding());
172                         }
173                         catch(IOException e)
174                         {
175                             LOG.debug(e);
176                             _channel.abort();
177                         }
178                         releaseBuffer();
179                         return;
180                     }
181             }
182         }
183     }
184
185     /* Called to indicated that the output is already closed (write with last==true performed) and the state needs to be updated to match */
186     void closed()
187     {
188         loop: while(true)
189         {
190             OutputState state=_state.get();
191             switch (state)
192             {
193                 case CLOSED:
194                     break loop;
195                     
196                 case UNREADY:
197                     if (_state.compareAndSet(state,OutputState.ERROR))
198                         _writeListener.onError(_onError==null?new EofException("Async closed"):_onError);
199                     continue;
200                     
201                 default:
202                     if (_state.compareAndSet(state,OutputState.CLOSED))
203                     {
204                         try
205                         {
206                             _channel.getResponse().closeOutput();
207                         }
208                         catch(IOException e)
209                         {
210                             LOG.debug(e);
211                             _channel.abort();
212                         }
213                         releaseBuffer();
214                         return;
215                     }
216             }
217         }
218     }
219
220     private void releaseBuffer()
221     {
222         if (_aggregate != null)
223         {
224             _channel.getConnector().getByteBufferPool().release(_aggregate);
225             _aggregate = null;
226         }
227     }
228
229     public boolean isClosed()
230     {
231         return _state.get()==OutputState.CLOSED;
232     }
233
234     @Override
235     public void flush() throws IOException
236     {
237         while(true)
238         {
239             switch(_state.get())
240             {
241                 case OPEN:
242                     write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, false);
243                     return;
244
245                 case ASYNC:
246                     throw new IllegalStateException("isReady() not called");
247
248                 case READY:
249                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
250                         continue;
251                     new AsyncFlush().iterate();
252                     return;
253
254                 case PENDING:
255                 case UNREADY:
256                     throw new WritePendingException();
257
258                 case ERROR:
259                     throw new EofException(_onError);
260                     
261                 case CLOSED:
262                     return;
263             }
264             break;
265         }
266     }
267
268
269     @Override
270     public void write(byte[] b, int off, int len) throws IOException
271     {
272         _written+=len;
273         boolean complete=_channel.getResponse().isAllContentWritten(_written);
274
275         // Async or Blocking ?
276         while(true)
277         {
278             switch(_state.get())
279             {
280                 case OPEN:
281                     // process blocking below
282                     break;
283
284                 case ASYNC:
285                     throw new IllegalStateException("isReady() not called");
286
287                 case READY:
288                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
289                         continue;
290
291                     // Should we aggregate?
292                     if (!complete && len<=_commitSize)
293                     {
294                         if (_aggregate == null)
295                             _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
296
297                         // YES - fill the aggregate with content from the buffer
298                         int filled = BufferUtil.fill(_aggregate, b, off, len);
299
300                         // return if we are not complete, not full and filled all the content
301                         if (filled==len && !BufferUtil.isFull(_aggregate))
302                         {
303                             if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
304                                 throw new IllegalStateException();
305                             return;
306                         }
307
308                         // adjust offset/length
309                         off+=filled;
310                         len-=filled;
311                     }
312
313                     // Do the asynchronous writing from the callback
314                     new AsyncWrite(b,off,len,complete).iterate();
315                     return;
316
317                 case PENDING:
318                 case UNREADY:
319                     throw new WritePendingException();
320
321                 case ERROR:
322                     throw new EofException(_onError);
323                     
324                 case CLOSED:
325                     throw new EofException("Closed");
326             }
327             break;
328         }
329
330
331         // handle blocking write
332
333         // Should we aggregate?
334         int capacity = getBufferSize();
335         if (!complete && len<=_commitSize)
336         {
337             if (_aggregate == null)
338                 _aggregate = _channel.getByteBufferPool().acquire(capacity, false);
339
340             // YES - fill the aggregate with content from the buffer
341             int filled = BufferUtil.fill(_aggregate, b, off, len);
342
343             // return if we are not complete, not full and filled all the content
344             if (filled==len && !BufferUtil.isFull(_aggregate))
345                 return;
346
347             // adjust offset/length
348             off+=filled;
349             len-=filled;
350         }
351
352         // flush any content from the aggregate
353         if (BufferUtil.hasContent(_aggregate))
354         {
355             write(_aggregate, complete && len==0);
356
357             // should we fill aggregate again from the buffer?
358             if (len>0 && !complete && len<=_commitSize && len<=BufferUtil.space(_aggregate))
359             {
360                 BufferUtil.append(_aggregate, b, off, len);
361                 return;
362             }
363         }
364
365         // write any remaining content in the buffer directly
366         if (len>0)
367         {
368             ByteBuffer wrap = ByteBuffer.wrap(b, off, len);
369             ByteBuffer view = wrap.duplicate();
370
371             // write a buffer capacity at a time to avoid JVM pooling large direct buffers
372             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541
373             while (len>getBufferSize())
374             {
375                 int p=view.position();
376                 int l=p+getBufferSize();
377                 view.limit(p+getBufferSize());
378                 write(view,false);
379                 len-=getBufferSize();
380                 view.limit(l+Math.min(len,getBufferSize()));
381                 view.position(l);
382             }
383             write(view,complete);
384         }
385         else if (complete)
386             write(BufferUtil.EMPTY_BUFFER,complete);
387
388         if (complete)
389             closed();
390
391     }
392
393     public void write(ByteBuffer buffer) throws IOException
394     {
395         _written+=buffer.remaining();
396         boolean complete=_channel.getResponse().isAllContentWritten(_written);
397
398         // Async or Blocking ?
399         while(true)
400         {
401             switch(_state.get())
402             {
403                 case OPEN:
404                     // process blocking below
405                     break;
406
407                 case ASYNC:
408                     throw new IllegalStateException("isReady() not called");
409
410                 case READY:
411                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
412                         continue;
413
414                     // Do the asynchronous writing from the callback
415                     new AsyncWrite(buffer,complete).iterate();
416                     return;
417
418                 case PENDING:
419                 case UNREADY:
420                     throw new WritePendingException();
421
422                 case ERROR:
423                     throw new EofException(_onError);
424                     
425                 case CLOSED:
426                     throw new EofException("Closed");
427             }
428             break;
429         }
430
431
432         // handle blocking write
433         int len=BufferUtil.length(buffer);
434
435         // flush any content from the aggregate
436         if (BufferUtil.hasContent(_aggregate))
437             write(_aggregate, complete && len==0);
438
439         // write any remaining content in the buffer directly
440         if (len>0)
441             write(buffer, complete);
442         else if (complete)
443             write(BufferUtil.EMPTY_BUFFER,complete);
444
445         if (complete)
446             closed();
447     }
448
449     @Override
450     public void write(int b) throws IOException
451     {
452         _written+=1;
453         boolean complete=_channel.getResponse().isAllContentWritten(_written);
454
455         // Async or Blocking ?
456         while(true)
457         {
458             switch(_state.get())
459             {
460                 case OPEN:
461                     if (_aggregate == null)
462                         _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
463                     BufferUtil.append(_aggregate, (byte)b);
464
465                     // Check if all written or full
466                     if (complete || BufferUtil.isFull(_aggregate))
467                     {
468                         try(Blocker blocker=_writeblock.acquire())
469                         {
470                             write(_aggregate, complete, blocker);
471                             blocker.block();
472                         }
473                         if (complete)
474                             closed();
475                     }
476                     break;
477
478                 case ASYNC:
479                     throw new IllegalStateException("isReady() not called");
480
481                 case READY:
482                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
483                         continue;
484
485                     if (_aggregate == null)
486                         _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
487                     BufferUtil.append(_aggregate, (byte)b);
488
489                     // Check if all written or full
490                     if (!complete && !BufferUtil.isFull(_aggregate))
491                     {
492                         if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
493                             throw new IllegalStateException();
494                         return;
495                     }
496
497                     // Do the asynchronous writing from the callback
498                     new AsyncFlush().iterate();
499                     return;
500
501                 case PENDING:
502                 case UNREADY:
503                     throw new WritePendingException();
504
505                 case ERROR:
506                     throw new EofException(_onError);
507                     
508                 case CLOSED:
509                     throw new EofException("Closed");
510             }
511             break;
512         }
513     }
514
515     @Override
516     public void print(String s) throws IOException
517     {
518         if (isClosed())
519             throw new IOException("Closed");
520
521         write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
522     }
523
524     /* ------------------------------------------------------------ */
525     /** Blocking send of content.
526      * @param content The content to send.
527      * @throws IOException
528      */
529     public void sendContent(ByteBuffer content) throws IOException
530     {
531         try(Blocker blocker=_writeblock.acquire())
532         {
533             write(content,true,blocker);
534             blocker.block();
535         }
536     }
537
538     /* ------------------------------------------------------------ */
539     /** Blocking send of content.
540      * @param in The content to send
541      * @throws IOException
542      */
543     public void sendContent(InputStream in) throws IOException
544     {
545         try(Blocker blocker=_writeblock.acquire())
546         {
547             new InputStreamWritingCB(in,blocker).iterate();
548             blocker.block();
549         }
550     }
551
552     /* ------------------------------------------------------------ */
553     /** Blocking send of content.
554      * @param in The content to send
555      * @throws IOException
556      */
557     public void sendContent(ReadableByteChannel in) throws IOException
558     {
559         try(Blocker blocker=_writeblock.acquire())
560         {
561             new ReadableByteChannelWritingCB(in,blocker).iterate();
562             blocker.block();
563         }
564     }
565
566
567     /* ------------------------------------------------------------ */
568     /** Blocking send of content.
569      * @param content The content to send
570      * @throws IOException
571      */
572     public void sendContent(HttpContent content) throws IOException
573     {
574         try(Blocker blocker=_writeblock.acquire())
575         {
576             sendContent(content,blocker);
577             blocker.block();
578         }
579     }
580
581     /* ------------------------------------------------------------ */
582     /** Asynchronous send of content.
583      * @param content The content to send
584      * @param callback The callback to use to notify success or failure
585      */
586     public void sendContent(ByteBuffer content, final Callback callback)
587     {
588         write(content,true,new Callback()
589         {
590             @Override
591             public void succeeded()
592             {
593                 closed();
594                 callback.succeeded();
595             }
596
597             @Override
598             public void failed(Throwable x)
599             {
600                 callback.failed(x);
601             }
602         });
603     }
604
605     /* ------------------------------------------------------------ */
606     /** Asynchronous send of content.
607      * @param in The content to send as a stream.  The stream will be closed
608      * after reading all content.
609      * @param callback The callback to use to notify success or failure
610      */
611     public void sendContent(InputStream in, Callback callback)
612     {
613         new InputStreamWritingCB(in,callback).iterate();
614     }
615
616     /* ------------------------------------------------------------ */
617     /** Asynchronous send of content.
618      * @param in The content to send as a channel.  The channel will be closed
619      * after reading all content.
620      * @param callback The callback to use to notify success or failure
621      */
622     public void sendContent(ReadableByteChannel in, Callback callback)
623     {
624         new ReadableByteChannelWritingCB(in,callback).iterate();
625     }
626
627     /* ------------------------------------------------------------ */
628     /** Asynchronous send of content.
629      * @param httpContent The content to send
630      * @param callback The callback to use to notify success or failure
631      */
632     public void sendContent(HttpContent httpContent, Callback callback)
633     {
634         if (BufferUtil.hasContent(_aggregate))
635         {
636             callback.failed(new IOException("cannot sendContent() after write()"));
637             return;
638         }
639         if (_channel.isCommitted())
640         {
641             callback.failed(new IOException("committed"));
642             return;
643         }
644
645         while (true)
646         {
647             switch(_state.get())
648             {
649                 case OPEN:
650                     if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
651                         continue;
652                     break;
653                 case ERROR:
654                     callback.failed(new EofException(_onError));
655                     return;
656                     
657                 case CLOSED:
658                     callback.failed(new EofException("Closed"));
659                     return;
660                 default:
661                     throw new IllegalStateException();
662             }
663             break;
664         }
665         ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null;
666         if (buffer == null)
667             buffer = httpContent.getIndirectBuffer();
668
669         if (buffer!=null)
670         {
671             if (LOG.isDebugEnabled())
672                 LOG.debug("sendContent({}=={},{},direct={})",httpContent,BufferUtil.toDetailString(buffer),callback,_channel.useDirectBuffers());
673             
674             sendContent(buffer,callback);
675             return;
676         }
677
678         try
679         {
680             ReadableByteChannel rbc=httpContent.getReadableByteChannel();
681             if (rbc!=null)
682             {
683                 if (LOG.isDebugEnabled())
684                     LOG.debug("sendContent({}=={},{},direct={})",httpContent,rbc,callback,_channel.useDirectBuffers());
685                 // Close of the rbc is done by the async sendContent
686                 sendContent(rbc,callback);
687                 return;
688             }
689
690             InputStream in = httpContent.getInputStream();
691             if ( in!=null )
692             {
693                 if (LOG.isDebugEnabled())
694                     LOG.debug("sendContent({}=={},{},direct={})",httpContent,in,callback,_channel.useDirectBuffers());
695                 sendContent(in,callback);
696                 return;
697             }
698         }
699         catch(Throwable th)
700         {
701             callback.failed(th);
702             return;
703         }
704
705         callback.failed(new IllegalArgumentException("unknown content for "+httpContent));
706     }
707
708     public int getBufferSize()
709     {
710         return _bufferSize;
711     }
712
713     public void setBufferSize(int size)
714     {
715         _bufferSize = size;
716         _commitSize = size;
717     }
718
719     public void resetBuffer()
720     {
721         if (BufferUtil.hasContent(_aggregate))
722             BufferUtil.clear(_aggregate);
723     }
724
725     @Override
726     public void setWriteListener(WriteListener writeListener)
727     {
728         if (!_channel.getState().isAsync())
729             throw new IllegalStateException("!ASYNC");
730
731         if (_state.compareAndSet(OutputState.OPEN, OutputState.READY))
732         {
733             _writeListener = writeListener;
734             _channel.getState().onWritePossible();
735         }
736         else
737             throw new IllegalStateException();
738     }
739
740     /**
741      * @see javax.servlet.ServletOutputStream#isReady()
742      */
743     @Override
744     public boolean isReady()
745     {
746         while (true)
747         {
748             switch(_state.get())
749             {
750                 case OPEN:
751                     return true;
752                 case ASYNC:
753                     if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY))
754                         continue;
755                     return true;
756                 case READY:
757                     return true;
758                 case PENDING:
759                     if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY))
760                         continue;
761                     return false;
762                 case UNREADY:
763                     return false;
764
765                 case ERROR:
766                     return true;
767                     
768                 case CLOSED:
769                     return true;
770             }
771         }
772     }
773
774     @Override
775     public void run()
776     {
777         loop: while (true)
778         {
779             OutputState state = _state.get();
780
781             if(_onError!=null)
782             {
783                 switch(state)
784                 {
785                     case CLOSED:
786                     case ERROR:
787                         _onError=null;
788                         break loop;
789
790                     default:
791                         if (_state.compareAndSet(state, OutputState.ERROR))
792                         {
793                             Throwable th=_onError;
794                             _onError=null;
795                             if (LOG.isDebugEnabled())
796                                 LOG.debug("onError",th);
797                             _writeListener.onError(th);
798                             close();
799
800                             break loop;
801                         }
802
803                 }
804                 continue;
805             }
806             
807             switch(_state.get())
808             {
809                 case CLOSED:
810                     // even though a write is not possible, because a close has 
811                     // occurred, we need to call onWritePossible to tell async
812                     // producer that the last write completed.
813                     // so fall through
814                 case READY:
815                     try
816                     {
817                         _writeListener.onWritePossible();
818                         break loop;
819                     }
820                     catch (Throwable e)
821                     {
822                         _onError=e;
823                     }
824                     break;
825                     
826                 default:
827                     _onError=new IllegalStateException("state="+_state.get());
828             }
829         }
830     }
831     
832     @Override
833     public String toString()
834     {
835         return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get());
836     }
837     
838     private abstract class AsyncICB extends IteratingCallback
839     {
840         @Override
841         protected void onCompleteSuccess()
842         {
843             while(true)
844             {
845                 OutputState last=_state.get();
846                 switch(last)
847                 {
848                     case PENDING:
849                         if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
850                             continue;
851                         break;
852
853                     case UNREADY:
854                         if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY))
855                             continue;
856                         _channel.getState().onWritePossible();
857                         break;
858
859                     case CLOSED:
860                         break;
861
862                     default:
863                         throw new IllegalStateException();
864                 }
865                 break;
866             }
867         }
868
869         @Override
870         public void onCompleteFailure(Throwable e)
871         {
872             _onError=e==null?new IOException():e;
873             _channel.getState().onWritePossible();
874         }
875     }
876     
877     
878     private class AsyncFlush extends AsyncICB
879     {
880         protected volatile boolean _flushed;
881
882         public AsyncFlush()
883         {
884         }
885
886         @Override
887         protected Action process()
888         {
889             if (BufferUtil.hasContent(_aggregate))
890             {
891                 _flushed=true;
892                 write(_aggregate, false, this);
893                 return Action.SCHEDULED;
894             }
895
896             if (!_flushed)
897             {
898                 _flushed=true;
899                 write(BufferUtil.EMPTY_BUFFER,false,this);
900                 return Action.SCHEDULED;
901             }
902
903             return Action.SUCCEEDED;
904         }
905     }
906
907
908
909     private class AsyncWrite extends AsyncICB
910     {
911         private final ByteBuffer _buffer;
912         private final ByteBuffer _slice;
913         private final boolean _complete;
914         private final int _len;
915         protected volatile boolean _completed;
916
917         public AsyncWrite(byte[] b, int off, int len, boolean complete)
918         {
919             _buffer=ByteBuffer.wrap(b, off, len);
920             _len=len;
921             // always use a view for large byte arrays to avoid JVM pooling large direct buffers
922             _slice=_len<getBufferSize()?null:_buffer.duplicate();
923             _complete=complete;
924         }
925
926         public AsyncWrite(ByteBuffer buffer, boolean complete)
927         {
928             _buffer=buffer;
929             _len=buffer.remaining();
930             // Use a slice buffer for large indirect to avoid JVM pooling large direct buffers
931             _slice=_buffer.isDirect()||_len<getBufferSize()?null:_buffer.duplicate();
932             _complete=complete;
933         }
934
935         @Override
936         protected Action process()
937         {
938             // flush any content from the aggregate
939             if (BufferUtil.hasContent(_aggregate))
940             {
941                 _completed=_len==0;
942                 write(_aggregate, _complete && _completed, this);
943                 return Action.SCHEDULED;
944             }
945
946             // Can we just aggregate the remainder?
947             if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
948             {
949                 int position = BufferUtil.flipToFill(_aggregate);
950                 BufferUtil.put(_buffer,_aggregate);
951                 BufferUtil.flipToFlush(_aggregate, position);
952                 return Action.SUCCEEDED;
953             }
954             
955             // Is there data left to write?
956             if (_buffer.hasRemaining())
957             {
958                 // if there is no slice, just write it
959                 if (_slice==null)
960                 {
961                     _completed=true;
962                     write(_buffer, _complete, this);
963                     return Action.SCHEDULED;
964                 }
965                 
966                 // otherwise take a slice
967                 int p=_buffer.position();
968                 int l=Math.min(getBufferSize(),_buffer.remaining());
969                 int pl=p+l;
970                 _slice.limit(pl);
971                 _buffer.position(pl);
972                 _slice.position(p);
973                 _completed=!_buffer.hasRemaining();
974                 write(_slice, _complete && _completed, this);
975                 return Action.SCHEDULED;
976             }
977             
978             // all content written, but if we have not yet signal completion, we
979             // need to do so
980             if (_complete && !_completed)
981             {
982                 _completed=true;
983                 write(BufferUtil.EMPTY_BUFFER, _complete, this);
984                 return Action.SCHEDULED;
985             }
986
987             return Action.SUCCEEDED;
988         }
989
990         @Override
991         protected void onCompleteSuccess()
992         {
993             super.onCompleteSuccess();
994             if (_complete)
995                 closed();
996         }
997         
998         
999     }
1000
1001
1002     /* ------------------------------------------------------------ */
1003     /** An iterating callback that will take content from an
1004      * InputStream and write it to the associated {@link HttpChannel}.
1005      * A non direct buffer of size {@link HttpOutput#getBufferSize()} is used.
1006      * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
1007      * be notified as each buffer is written and only once all the input is consumed will the
1008      * wrapped {@link Callback#succeeded()} method be called.
1009      */
1010     private class InputStreamWritingCB extends IteratingNestedCallback
1011     {
1012         private final InputStream _in;
1013         private final ByteBuffer _buffer;
1014         private boolean _eof;
1015
1016         public InputStreamWritingCB(InputStream in, Callback callback)
1017         {
1018             super(callback);
1019             _in=in;
1020             _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
1021         }
1022
1023         @Override
1024         protected Action process() throws Exception
1025         {
1026             // Only return if EOF has previously been read and thus
1027             // a write done with EOF=true
1028             if (_eof)
1029             {
1030                 // Handle EOF
1031                 _in.close();
1032                 closed();
1033                 _channel.getByteBufferPool().release(_buffer);
1034                 return Action.SUCCEEDED;
1035             }
1036             
1037             // Read until buffer full or EOF
1038             int len=0;
1039             while (len<_buffer.capacity() && !_eof)
1040             {
1041                 int r=_in.read(_buffer.array(),_buffer.arrayOffset()+len,_buffer.capacity()-len);
1042                 if (r<0)
1043                     _eof=true;
1044                 else
1045                     len+=r;
1046             }
1047
1048             // write what we have
1049             _buffer.position(0);
1050             _buffer.limit(len);
1051             write(_buffer,_eof,this);
1052             return Action.SCHEDULED;
1053         }
1054
1055         @Override
1056         public void onCompleteFailure(Throwable x)
1057         {
1058             super.onCompleteFailure(x);
1059             _channel.getByteBufferPool().release(_buffer);
1060             try
1061             {
1062                 _in.close();
1063             }
1064             catch (IOException e)
1065             {
1066                 LOG.ignore(e);
1067             }
1068         }
1069
1070     }
1071
1072     /* ------------------------------------------------------------ */
1073     /** An iterating callback that will take content from a
1074      * ReadableByteChannel and write it to the {@link HttpChannel}.
1075      * A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if
1076      * {@link HttpChannel#useDirectBuffers()} is true.
1077      * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
1078      * be notified as each buffer is written and only once all the input is consumed will the
1079      * wrapped {@link Callback#succeeded()} method be called.
1080      */
1081     private class ReadableByteChannelWritingCB extends IteratingNestedCallback
1082     {
1083         private final ReadableByteChannel _in;
1084         private final ByteBuffer _buffer;
1085         private boolean _eof;
1086
1087         public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
1088         {
1089             super(callback);
1090             _in=in;
1091             _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
1092         }
1093
1094         @Override
1095         protected Action process() throws Exception
1096         {
1097             // Only return if EOF has previously been read and thus
1098             // a write done with EOF=true
1099             if (_eof)
1100             {
1101                 _in.close();
1102                 closed();
1103                 _channel.getByteBufferPool().release(_buffer);
1104                 return Action.SUCCEEDED;
1105             }
1106             
1107             // Read from stream until buffer full or EOF
1108             _buffer.clear();
1109             while (_buffer.hasRemaining() && !_eof)
1110               _eof = (_in.read(_buffer)) <  0;
1111
1112             // write what we have
1113             _buffer.flip();
1114             write(_buffer,_eof,this);
1115
1116             return Action.SCHEDULED;
1117         }
1118
1119         @Override
1120         public void onCompleteFailure(Throwable x)
1121         {
1122             super.onCompleteFailure(x);
1123             _channel.getByteBufferPool().release(_buffer);
1124             try
1125             {
1126                 _in.close();
1127             }
1128             catch (IOException e)
1129             {
1130                 LOG.ignore(e);
1131             }
1132         }
1133     }
1134 }