]> WPIA git - gigi.git/blob - lib/jetty/org/eclipse/jetty/server/LocalConnector.java
Importing upstream Jetty jetty-9.2.1.v20140609
[gigi.git] / lib / jetty / org / eclipse / jetty / server / LocalConnector.java
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.charset.StandardCharsets;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29
30 import org.eclipse.jetty.io.ByteArrayEndPoint;
31 import org.eclipse.jetty.io.ByteBufferPool;
32 import org.eclipse.jetty.io.Connection;
33 import org.eclipse.jetty.util.BufferUtil;
34 import org.eclipse.jetty.util.ssl.SslContextFactory;
35 import org.eclipse.jetty.util.thread.Scheduler;
36
37 public class LocalConnector extends AbstractConnector
38 {
39     private final BlockingQueue<LocalEndPoint> _connects = new LinkedBlockingQueue<>();
40
41
42     public LocalConnector(Server server, Executor executor, Scheduler scheduler, ByteBufferPool pool, int acceptors, ConnectionFactory... factories)
43     {
44         super(server,executor,scheduler,pool,acceptors,factories);
45         setIdleTimeout(30000);
46     }
47
48     public LocalConnector(Server server)
49     {
50         this(server, null, null, null, -1, new HttpConnectionFactory());
51     }
52
53     public LocalConnector(Server server, SslContextFactory sslContextFactory)
54     {
55         this(server, null, null, null, -1,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory()));
56     }
57
58     public LocalConnector(Server server, ConnectionFactory connectionFactory)
59     {
60         this(server, null, null, null, -1, connectionFactory);
61     }
62
63     public LocalConnector(Server server, ConnectionFactory connectionFactory, SslContextFactory sslContextFactory)
64     {
65         this(server, null, null, null, -1, AbstractConnectionFactory.getFactories(sslContextFactory,connectionFactory));
66     }
67
68     @Override
69     public Object getTransport()
70     {
71         return this;
72     }
73
74     /** Sends requests and get responses based on thread activity.
75      * Returns all the responses received once the thread activity has
76      * returned to the level it was before the requests.
77      * <p>
78      * This methods waits until the connection is closed or
79      * is idle for 1s before returning the responses.
80      * @param requests the requests
81      * @return the responses
82      * @throws Exception if the requests fail
83      */
84     public String getResponses(String requests) throws Exception
85     {
86         return getResponses(requests, 5, TimeUnit.SECONDS);
87     }
88
89     /** Sends requests and get responses based on thread activity.
90      * Returns all the responses received once the thread activity has
91      * returned to the level it was before the requests.
92      * <p>
93      * This methods waits until the connection is closed or
94      * an idle period before returning the responses.
95      * @param requests the requests
96      * @param idleFor The time the response stream must be idle for before returning
97      * @param units The units of idleFor
98      * @return the responses
99      * @throws Exception if the requests fail
100      */
101     public String getResponses(String requests,long idleFor,TimeUnit units) throws Exception
102     {
103         ByteBuffer result = getResponses(BufferUtil.toBuffer(requests,StandardCharsets.UTF_8),idleFor,units);
104         return result==null?null:BufferUtil.toString(result,StandardCharsets.UTF_8);
105     }
106
107     /** Sends requests and get's responses based on thread activity.
108      * Returns all the responses received once the thread activity has
109      * returned to the level it was before the requests.
110      * <p>
111      * This methods waits until the connection is closed or
112      * is idle for 1s before returning the responses.
113      * @param requestsBuffer the requests
114      * @return the responses
115      * @throws Exception if the requests fail
116      */
117     public ByteBuffer getResponses(ByteBuffer requestsBuffer) throws Exception
118     {
119         return getResponses(requestsBuffer, 5, TimeUnit.SECONDS);
120     }
121
122     /** Sends requests and get's responses based on thread activity.
123      * Returns all the responses received once the thread activity has
124      * returned to the level it was before the requests.
125      * <p>
126      * This methods waits until the connection is closed or
127      * an idle period before returning the responses.
128      * @param requestsBuffer the requests
129      * @param idleFor The time the response stream must be idle for before returning
130      * @param units The units of idleFor
131      * @return the responses
132      * @throws Exception if the requests fail
133      */
134     public ByteBuffer getResponses(ByteBuffer requestsBuffer,long idleFor,TimeUnit units) throws Exception
135     {
136         LOG.debug("requests {}", BufferUtil.toUTF8String(requestsBuffer));
137         LocalEndPoint endp = executeRequest(requestsBuffer);
138         endp.waitUntilClosedOrIdleFor(idleFor,units);
139         ByteBuffer responses = endp.takeOutput();
140         endp.getConnection().close();
141         LOG.debug("responses {}", BufferUtil.toUTF8String(responses));
142         return responses;
143     }
144
145     /**
146      * Execute a request and return the EndPoint through which
147      * responses can be received.
148      * @param rawRequest the request
149      * @return the local endpoint
150      */
151     public LocalEndPoint executeRequest(String rawRequest)
152     {
153         return executeRequest(BufferUtil.toBuffer(rawRequest, StandardCharsets.UTF_8));
154     }
155
156     private LocalEndPoint executeRequest(ByteBuffer rawRequest)
157     {
158         LocalEndPoint endp = new LocalEndPoint();
159         endp.setInput(rawRequest);
160         _connects.add(endp);
161         return endp;
162     }
163
164     @Override
165     protected void accept(int acceptorID) throws IOException, InterruptedException
166     {
167         LOG.debug("accepting {}", acceptorID);
168         LocalEndPoint endPoint = _connects.take();
169         endPoint.onOpen();
170         onEndPointOpened(endPoint);
171
172         Connection connection = getDefaultConnectionFactory().newConnection(this, endPoint);
173         endPoint.setConnection(connection);
174
175         connection.onOpen();
176     }
177
178     public class LocalEndPoint extends ByteArrayEndPoint
179     {
180         private final CountDownLatch _closed = new CountDownLatch(1);
181
182         public LocalEndPoint()
183         {
184             super(getScheduler(), LocalConnector.this.getIdleTimeout());
185             setGrowOutput(true);
186         }
187
188         public void addInput(String s)
189         {
190             // TODO this is a busy wait
191             while(getIn()==null || BufferUtil.hasContent(getIn()))
192                 Thread.yield();
193             setInput(BufferUtil.toBuffer(s, StandardCharsets.UTF_8));
194         }
195
196         @Override
197         public void close()
198         {
199             boolean wasOpen=isOpen();
200             super.close();
201             if (wasOpen)
202             {
203 //                connectionClosed(getConnection());
204                 getConnection().onClose();
205                 onClose();
206             }
207         }
208
209         @Override
210         public void onClose()
211         {
212             LocalConnector.this.onEndPointClosed(this);
213             super.onClose();
214             _closed.countDown();
215         }
216
217         @Override
218         public void shutdownOutput()
219         {
220             super.shutdownOutput();
221             close();
222         }
223
224         public void waitUntilClosed()
225         {
226             while (isOpen())
227             {
228                 try
229                 {
230                     if (!_closed.await(10,TimeUnit.SECONDS))
231                         break;
232                 }
233                 catch(Exception e)
234                 {
235                     LOG.warn(e);
236                 }
237             }
238         }
239
240         public void waitUntilClosedOrIdleFor(long idleFor,TimeUnit units)
241         {
242             Thread.yield();
243             int size=getOutput().remaining();
244             while (isOpen())
245             {
246                 try
247                 {
248                     if (!_closed.await(idleFor,units))
249                     {
250                         if (size==getOutput().remaining())
251                         {
252                             LOG.debug("idle for {} {}",idleFor,units);
253                             return;
254                         }
255                         size=getOutput().remaining();
256                     }
257                 }
258                 catch(Exception e)
259                 {
260                     LOG.warn(e);
261                 }
262             }
263         }
264     }
265 }