]> WPIA git - gigi.git/blob - lib/jetty/org/eclipse/jetty/server/HttpOutput.java
Importing upstream Jetty jetty-9.2.1.v20140609
[gigi.git] / lib / jetty / org / eclipse / jetty / server / HttpOutput.java
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.ReadableByteChannel;
25 import java.nio.channels.WritePendingException;
26 import java.util.concurrent.atomic.AtomicReference;
27 import javax.servlet.RequestDispatcher;
28 import javax.servlet.ServletOutputStream;
29 import javax.servlet.ServletRequest;
30 import javax.servlet.ServletResponse;
31 import javax.servlet.WriteListener;
32
33 import org.eclipse.jetty.http.HttpContent;
34 import org.eclipse.jetty.io.EofException;
35 import org.eclipse.jetty.util.BufferUtil;
36 import org.eclipse.jetty.util.Callback;
37 import org.eclipse.jetty.util.IteratingCallback;
38 import org.eclipse.jetty.util.IteratingNestedCallback;
39 import org.eclipse.jetty.util.SharedBlockingCallback;
40 import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
41 import org.eclipse.jetty.util.log.Log;
42 import org.eclipse.jetty.util.log.Logger;
43
44 /**
45  * <p>{@link HttpOutput} implements {@link ServletOutputStream}
46  * as required by the Servlet specification.</p>
47  * <p>{@link HttpOutput} buffers content written by the application until a
48  * further write will overflow the buffer, at which point it triggers a commit
49  * of the response.</p>
50  * <p>{@link HttpOutput} can be closed and reopened, to allow requests included
51  * via {@link RequestDispatcher#include(ServletRequest, ServletResponse)} to
52  * close the stream, to be reopened after the inclusion ends.</p>
53  */
54 public class HttpOutput extends ServletOutputStream implements Runnable
55 {
56     private static Logger LOG = Log.getLogger(HttpOutput.class);
57     private final HttpChannel<?> _channel;
58     private final SharedBlockingCallback _writeblock=new SharedBlockingCallback();
59     private long _written;
60     private ByteBuffer _aggregate;
61     private int _bufferSize;
62     private int _commitSize;
63     private WriteListener _writeListener;
64     private volatile Throwable _onError;
65
66     /*
67     ACTION             OPEN       ASYNC      READY      PENDING       UNREADY       CLOSED
68     -----------------------------------------------------------------------------------------------------
69     setWriteListener() READY->owp ise        ise        ise           ise           ise
70     write()            OPEN       ise        PENDING    wpe           wpe           eof
71     flush()            OPEN       ise        PENDING    wpe           wpe           eof
72     close()            CLOSED     CLOSED     CLOSED     CLOSED        wpe           CLOSED
73     isReady()          OPEN:true  READY:true READY:true UNREADY:false UNREADY:false CLOSED:true
74     write completed    -          -          -          ASYNC         READY->owp    -
75     
76     */
77     enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED }
78     private final AtomicReference<OutputState> _state=new AtomicReference<>(OutputState.OPEN);
79
80     public HttpOutput(HttpChannel<?> channel)
81     {
82         _channel = channel;
83         _bufferSize = _channel.getHttpConfiguration().getOutputBufferSize();
84         _commitSize=_bufferSize/4;
85     }
86     
87     public HttpChannel<?> getHttpChannel()
88     {
89         return _channel;
90     }
91     
92     public boolean isWritten()
93     {
94         return _written > 0;
95     }
96
97     public long getWritten()
98     {
99         return _written;
100     }
101
102     public void reset()
103     {
104         _written = 0;
105         reopen();
106     }
107
108     public void reopen()
109     {
110         _state.set(OutputState.OPEN);
111     }
112
113     public boolean isAllContentWritten()
114     {
115         return _channel.getResponse().isAllContentWritten(_written);
116     }
117
118     protected Blocker acquireWriteBlockingCallback() throws IOException
119     {
120         return _writeblock.acquire();
121     }
122     
123     protected void write(ByteBuffer content, boolean complete) throws IOException
124     {
125         try (Blocker blocker=_writeblock.acquire())
126         {        
127             write(content,complete,blocker);
128             blocker.block();
129         }
130     }
131     
132     protected void write(ByteBuffer content, boolean complete, Callback callback)
133     {
134         _channel.write(content,complete,callback);
135     }
136     
137     @Override
138     public void close()
139     {
140         loop: while(true)
141         {
142             OutputState state=_state.get();
143             switch (state)
144             {
145                 case CLOSED:
146                     break loop;
147                     
148                 case UNREADY:
149                     if (_state.compareAndSet(state,OutputState.ERROR))
150                         _writeListener.onError(_onError==null?new EofException("Async close"):_onError);
151                     continue;
152                     
153                 default:
154                     if (_state.compareAndSet(state,OutputState.CLOSED))
155                     {
156                         try
157                         {
158                             write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER,!_channel.getResponse().isIncluding());
159                         }
160                         catch(IOException e)
161                         {
162                             LOG.debug(e);
163                             _channel.failed();
164                         }
165                         releaseBuffer();
166                         return;
167                     }
168             }
169         }
170     }
171
172     /* Called to indicated that the output is already closed (write with last==true performed) and the state needs to be updated to match */
173     void closed()
174     {
175         loop: while(true)
176         {
177             OutputState state=_state.get();
178             switch (state)
179             {
180                 case CLOSED:
181                     break loop;
182                     
183                 case UNREADY:
184                     if (_state.compareAndSet(state,OutputState.ERROR))
185                         _writeListener.onError(_onError==null?new EofException("Async closed"):_onError);
186                     continue;
187                     
188                 default:
189                     if (_state.compareAndSet(state,OutputState.CLOSED))
190                     {
191                         try
192                         {
193                             _channel.getResponse().closeOutput();
194                         }
195                         catch(IOException e)
196                         {
197                             LOG.debug(e);
198                             _channel.failed();
199                         }
200                         releaseBuffer();
201                         return;
202                     }
203             }
204         }
205     }
206
207     private void releaseBuffer()
208     {
209         if (_aggregate != null)
210         {
211             _channel.getConnector().getByteBufferPool().release(_aggregate);
212             _aggregate = null;
213         }
214     }
215
216     public boolean isClosed()
217     {
218         return _state.get()==OutputState.CLOSED;
219     }
220
221     @Override
222     public void flush() throws IOException
223     {
224         while(true)
225         {
226             switch(_state.get())
227             {
228                 case OPEN:
229                     write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, false);
230                     return;
231
232                 case ASYNC:
233                     throw new IllegalStateException("isReady() not called");
234
235                 case READY:
236                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
237                         continue;
238                     new AsyncFlush().iterate();
239                     return;
240
241                 case PENDING:
242                 case UNREADY:
243                     throw new WritePendingException();
244
245                 case ERROR:
246                     throw new EofException(_onError);
247                     
248                 case CLOSED:
249                     return;
250             }
251             break;
252         }
253     }
254
255
256     @Override
257     public void write(byte[] b, int off, int len) throws IOException
258     {
259         _written+=len;
260         boolean complete=_channel.getResponse().isAllContentWritten(_written);
261
262         // Async or Blocking ?
263         while(true)
264         {
265             switch(_state.get())
266             {
267                 case OPEN:
268                     // process blocking below
269                     break;
270
271                 case ASYNC:
272                     throw new IllegalStateException("isReady() not called");
273
274                 case READY:
275                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
276                         continue;
277
278                     // Should we aggregate?
279                     if (!complete && len<=_commitSize)
280                     {
281                         if (_aggregate == null)
282                             _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
283
284                         // YES - fill the aggregate with content from the buffer
285                         int filled = BufferUtil.fill(_aggregate, b, off, len);
286
287                         // return if we are not complete, not full and filled all the content
288                         if (filled==len && !BufferUtil.isFull(_aggregate))
289                         {
290                             if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
291                                 throw new IllegalStateException();
292                             return;
293                         }
294
295                         // adjust offset/length
296                         off+=filled;
297                         len-=filled;
298                     }
299
300                     // Do the asynchronous writing from the callback
301                     new AsyncWrite(b,off,len,complete).iterate();
302                     return;
303
304                 case PENDING:
305                 case UNREADY:
306                     throw new WritePendingException();
307
308                 case ERROR:
309                     throw new EofException(_onError);
310                     
311                 case CLOSED:
312                     throw new EofException("Closed");
313             }
314             break;
315         }
316
317
318         // handle blocking write
319
320         // Should we aggregate?
321         int capacity = getBufferSize();
322         if (!complete && len<=_commitSize)
323         {
324             if (_aggregate == null)
325                 _aggregate = _channel.getByteBufferPool().acquire(capacity, false);
326
327             // YES - fill the aggregate with content from the buffer
328             int filled = BufferUtil.fill(_aggregate, b, off, len);
329
330             // return if we are not complete, not full and filled all the content
331             if (filled==len && !BufferUtil.isFull(_aggregate))
332                 return;
333
334             // adjust offset/length
335             off+=filled;
336             len-=filled;
337         }
338
339         // flush any content from the aggregate
340         if (BufferUtil.hasContent(_aggregate))
341         {
342             write(_aggregate, complete && len==0);
343
344             // should we fill aggregate again from the buffer?
345             if (len>0 && !complete && len<=_commitSize)
346             {
347                 BufferUtil.append(_aggregate, b, off, len);
348                 return;
349             }
350         }
351
352         // write any remaining content in the buffer directly
353         if (len>0)
354         {
355             ByteBuffer wrap = ByteBuffer.wrap(b, off, len);
356             ByteBuffer view = wrap.duplicate();
357
358             // write a buffer capacity at a time to avoid JVM pooling large direct buffers
359             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541
360             while (len>getBufferSize())
361             {
362                 int p=view.position();
363                 int l=p+getBufferSize();
364                 view.limit(p+getBufferSize());
365                 write(view,false);
366                 len-=getBufferSize();
367                 view.limit(l+Math.min(len,getBufferSize()));
368                 view.position(l);
369             }
370             write(view,complete);
371         }
372         else if (complete)
373             write(BufferUtil.EMPTY_BUFFER,complete);
374
375         if (complete)
376             closed();
377
378     }
379
380     public void write(ByteBuffer buffer) throws IOException
381     {
382         _written+=buffer.remaining();
383         boolean complete=_channel.getResponse().isAllContentWritten(_written);
384
385         // Async or Blocking ?
386         while(true)
387         {
388             switch(_state.get())
389             {
390                 case OPEN:
391                     // process blocking below
392                     break;
393
394                 case ASYNC:
395                     throw new IllegalStateException("isReady() not called");
396
397                 case READY:
398                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
399                         continue;
400
401                     // Do the asynchronous writing from the callback
402                     new AsyncWrite(buffer,complete).iterate();
403                     return;
404
405                 case PENDING:
406                 case UNREADY:
407                     throw new WritePendingException();
408
409                 case ERROR:
410                     throw new EofException(_onError);
411                     
412                 case CLOSED:
413                     throw new EofException("Closed");
414             }
415             break;
416         }
417
418
419         // handle blocking write
420         int len=BufferUtil.length(buffer);
421
422         // flush any content from the aggregate
423         if (BufferUtil.hasContent(_aggregate))
424             write(_aggregate, complete && len==0);
425
426         // write any remaining content in the buffer directly
427         if (len>0)
428             write(buffer, complete);
429         else if (complete)
430             write(BufferUtil.EMPTY_BUFFER,complete);
431
432         if (complete)
433             closed();
434     }
435
436     @Override
437     public void write(int b) throws IOException
438     {
439         _written+=1;
440         boolean complete=_channel.getResponse().isAllContentWritten(_written);
441
442         // Async or Blocking ?
443         while(true)
444         {
445             switch(_state.get())
446             {
447                 case OPEN:
448                     if (_aggregate == null)
449                         _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
450                     BufferUtil.append(_aggregate, (byte)b);
451
452                     // Check if all written or full
453                     if (complete || BufferUtil.isFull(_aggregate))
454                     {
455                         try(Blocker blocker=_writeblock.acquire())
456                         {
457                             write(_aggregate, complete, blocker);
458                             blocker.block();
459                         }
460                         if (complete)
461                             closed();
462                     }
463                     break;
464
465                 case ASYNC:
466                     throw new IllegalStateException("isReady() not called");
467
468                 case READY:
469                     if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
470                         continue;
471
472                     if (_aggregate == null)
473                         _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
474                     BufferUtil.append(_aggregate, (byte)b);
475
476                     // Check if all written or full
477                     if (!complete && !BufferUtil.isFull(_aggregate))
478                     {
479                         if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
480                             throw new IllegalStateException();
481                         return;
482                     }
483
484                     // Do the asynchronous writing from the callback
485                     new AsyncFlush().iterate();
486                     return;
487
488                 case PENDING:
489                 case UNREADY:
490                     throw new WritePendingException();
491
492                 case ERROR:
493                     throw new EofException(_onError);
494                     
495                 case CLOSED:
496                     throw new EofException("Closed");
497             }
498             break;
499         }
500     }
501
502     @Override
503     public void print(String s) throws IOException
504     {
505         if (isClosed())
506             throw new IOException("Closed");
507
508         write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
509     }
510
511     /* ------------------------------------------------------------ */
512     /** Blocking send of content.
513      * @param content The content to send.
514      * @throws IOException
515      */
516     public void sendContent(ByteBuffer content) throws IOException
517     {
518         try(Blocker blocker=_writeblock.acquire())
519         {
520             write(content,true,blocker);
521             blocker.block();
522         }
523     }
524
525     /* ------------------------------------------------------------ */
526     /** Blocking send of content.
527      * @param in The content to send
528      * @throws IOException
529      */
530     public void sendContent(InputStream in) throws IOException
531     {
532         try(Blocker blocker=_writeblock.acquire())
533         {
534             new InputStreamWritingCB(in,blocker).iterate();
535             blocker.block();
536         }
537     }
538
539     /* ------------------------------------------------------------ */
540     /** Blocking send of content.
541      * @param in The content to send
542      * @throws IOException
543      */
544     public void sendContent(ReadableByteChannel in) throws IOException
545     {
546         try(Blocker blocker=_writeblock.acquire())
547         {
548             new ReadableByteChannelWritingCB(in,blocker).iterate();
549             blocker.block();
550         }
551     }
552
553
554     /* ------------------------------------------------------------ */
555     /** Blocking send of content.
556      * @param content The content to send
557      * @throws IOException
558      */
559     public void sendContent(HttpContent content) throws IOException
560     {
561         try(Blocker blocker=_writeblock.acquire())
562         {
563             sendContent(content,blocker);
564             blocker.block();
565         }
566     }
567
568     /* ------------------------------------------------------------ */
569     /** Asynchronous send of content.
570      * @param content The content to send
571      * @param callback The callback to use to notify success or failure
572      */
573     public void sendContent(ByteBuffer content, final Callback callback)
574     {
575         write(content,true,new Callback()
576         {
577             @Override
578             public void succeeded()
579             {
580                 closed();
581                 callback.succeeded();
582             }
583
584             @Override
585             public void failed(Throwable x)
586             {
587                 callback.failed(x);
588             }
589         });
590     }
591
592     /* ------------------------------------------------------------ */
593     /** Asynchronous send of content.
594      * @param in The content to send as a stream.  The stream will be closed
595      * after reading all content.
596      * @param callback The callback to use to notify success or failure
597      */
598     public void sendContent(InputStream in, Callback callback)
599     {
600         new InputStreamWritingCB(in,callback).iterate();
601     }
602
603     /* ------------------------------------------------------------ */
604     /** Asynchronous send of content.
605      * @param in The content to send as a channel.  The channel will be closed
606      * after reading all content.
607      * @param callback The callback to use to notify success or failure
608      */
609     public void sendContent(ReadableByteChannel in, Callback callback)
610     {
611         new ReadableByteChannelWritingCB(in,callback).iterate();
612     }
613
614     /* ------------------------------------------------------------ */
615     /** Asynchronous send of content.
616      * @param httpContent The content to send
617      * @param callback The callback to use to notify success or failure
618      */
619     public void sendContent(HttpContent httpContent, Callback callback) throws IOException
620     {
621         if (BufferUtil.hasContent(_aggregate))
622             throw new IOException("written");
623         if (_channel.isCommitted())
624             throw new IOException("committed");
625
626         while (true)
627         {
628             switch(_state.get())
629             {
630                 case OPEN:
631                     if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
632                         continue;
633                     break;
634                 case ERROR:
635                     throw new EofException(_onError);
636                 case CLOSED:
637                     throw new EofException("Closed");
638                 default:
639                     throw new IllegalStateException();
640             }
641             break;
642         }
643         ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null;
644         if (buffer == null)
645             buffer = httpContent.getIndirectBuffer();
646
647         if (buffer!=null)
648         {
649             sendContent(buffer,callback);
650             return;
651         }
652
653         ReadableByteChannel rbc=httpContent.getReadableByteChannel();
654         if (rbc!=null)
655         {
656             // Close of the rbc is done by the async sendContent
657             sendContent(rbc,callback);
658             return;
659         }
660
661         InputStream in = httpContent.getInputStream();
662         if ( in!=null )
663         {
664             sendContent(in,callback);
665             return;
666         }
667
668         callback.failed(new IllegalArgumentException("unknown content for "+httpContent));
669     }
670
671     public int getBufferSize()
672     {
673         return _bufferSize;
674     }
675
676     public void setBufferSize(int size)
677     {
678         _bufferSize = size;
679         _commitSize = size;
680     }
681
682     public void resetBuffer()
683     {
684         if (BufferUtil.hasContent(_aggregate))
685             BufferUtil.clear(_aggregate);
686     }
687
688     @Override
689     public void setWriteListener(WriteListener writeListener)
690     {
691         if (!_channel.getState().isAsync())
692             throw new IllegalStateException("!ASYNC");
693
694         if (_state.compareAndSet(OutputState.OPEN, OutputState.READY))
695         {
696             _writeListener = writeListener;
697             _channel.getState().onWritePossible();
698         }
699         else
700             throw new IllegalStateException();
701     }
702
703     /**
704      * @see javax.servlet.ServletOutputStream#isReady()
705      */
706     @Override
707     public boolean isReady()
708     {
709         while (true)
710         {
711             switch(_state.get())
712             {
713                 case OPEN:
714                     return true;
715                 case ASYNC:
716                     if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY))
717                         continue;
718                     return true;
719                 case READY:
720                     return true;
721                 case PENDING:
722                     if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY))
723                         continue;
724                     return false;
725                 case UNREADY:
726                     return false;
727
728                 case ERROR:
729                     return true;
730                     
731                 case CLOSED:
732                     return true;
733             }
734         }
735     }
736
737     @Override
738     public void run()
739     {
740         loop: while (true)
741         {
742             OutputState state = _state.get();
743
744             if(_onError!=null)
745             {
746                 switch(state)
747                 {
748                     case CLOSED:
749                     case ERROR:
750                         _onError=null;
751                         break loop;
752
753                     default:
754                         if (_state.compareAndSet(state, OutputState.ERROR))
755                         {
756                             Throwable th=_onError;
757                             _onError=null;
758                             LOG.debug("onError",th);
759                             _writeListener.onError(th);
760                             close();
761
762                             break loop;
763                         }
764
765                 }
766                 continue loop;
767             }
768
769             switch(_state.get())
770             {
771                 case READY:
772                 case CLOSED:
773                     // even though a write is not possible, because a close has 
774                     // occurred, we need to call onWritePossible to tell async
775                     // producer that the last write completed.
776                     try
777                     {
778                         _writeListener.onWritePossible();
779                         break loop;
780                     }
781                     catch (Throwable e)
782                     {
783                         _onError=e;
784                     }
785                     break;
786                 default:
787
788             }
789         }
790     }
791     
792     @Override
793     public String toString()
794     {
795         return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get());
796     }
797     
798     private abstract class AsyncICB extends IteratingCallback
799     {
800         @Override
801         protected void completed()
802         {
803             while(true)
804             {
805                 OutputState last=_state.get();
806                 switch(last)
807                 {
808                     case PENDING:
809                         if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
810                             continue;
811                         break;
812
813                     case UNREADY:
814                         if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY))
815                             continue;
816                         _channel.getState().onWritePossible();
817                         break;
818
819                     case CLOSED:
820                         break;
821
822                     default:
823                         throw new IllegalStateException();
824                 }
825                 break;
826             }
827         }
828
829         @Override
830         public void failed(Throwable e)
831         {
832             super.failed(e);
833             _onError=e;
834             _channel.getState().onWritePossible();
835         }
836     }
837     
838     
839     private class AsyncFlush extends AsyncICB
840     {
841         protected volatile boolean _flushed;
842
843         public AsyncFlush()
844         {
845         }
846
847         @Override
848         protected Action process()
849         {
850             if (BufferUtil.hasContent(_aggregate))
851             {
852                 _flushed=true;
853                 write(_aggregate, false, this);
854                 return Action.SCHEDULED;
855             }
856
857             if (!_flushed)
858             {
859                 _flushed=true;
860                 write(BufferUtil.EMPTY_BUFFER,false,this);
861                 return Action.SCHEDULED;
862             }
863
864             return Action.SUCCEEDED;
865         }
866     }
867
868
869
870     private class AsyncWrite extends AsyncICB
871     {
872         private final ByteBuffer _buffer;
873         private final ByteBuffer _slice;
874         private final boolean _complete;
875         private final int _len;
876         protected volatile boolean _completed;
877
878         public AsyncWrite(byte[] b, int off, int len, boolean complete)
879         {
880             _buffer=ByteBuffer.wrap(b, off, len);
881             _len=len;
882             // always use a view for large byte arrays to avoid JVM pooling large direct buffers
883             _slice=_len<getBufferSize()?null:_buffer.duplicate();
884             _complete=complete;
885         }
886
887         public AsyncWrite(ByteBuffer buffer, boolean complete)
888         {
889             _buffer=buffer;
890             _len=buffer.remaining();
891             // Use a slice buffer for large indirect to avoid JVM pooling large direct buffers
892             _slice=_buffer.isDirect()||_len<getBufferSize()?null:_buffer.duplicate();
893             _complete=complete;
894         }
895
896         @Override
897         protected Action process()
898         {
899             // flush any content from the aggregate
900             if (BufferUtil.hasContent(_aggregate))
901             {
902                 _completed=_len==0;
903                 write(_aggregate, _complete && _completed, this);
904                 return Action.SCHEDULED;
905             }
906
907             // Can we just aggregate the remainder?
908             if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
909             {
910                 int position = BufferUtil.flipToFill(_aggregate);
911                 BufferUtil.put(_buffer,_aggregate);
912                 BufferUtil.flipToFlush(_aggregate, position);
913                 return Action.SUCCEEDED;
914             }
915             
916             // Is there data left to write?
917             if (_buffer.hasRemaining())
918             {
919                 // if there is no slice, just write it
920                 if (_slice==null)
921                 {
922                     _completed=true;
923                     write(_buffer, _complete, this);
924                     return Action.SCHEDULED;
925                 }
926                 
927                 // otherwise take a slice
928                 int p=_buffer.position();
929                 int l=Math.min(getBufferSize(),_buffer.remaining());
930                 int pl=p+l;
931                 _slice.limit(pl);
932                 _buffer.position(pl);
933                 _slice.position(p);
934                 _completed=!_buffer.hasRemaining();
935                 write(_slice, _complete && _completed, this);
936                 return Action.SCHEDULED;
937             }
938             
939             // all content written, but if we have not yet signal completion, we
940             // need to do so
941             if (_complete && !_completed)
942             {
943                 _completed=true;
944                 write(BufferUtil.EMPTY_BUFFER, _complete, this);
945                 return Action.SCHEDULED;
946             }
947
948             return Action.SUCCEEDED;
949         }
950
951         @Override
952         protected void completed()
953         {
954             super.completed();
955             if (_complete)
956                 closed();
957         }
958         
959         
960     }
961
962
963     /* ------------------------------------------------------------ */
964     /** An iterating callback that will take content from an
965      * InputStream and write it to the associated {@link HttpChannel}.
966      * A non direct buffer of size {@link HttpOutput#getBufferSize()} is used.
967      * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
968      * be notified as each buffer is written and only once all the input is consumed will the
969      * wrapped {@link Callback#succeeded()} method be called.
970      */
971     private class InputStreamWritingCB extends IteratingNestedCallback
972     {
973         private final InputStream _in;
974         private final ByteBuffer _buffer;
975         private boolean _eof;
976
977         public InputStreamWritingCB(InputStream in, Callback callback)
978         {
979             super(callback);
980             _in=in;
981             _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
982         }
983
984         @Override
985         protected Action process() throws Exception
986         {
987             // Only return if EOF has previously been read and thus
988             // a write done with EOF=true
989             if (_eof)
990             {
991                 // Handle EOF
992                 _in.close();
993                 closed();
994                 _channel.getByteBufferPool().release(_buffer);
995                 return Action.SUCCEEDED;
996             }
997             
998             // Read until buffer full or EOF
999             int len=0;
1000             while (len<_buffer.capacity() && !_eof)
1001             {
1002                 int r=_in.read(_buffer.array(),_buffer.arrayOffset()+len,_buffer.capacity()-len);
1003                 if (r<0)
1004                     _eof=true;
1005                 else
1006                     len+=r;
1007             }
1008
1009             // write what we have
1010             _buffer.position(0);
1011             _buffer.limit(len);
1012             write(_buffer,_eof,this);
1013             return Action.SCHEDULED;
1014         }
1015
1016         @Override
1017         public void failed(Throwable x)
1018         {
1019             super.failed(x);
1020             _channel.getByteBufferPool().release(_buffer);
1021             try
1022             {
1023                 _in.close();
1024             }
1025             catch (IOException e)
1026             {
1027                 LOG.ignore(e);
1028             }
1029         }
1030
1031     }
1032
1033     /* ------------------------------------------------------------ */
1034     /** An iterating callback that will take content from a
1035      * ReadableByteChannel and write it to the {@link HttpChannel}.
1036      * A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if
1037      * {@link HttpChannel#useDirectBuffers()} is true.
1038      * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
1039      * be notified as each buffer is written and only once all the input is consumed will the
1040      * wrapped {@link Callback#succeeded()} method be called.
1041      */
1042     private class ReadableByteChannelWritingCB extends IteratingNestedCallback
1043     {
1044         private final ReadableByteChannel _in;
1045         private final ByteBuffer _buffer;
1046         private boolean _eof;
1047
1048         public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
1049         {
1050             super(callback);
1051             _in=in;
1052             _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
1053         }
1054
1055         @Override
1056         protected Action process() throws Exception
1057         {
1058             // Only return if EOF has previously been read and thus
1059             // a write done with EOF=true
1060             if (_eof)
1061             {
1062                 _in.close();
1063                 closed();
1064                 _channel.getByteBufferPool().release(_buffer);
1065                 return Action.SUCCEEDED;
1066             }
1067             
1068             // Read from stream until buffer full or EOF
1069             _buffer.clear();
1070             while (_buffer.hasRemaining() && !_eof)
1071               _eof = (_in.read(_buffer)) <  0;
1072
1073             // write what we have
1074             _buffer.flip();
1075             write(_buffer,_eof,this);
1076
1077             return Action.SCHEDULED;
1078         }
1079
1080         @Override
1081         public void failed(Throwable x)
1082         {
1083             super.failed(x);
1084             _channel.getByteBufferPool().release(_buffer);
1085             try
1086             {
1087                 _in.close();
1088             }
1089             catch (IOException e)
1090             {
1091                 LOG.ignore(e);
1092             }
1093         }
1094     }
1095
1096 }