]> WPIA git - gigi.git/blob - lib/jetty/org/eclipse/jetty/server/QueuedHttpInput.java
Importing upstream Jetty jetty-9.2.1.v20140609
[gigi.git] / lib / jetty / org / eclipse / jetty / server / QueuedHttpInput.java
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23
24 import org.eclipse.jetty.util.ArrayQueue;
25 import org.eclipse.jetty.util.log.Log;
26 import org.eclipse.jetty.util.log.Logger;
27
28 /**
29  * {@link QueuedHttpInput} holds a queue of items passed to it by calls to {@link #content(Object)}.
30  * <p/>
31  * {@link QueuedHttpInput} stores the items directly; if the items contain byte buffers, it does not copy them
32  * but simply holds references to the item, thus the caller must organize for those buffers to valid while
33  * held by this class.
34  * <p/>
35  * To assist the caller, subclasses may override methods {@link #onAsyncRead()}, {@link #onContentConsumed(Object)}
36  * that can be implemented so that the caller will know when buffers are queued and consumed.
37  */
38 public abstract class QueuedHttpInput<T> extends HttpInput<T>
39 {
40     private final static Logger LOG = Log.getLogger(QueuedHttpInput.class);
41
42     private final ArrayQueue<T> _inputQ = new ArrayQueue<>(lock());
43
44     public QueuedHttpInput()
45     {
46     }
47
48     public void content(T item)
49     {
50         // The buffer is not copied here.  This relies on the caller not recycling the buffer
51         // until the it is consumed.  The onContentConsumed and onAllContentConsumed() callbacks are
52         // the signals to the caller that the buffers can be recycled.
53
54         synchronized (lock())
55         {
56             boolean wasEmpty = _inputQ.isEmpty();
57             _inputQ.add(item);
58             LOG.debug("{} queued {}", this, item);
59             if (wasEmpty)
60             {
61                 if (!onAsyncRead())
62                     lock().notify();
63             }
64         }
65     }
66
67     public void recycle()
68     {
69         synchronized (lock())
70         {
71             T item = _inputQ.pollUnsafe();
72             while (item != null)
73             {
74                 onContentConsumed(item);
75                 item = _inputQ.pollUnsafe();
76             }
77             super.recycle();
78         }
79     }
80
81     @Override
82     protected T nextContent()
83     {
84         synchronized (lock())
85         {
86             // Items are removed only when they are fully consumed.
87             T item = _inputQ.peekUnsafe();
88             // Skip consumed items at the head of the queue.
89             while (item != null && remaining(item) == 0)
90             {
91                 _inputQ.pollUnsafe();
92                 onContentConsumed(item);
93                 LOG.debug("{} consumed {}", this, item);
94                 item = _inputQ.peekUnsafe();
95             }
96             return item;
97         }
98     }
99
100     protected void blockForContent() throws IOException
101     {
102         synchronized (lock())
103         {
104             while (_inputQ.isEmpty() && !isFinished() && !isEOF())
105             {
106                 try
107                 {
108                     LOG.debug("{} waiting for content", this);
109                     lock().wait();
110                 }
111                 catch (InterruptedException e)
112                 {
113                     throw (IOException)new InterruptedIOException().initCause(e);
114                 }
115             }
116         }
117     }
118
119     /**
120      * Callback that signals that the given content has been consumed.
121      *
122      * @param item the consumed content
123      */
124     protected abstract void onContentConsumed(T item);
125
126     public void earlyEOF()
127     {
128         synchronized (lock())
129         {
130             super.earlyEOF();
131             lock().notify();
132         }
133     }
134
135     public void messageComplete()
136     {
137         synchronized (lock())
138         {
139             super.messageComplete();
140             lock().notify();
141         }
142     }
143 }