]> WPIA git - gigi.git/blob - lib/jetty/org/eclipse/jetty/io/WriteFlusher.java
Merge "Update notes about password security"
[gigi.git] / lib / jetty / org / eclipse / jetty / io / WriteFlusher.java
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18
19 package org.eclipse.jetty.io;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.ClosedChannelException;
24 import java.nio.channels.WritePendingException;
25 import java.util.Arrays;
26 import java.util.EnumMap;
27 import java.util.EnumSet;
28 import java.util.Set;
29 import java.util.concurrent.atomic.AtomicReference;
30
31 import org.eclipse.jetty.util.BufferUtil;
32 import org.eclipse.jetty.util.Callback;
33 import org.eclipse.jetty.util.log.Log;
34 import org.eclipse.jetty.util.log.Logger;
35
36
37 /**
38  * A Utility class to help implement {@link EndPoint#write(Callback, ByteBuffer...)} by calling
39  * {@link EndPoint#flush(ByteBuffer...)} until all content is written.
40  * The abstract method {@link #onIncompleteFlushed()} is called when not all content has been written after a call to
41  * flush and should organise for the {@link #completeWrite()} method to be called when a subsequent call to flush
42  * should  be able to make more progress.
43  * <p>
44  */
45 abstract public class WriteFlusher
46 {
47     private static final Logger LOG = Log.getLogger(WriteFlusher.class);
48     private static final boolean DEBUG = LOG.isDebugEnabled(); // Easy for the compiler to remove the code if DEBUG==false
49     private static final ByteBuffer[] EMPTY_BUFFERS = new ByteBuffer[]{BufferUtil.EMPTY_BUFFER};
50     private static final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class);
51     private static final State __IDLE = new IdleState();
52     private static final State __WRITING = new WritingState();
53     private static final State __COMPLETING = new CompletingState();
54     private final EndPoint _endPoint;
55     private final AtomicReference<State> _state = new AtomicReference<>();
56
57     static
58     {
59         // fill the state machine
60         __stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING));
61         __stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
62         __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING,StateType.IDLE));
63         __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
64         __stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE));
65     }
66
67     // A write operation may either complete immediately:
68     //     IDLE-->WRITING-->IDLE
69     // Or it may not completely flush and go via the PENDING state
70     //     IDLE-->WRITING-->PENDING-->COMPLETING-->IDLE
71     // Or it may take several cycles to complete
72     //     IDLE-->WRITING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE
73     //
74     // If a failure happens while in IDLE, it is a noop since there is no operation to tell of the failure.
75     // If a failure happens while in WRITING, but the write has finished successfully or with an IOException,
76     // the callback's complete or respectively failed methods will be called.
77     // If a failure happens in PENDING state, then the fail method calls the pending callback and moves to IDLE state
78     //
79     //   IDLE--(fail)-->IDLE
80     //   IDLE-->WRITING--(fail)-->FAILED-->IDLE
81     //   IDLE-->WRITING-->PENDING--(fail)-->IDLE
82     //   IDLE-->WRITING-->PENDING-->COMPLETING--(fail)-->FAILED-->IDLE
83     //
84     // So a call to fail in the PENDING state will be directly handled and the state changed to IDLE
85     // A call to fail in the WRITING or COMPLETING states will just set the state to FAILED and the failure will be
86     // handled when the write or completeWrite methods try to move the state from what they thought it was.
87     //
88
89     protected WriteFlusher(EndPoint endPoint)
90     {
91         _state.set(__IDLE);
92         _endPoint = endPoint;
93     }
94
95     private enum StateType
96     {
97         IDLE,
98         WRITING,
99         PENDING,
100         COMPLETING,
101         FAILED
102     }
103
104     /**
105      * Tries to update the current state to the given new state.
106      * @param previous the expected current state
107      * @param next the desired new state
108      * @return the previous state or null if the state transition failed
109      * @throws WritePendingException if currentState is WRITING and new state is WRITING (api usage error)
110      */
111     private boolean updateState(State previous,State next)
112     {
113         if (!isTransitionAllowed(previous,next))
114             throw new IllegalStateException();
115
116         boolean updated = _state.compareAndSet(previous, next);
117         if (DEBUG)
118             LOG.debug("update {}:{}{}{}", this, previous, updated?"-->":"!->",next);
119         return updated;
120     }
121
122     private void fail(PendingState pending)
123     {
124         State current = _state.get();
125         if (current.getType()==StateType.FAILED)
126         {
127             FailedState failed=(FailedState)current;
128             if (updateState(failed,__IDLE))
129             {
130                 pending.fail(failed.getCause());
131                 return;
132             }
133         }
134         throw new IllegalStateException();
135     }
136
137     private void ignoreFail()
138     {
139         State current = _state.get();
140         while (current.getType()==StateType.FAILED)
141         {
142             if (updateState(current,__IDLE))
143                 return;
144             current = _state.get();
145         }
146     }
147
148     private boolean isTransitionAllowed(State currentState, State newState)
149     {
150         Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
151         if (!allowedNewStateTypes.contains(newState.getType()))
152         {
153             LOG.warn("{}: {} -> {} not allowed", this, currentState, newState);
154             return false;
155         }
156         return true;
157     }
158
159     /**
160      * State represents a State of WriteFlusher.
161      */
162     private static class State
163     {
164         private final StateType _type;
165
166         private State(StateType stateType)
167         {
168             _type = stateType;
169         }
170
171         public StateType getType()
172         {
173             return _type;
174         }
175
176         @Override
177         public String toString()
178         {
179             return String.format("%s", _type);
180         }
181     }
182
183     /**
184      * In IdleState WriteFlusher is idle and accepts new writes
185      */
186     private static class IdleState extends State
187     {
188         private IdleState()
189         {
190             super(StateType.IDLE);
191         }
192     }
193
194     /**
195      * In WritingState WriteFlusher is currently writing.
196      */
197     private static class WritingState extends State
198     {
199         private WritingState()
200         {
201             super(StateType.WRITING);
202         }
203     }
204
205     /**
206      * In FailedState no more operations are allowed. The current implementation will never recover from this state.
207      */
208     private static class FailedState extends State
209     {
210         private final Throwable _cause;
211         private FailedState(Throwable cause)
212         {
213             super(StateType.FAILED);
214             _cause=cause;
215         }
216
217         public Throwable getCause()
218         {
219             return _cause;
220         }
221     }
222
223     /**
224      * In CompletingState WriteFlusher is flushing buffers that have not been fully written in write(). If write()
225      * didn't flush all buffers in one go, it'll switch the State to PendingState. completeWrite() will then switch to
226      * this state and try to flush the remaining buffers.
227      */
228     private static class CompletingState extends State
229     {
230         private CompletingState()
231         {
232             super(StateType.COMPLETING);
233         }
234     }
235
236     /**
237      * In PendingState not all buffers could be written in one go. Then write() will switch to PendingState() and
238      * preserve the state by creating a new PendingState object with the given parameters.
239      */
240     private class PendingState extends State
241     {
242         private final Callback _callback;
243         private final ByteBuffer[] _buffers;
244
245         private PendingState(ByteBuffer[] buffers, Callback callback)
246         {
247             super(StateType.PENDING);
248             _buffers = buffers;
249             _callback = callback;
250         }
251
252         public ByteBuffer[] getBuffers()
253         {
254             return _buffers;
255         }
256
257         protected boolean fail(Throwable cause)
258         {
259             if (_callback!=null)
260             {
261                 _callback.failed(cause);
262                 return true;
263             }
264             return false;
265         }
266
267         protected void complete()
268         {
269             if (_callback!=null)
270                 _callback.succeeded();
271         }
272     }
273
274     /**
275      * Abstract call to be implemented by specific WriteFlushers. It should schedule a call to {@link #completeWrite()}
276      * or {@link #onFail(Throwable)} when appropriate.
277      */
278     abstract protected void onIncompleteFlushed();
279
280     /**
281      * Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If state transition
282      * fails it'll fail the callback.
283      *
284      * If not all buffers can be written in one go it creates a new <code>PendingState</code> object to preserve the state
285      * and then calls {@link #onIncompleteFlushed()}. The remaining buffers will be written in {@link #completeWrite()}.
286      *
287      * If all buffers have been written it calls callback.complete().
288      *
289      * @param callback the callback to call on either failed or complete
290      * @param buffers the buffers to flush to the endpoint
291      */
292     public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
293     {
294         if (DEBUG)
295             LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
296
297         if (!updateState(__IDLE,__WRITING))
298             throw new WritePendingException();
299
300         try
301         {
302             buffers=flush(buffers);
303
304             // if we are incomplete?
305             if (buffers!=null)
306             {
307                 if (DEBUG)
308                     LOG.debug("flushed incomplete");
309                 PendingState pending=new PendingState(buffers, callback);
310                 if (updateState(__WRITING,pending))
311                     onIncompleteFlushed();
312                 else
313                     fail(pending);
314                 return;
315             }
316
317             // If updateState didn't succeed, we don't care as our buffers have been written
318             if (!updateState(__WRITING,__IDLE))
319                 ignoreFail();
320             if (callback!=null)
321                 callback.succeeded();
322         }
323         catch (IOException e)
324         {
325             if (DEBUG)
326                 LOG.debug("write exception", e);
327             if (updateState(__WRITING,__IDLE))
328             {
329                 if (callback!=null)
330                     callback.failed(e);
331             }
332             else
333                 fail(new PendingState(buffers, callback));
334         }
335     }
336
337
338     /**
339      * Complete a write that has not completed and that called {@link #onIncompleteFlushed()} to request a call to this
340      * method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress.
341      *
342      * It tries to switch from PENDING to COMPLETING. If state transition fails, then it does nothing as the callback
343      * should have been already failed. That's because the only way to switch from PENDING outside this method is
344      * {@link #onFail(Throwable)} or {@link #onClose()}
345      */
346     public void completeWrite()
347     {         
348         if (DEBUG)
349             LOG.debug("completeWrite: {}", this);
350
351         State previous = _state.get();
352
353         if (previous.getType()!=StateType.PENDING)
354             return; // failure already handled.
355
356         PendingState pending = (PendingState)previous;
357         if (!updateState(pending,__COMPLETING))
358             return; // failure already handled.
359
360         try
361         {
362             ByteBuffer[] buffers = pending.getBuffers();
363
364             buffers=flush(buffers);
365
366             // if we are incomplete?
367             if (buffers!=null)
368             {
369                 if (DEBUG)
370                     LOG.debug("flushed incomplete {}",BufferUtil.toDetailString(buffers));
371                 if (buffers!=pending.getBuffers())
372                     pending=new PendingState(buffers, pending._callback);
373                 if (updateState(__COMPLETING,pending))
374                     onIncompleteFlushed();
375                 else
376                     fail(pending);
377                 return;
378             }
379
380             // If updateState didn't succeed, we don't care as our buffers have been written
381             if (!updateState(__COMPLETING,__IDLE))
382                 ignoreFail();
383             pending.complete();
384         }
385         catch (IOException e)
386         {
387             if (DEBUG)
388                 LOG.debug("completeWrite exception", e);
389             if(updateState(__COMPLETING,__IDLE))
390                 pending.fail(e);
391             else
392                 fail(pending);
393         }
394     }
395
396     /* ------------------------------------------------------------ */
397     /** Flush the buffers iteratively until no progress is made
398      * @param buffers The buffers to flush
399      * @return The unflushed buffers, or null if all flushed
400      * @throws IOException
401      */
402     protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
403     {
404         boolean progress=true;
405         while(progress && buffers!=null)
406         {
407             int before=buffers.length==0?0:buffers[0].remaining();
408             boolean flushed=_endPoint.flush(buffers);
409             int r=buffers.length==0?0:buffers[0].remaining();
410             
411             if (flushed)
412                 return null;
413             
414             progress=before!=r;
415             
416             int not_empty=0;
417             while(r==0)
418             {
419                 if (++not_empty==buffers.length)
420                 {
421                     buffers=null;
422                     not_empty=0;
423                     break;
424                 }
425                 progress=true;
426                 r=buffers[not_empty].remaining();
427             }
428
429             if (not_empty>0)
430                 buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length);
431         }        
432         
433         // If buffers is null, then flush has returned false but has consumed all the data!
434         // This is probably SSL being unable to flush the encrypted buffer, so return EMPTY_BUFFERS
435         // and that will keep this WriteFlusher pending.
436         return buffers==null?EMPTY_BUFFERS:buffers;
437     }
438     
439     /* ------------------------------------------------------------ */
440     /** Notify the flusher of a failure
441      * @param cause The cause of the failure
442      * @return true if the flusher passed the failure to a {@link Callback} instance
443      */
444     public boolean onFail(Throwable cause)
445     {
446         // Keep trying to handle the failure until we get to IDLE or FAILED state
447         while(true)
448         {
449             State current=_state.get();
450             switch(current.getType())
451             {
452                 case IDLE:
453                 case FAILED:
454                     if (DEBUG)
455                         LOG.debug("ignored: {} {}", this, cause);
456                     return false;
457
458                 case PENDING:
459                     if (DEBUG)
460                         LOG.debug("failed: {} {}", this, cause);
461
462                     PendingState pending = (PendingState)current;
463                     if (updateState(pending,__IDLE))
464                         return pending.fail(cause);
465                     break;
466
467                 default:
468                     if (DEBUG)
469                         LOG.debug("failed: {} {}", this, cause);
470
471                     if (updateState(current,new FailedState(cause)))
472                         return false;
473                     break;
474             }
475         }
476     }
477
478     public void onClose()
479     {
480         if (_state.get()==__IDLE)
481             return;
482         onFail(new ClosedChannelException());
483     }
484
485     boolean isIdle()
486     {
487         return _state.get().getType() == StateType.IDLE;
488     }
489
490     public boolean isInProgress()
491     {
492         switch(_state.get().getType())
493         {
494             case WRITING:
495             case PENDING:
496             case COMPLETING:
497                 return true;
498             default:
499                 return false;
500         }
501     }
502
503     @Override
504     public String toString()
505     {
506         return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get());
507     }
508 }