]> WPIA git - gigi.git/blob - lib/jetty/org/eclipse/jetty/io/SelectorManager.java
Merge "Update notes about password security"
[gigi.git] / lib / jetty / org / eclipse / jetty / io / SelectorManager.java
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18
19 package org.eclipse.jetty.io;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.net.ConnectException;
24 import java.net.Socket;
25 import java.net.SocketAddress;
26 import java.net.SocketTimeoutException;
27 import java.nio.channels.CancelledKeyException;
28 import java.nio.channels.SelectionKey;
29 import java.nio.channels.Selector;
30 import java.nio.channels.ServerSocketChannel;
31 import java.nio.channels.SocketChannel;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Queue;
35 import java.util.Set;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicReference;
41
42 import org.eclipse.jetty.util.ConcurrentArrayQueue;
43 import org.eclipse.jetty.util.TypeUtil;
44 import org.eclipse.jetty.util.annotation.ManagedAttribute;
45 import org.eclipse.jetty.util.component.AbstractLifeCycle;
46 import org.eclipse.jetty.util.component.ContainerLifeCycle;
47 import org.eclipse.jetty.util.component.Dumpable;
48 import org.eclipse.jetty.util.log.Log;
49 import org.eclipse.jetty.util.log.Logger;
50 import org.eclipse.jetty.util.thread.NonBlockingThread;
51 import org.eclipse.jetty.util.thread.Scheduler;
52
53 /**
54  * <p>{@link SelectorManager} manages a number of {@link ManagedSelector}s that
55  * simplify the non-blocking primitives provided by the JVM via the {@code java.nio} package.</p>
56  * <p>{@link SelectorManager} subclasses implement methods to return protocol-specific
57  * {@link EndPoint}s and {@link Connection}s.</p>
58  */
59 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
60 {
61     public static final String SUBMIT_KEY_UPDATES = "org.eclipse.jetty.io.SelectorManager.submitKeyUpdates";
62     public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
63     protected static final Logger LOG = Log.getLogger(SelectorManager.class);
64     private final static boolean __submitKeyUpdates = Boolean.valueOf(System.getProperty(SUBMIT_KEY_UPDATES, "true"));
65
66     private final Executor executor;
67     private final Scheduler scheduler;
68     private final ManagedSelector[] _selectors;
69     private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
70     private long _selectorIndex;
71     private int _priorityDelta;
72
/**
 * Creates a manager whose selector count is half the number of
 * available processors, rounded up.
 *
 * @param executor  the executor used to run tasks
 * @param scheduler the scheduler returned by {@link #getScheduler()}
 */
protected SelectorManager(Executor executor, Scheduler scheduler)
{
    this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2);
}
77
/**
 * Creates a manager with an explicit number of selectors.
 *
 * @param executor  the executor used to run tasks
 * @param scheduler the scheduler returned by {@link #getScheduler()}
 * @param selectors the number of selectors, must be at least 1
 * @throws IllegalArgumentException if {@code selectors} is zero or negative
 */
protected SelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
    if (selectors < 1)
        throw new IllegalArgumentException("No selectors");
    _selectors = new ManagedSelector[selectors];
    this.scheduler = scheduler;
    this.executor = executor;
}
86
87     public Executor getExecutor()
88     {
89         return executor;
90     }
91
92     public Scheduler getScheduler()
93     {
94         return scheduler;
95     }
96
97     /**
98      * Get the connect timeout
99      *
100      * @return the connect timeout (in milliseconds)
101      */
102     public long getConnectTimeout()
103     {
104         return _connectTimeout;
105     }
106
107     /**
108      * Set the connect timeout (in milliseconds)
109      *
110      * @param milliseconds the number of milliseconds for the timeout
111      */
112     public void setConnectTimeout(long milliseconds)
113     {
114         _connectTimeout = milliseconds;
115     }
116
117
118     @ManagedAttribute("The priority delta to apply to selector threads")
119     public int getSelectorPriorityDelta()
120     {
121         return _priorityDelta;
122     }
123
124     /**
125      * Sets the selector thread priority delta to the given amount.
126      * <p>This allows the selector threads to run at a different priority.
127      * Typically this would be used to lower the priority to give preference
128      * to handling previously accepted connections rather than accepting
129      * new connections.</p>
130      *
131      * @param selectorPriorityDelta the amount to change the thread priority
132      *                              delta to (may be negative)
133      * @see Thread#getPriority()
134      */
135     public void setSelectorPriorityDelta(int selectorPriorityDelta)
136     {
137         int oldDelta = _priorityDelta;
138         _priorityDelta = selectorPriorityDelta;
139         if (oldDelta != selectorPriorityDelta && isStarted())
140         {
141             for (ManagedSelector selector : _selectors)
142             {
143                 Thread thread = selector._thread;
144                 if (thread != null)
145                 {
146                     int deltaDiff = selectorPriorityDelta - oldDelta;
147                     thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, thread.getPriority() - deltaDiff)));
148                 }
149             }
150         }
151     }
152
153     /**
154      * Executes the given task in a different thread.
155      *
156      * @param task the task to execute
157      */
158     protected void execute(Runnable task)
159     {
160         executor.execute(task);
161     }
162
163     /**
164      * @return the number of selectors in use
165      */
166     public int getSelectorCount()
167     {
168         return _selectors.length;
169     }
170
171     private ManagedSelector chooseSelector()
172     {
173         // The ++ increment here is not atomic, but it does not matter,
174         // so long as the value changes sometimes, then connections will
175         // be distributed over the available selectors.
176         long s = _selectorIndex++;
177         int index = (int)(s % getSelectorCount());
178         return _selectors[index];
179     }
180
181     /**
182      * <p>Registers a channel to perform a non-blocking connect.</p>
183      * <p>The channel must be set in non-blocking mode, {@link SocketChannel#connect(SocketAddress)}
184      * must be called prior to calling this method, and the connect operation must not be completed
185      * (the return value of {@link SocketChannel#connect(SocketAddress)} must be false).</p>
186      *
187      * @param channel    the channel to register
188      * @param attachment the attachment object
189      * @see #accept(SocketChannel, Object)
190      */
191     public void connect(SocketChannel channel, Object attachment)
192     {
193         ManagedSelector set = chooseSelector();
194         set.submit(set.new Connect(channel, attachment));
195     }
196
197     /**
198      * @see #accept(SocketChannel, Object)
199      */
200     public void accept(SocketChannel channel)
201     {
202         accept(channel, null);
203     }
204
205     /**
206      * <p>Registers a channel to perform non-blocking read/write operations.</p>
207      * <p>This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()},
208      * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}, or
209      * just after a non-blocking connect via {@link SocketChannel#connect(SocketAddress)} that completed
210      * successfully.</p>
211      *
212      * @param channel the channel to register
213      * @param attachment the attachment object
214      */
215     public void accept(SocketChannel channel, Object attachment)
216     {
217         final ManagedSelector selector = chooseSelector();
218         selector.submit(selector.new Accept(channel, attachment));
219     }
220
221     /**
222      * <p>Registers a server channel for accept operations.
223      * When a {@link SocketChannel} is accepted from the given {@link ServerSocketChannel}
224      * then the {@link #accepted(SocketChannel)} method is called, which must be
225      * overridden by a derivation of this class to handle the accepted channel
226      *
227      * @param server the server channel to register
228      */
229     public void acceptor(ServerSocketChannel server)
230     {
231         final ManagedSelector selector = chooseSelector();
232         selector.submit(selector.new Acceptor(server));
233     }
234
235     /**
236      * Callback method when a channel is accepted from the {@link ServerSocketChannel}
237      * passed to {@link #acceptor(ServerSocketChannel)}.
238      * The default impl throws an {@link UnsupportedOperationException}, so it must
239      * be overridden by subclasses if a server channel is provided.
240      *
241      * @param channel the
242      * @throws IOException
243      */
244     protected void accepted(SocketChannel channel) throws IOException
245     {
246         throw new UnsupportedOperationException();
247     }
248
249     @Override
250     protected void doStart() throws Exception
251     {
252         super.doStart();
253         for (int i = 0; i < _selectors.length; i++)
254         {
255             ManagedSelector selector = newSelector(i);
256             _selectors[i] = selector;
257             selector.start();
258             execute(new NonBlockingThread(selector));
259         }
260     }
261
262     /**
263      * <p>Factory method for {@link ManagedSelector}.</p>
264      *
265      * @param id an identifier for the {@link ManagedSelector to create}
266      * @return a new {@link ManagedSelector}
267      */
268     protected ManagedSelector newSelector(int id)
269     {
270         return new ManagedSelector(id);
271     }
272
273     @Override
274     protected void doStop() throws Exception
275     {
276         for (ManagedSelector selector : _selectors)
277             selector.stop();
278         super.doStop();
279     }
280
281     /**
282      * <p>Callback method invoked when an endpoint is opened.</p>
283      *
284      * @param endpoint the endpoint being opened
285      */
286     protected void endPointOpened(EndPoint endpoint)
287     {
288         endpoint.onOpen();
289     }
290
291     /**
292      * <p>Callback method invoked when an endpoint is closed.</p>
293      *
294      * @param endpoint the endpoint being closed
295      */
296     protected void endPointClosed(EndPoint endpoint)
297     {
298         endpoint.onClose();
299     }
300
301     /**
302      * <p>Callback method invoked when a connection is opened.</p>
303      *
304      * @param connection the connection just opened
305      */
306     public void connectionOpened(Connection connection)
307     {
308         try
309         {
310             connection.onOpen();
311         }
312         catch (Throwable x)
313         {
314             if (isRunning())
315                 LOG.warn("Exception while notifying connection " + connection, x);
316             else
317                 LOG.debug("Exception while notifying connection " + connection, x);
318             throw x;
319         }
320     }
321
322     /**
323      * <p>Callback method invoked when a connection is closed.</p>
324      *
325      * @param connection the connection just closed
326      */
327     public void connectionClosed(Connection connection)
328     {
329         try
330         {
331             connection.onClose();
332         }
333         catch (Throwable x)
334         {
335             LOG.debug("Exception while notifying connection " + connection, x);
336         }
337     }
338
339     protected boolean finishConnect(SocketChannel channel) throws IOException
340     {
341         return channel.finishConnect();
342     }
343
344     /**
345      * <p>Callback method invoked when a non-blocking connect cannot be completed.</p>
346      * <p>By default it just logs with level warning.</p>
347      *
348      * @param channel the channel that attempted the connect
349      * @param ex the exception that caused the connect to fail
350      * @param attachment the attachment object associated at registration
351      */
352     protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
353     {
354         LOG.warn(String.format("%s - %s", channel, attachment), ex);
355     }
356
357     /**
358      * <p>Factory method to create {@link EndPoint}.</p>
359      * <p>This method is invoked as a result of the registration of a channel via {@link #connect(SocketChannel, Object)}
360      * or {@link #accept(SocketChannel)}.</p>
361      *
362      * @param channel   the channel associated to the endpoint
363      * @param selector the selector the channel is registered to
364      * @param selectionKey      the selection key
365      * @return a new endpoint
366      * @throws IOException if the endPoint cannot be created
367      * @see #newConnection(SocketChannel, EndPoint, Object)
368      */
369     protected abstract EndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException;
370
371     /**
372      * <p>Factory method to create {@link Connection}.</p>
373      *
374      * @param channel    the channel associated to the connection
375      * @param endpoint   the endpoint
376      * @param attachment the attachment
377      * @return a new connection
378      * @throws IOException
379      * @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey)
380      */
381     public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
382
383     @Override
384     public String dump()
385     {
386         return ContainerLifeCycle.dump(this);
387     }
388
389     @Override
390     public void dump(Appendable out, String indent) throws IOException
391     {
392         ContainerLifeCycle.dumpObject(out, this);
393         ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
394     }
395
396     private enum State
397     {
398         CHANGES, MORE_CHANGES, SELECT, WAKEUP, PROCESS
399     }
400
401     /**
402      * <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
403      * <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events
404      * happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
405      * with the channel.</p>
406      */
407     public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
408     {
409         private final AtomicReference<State> _state= new AtomicReference<>(State.PROCESS);
410         private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
411         private final int _id;
412         private Selector _selector;
413         private volatile Thread _thread;
414
/**
 * Creates a selector with the given identifier and a stop timeout of 5000.
 *
 * @param id the identifier of this selector
 */
public ManagedSelector(int id)
{
    this._id = id;
    setStopTimeout(5000);
}
420
421         @Override
422         protected void doStart() throws Exception
423         {
424             super.doStart();
425             _selector = Selector.open();
426             _state.set(State.PROCESS);
427         }
428
429         @Override
430         protected void doStop() throws Exception
431         {
432             if (LOG.isDebugEnabled())
433                 LOG.debug("Stopping {}", this);
434             Stop stop = new Stop();
435             submit(stop);
436             stop.await(getStopTimeout());
437             if (LOG.isDebugEnabled())
438                 LOG.debug("Stopped {}", this);
439         }
440
441         /**
442          * Submit a task to update a selector key.  If the System property {@link SelectorManager#SUBMIT_KEY_UPDATES}
443          * is set true (default is false), the task is passed to {@link #submit(Runnable)}.   Otherwise it is run immediately and the selector
444          * woken up if need be.
445          * @param update the update to a key
446          */
447         public void updateKey(Runnable update)
448         {
449             if (__submitKeyUpdates)
450             {
451                 submit(update);
452             }
453             else
454             {
455                 // Run only 1 change at once
456                 synchronized (this)
457                 {
458                     runChange(update);
459                 }
460                 if (_state.compareAndSet(State.SELECT, State.WAKEUP))
461                    wakeup();
462             }
463         }
464
465         /**
466          * <p>Submits a change to be executed in the selector thread.</p>
467          * <p>Changes may be submitted from any thread, and the selector thread woken up
468          * (if necessary) to execute the change.</p>
469          *
470          * @param change the change to submit
471          */
472         public void submit(Runnable change)
473         {
474             // This method may be called from the selector thread, and therefore
475             // we could directly run the change without queueing, but this may
476             // lead to stack overflows on a busy server, so we always offer the
477             // change to the queue and process the state.
478
479             _changes.offer(change);
480             if (LOG.isDebugEnabled())
481                 LOG.debug("Queued change {}", change);
482
483             out: while (true)
484             {
485                 switch (_state.get())
486                 {
487                     case SELECT:
488                         // Avoid multiple wakeup() calls if we the CAS fails
489                         if (!_state.compareAndSet(State.SELECT, State.WAKEUP))
490                             continue;
491                         wakeup();
492                         break out;
493                     case CHANGES:
494                         // Tell the selector thread that we have more changes.
495                         // If we fail to CAS, we possibly need to wakeup(), so loop.
496                         if (_state.compareAndSet(State.CHANGES, State.MORE_CHANGES))
497                             break out;
498                         continue;
499                     case WAKEUP:
500                         // Do nothing, we have already a wakeup scheduled
501                         break out;
502                     case MORE_CHANGES:
503                         // Do nothing, we already notified the selector thread of more changes
504                         break out;
505                     case PROCESS:
506                         // Do nothing, the changes will be run after the processing
507                         break out;
508                     default:
509                         throw new IllegalStateException();
510                 }
511             }
512         }
513
514         private void runChanges()
515         {
516             Runnable change;
517             while ((change = _changes.poll()) != null)
518                 runChange(change);
519         }
520
521         protected void runChange(Runnable change)
522         {
523             try
524             {
525                 if (LOG.isDebugEnabled())
526                     LOG.debug("Running change {}", change);
527                 change.run();
528             }
529             catch (Throwable x)
530             {
531                 LOG.debug("Could not run change " + change, x);
532             }
533         }
534
535         @Override
536         public void run()
537         {
538             _thread = Thread.currentThread();
539             String name = _thread.getName();
540             int priority = _thread.getPriority();
541             try
542             {
543                 if (_priorityDelta != 0)
544                     _thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, priority + _priorityDelta)));
545
546                 _thread.setName(String.format("%s-selector-%s@%h/%d", name, SelectorManager.this.getClass().getSimpleName(), SelectorManager.this.hashCode(), _id));
547                 if (LOG.isDebugEnabled())
548                     LOG.debug("Starting {} on {}", _thread, this);
549                 while (isRunning())
550                     select();
551                 while(isStopping())
552                     runChanges();
553             }
554             finally
555             {
556                 if (LOG.isDebugEnabled())
557                     LOG.debug("Stopped {} on {}", _thread, this);
558                 _thread.setName(name);
559                 if (_priorityDelta != 0)
560                     _thread.setPriority(priority);
561             }
562         }
563
564         /**
565          * <p>Process changes and waits on {@link Selector#select()}.</p>
566          *
567          * @see #submit(Runnable)
568          */
569         public void select()
570         {
571             boolean debug = LOG.isDebugEnabled();
572             try
573             {
574                 _state.set(State.CHANGES);
575
576                 // Run the changes, and only exit if we ran all changes
577                 out: while(true)
578                 {
579                     switch (_state.get())
580                     {
581                         case CHANGES:
582                             runChanges();
583                             if (_state.compareAndSet(State.CHANGES, State.SELECT))
584                                 break out;
585                             continue;
586                         case MORE_CHANGES:
587                             runChanges();
588                             _state.set(State.CHANGES);
589                             continue;
590                         default:
591                             throw new IllegalStateException();
592                     }
593                 }
594                 // Must check first for SELECT and *then* for WAKEUP
595                 // because we read the state twice in the assert, and
596                 // it could change from SELECT to WAKEUP in between.
597                 assert _state.get() == State.SELECT || _state.get() == State.WAKEUP;
598
599                 if (debug)
600                     LOG.debug("Selector loop waiting on select");
601                 int selected = _selector.select();
602                 if (debug)
603                     LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
604
605                 _state.set(State.PROCESS);
606
607                 Set<SelectionKey> selectedKeys = _selector.selectedKeys();
608                 for (SelectionKey key : selectedKeys)
609                 {
610                     if (key.isValid())
611                     {
612                         processKey(key);
613                     }
614                     else
615                     {
616                         if (debug)
617                             LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
618                         Object attachment = key.attachment();
619                         if (attachment instanceof EndPoint)
620                             ((EndPoint)attachment).close();
621                     }
622                 }
623                 selectedKeys.clear();
624             }
625             catch (Throwable x)
626             {
627                 if (isRunning())
628                     LOG.warn(x);
629                 else
630                     LOG.ignore(x);
631             }
632         }
633
634         private void processKey(SelectionKey key)
635         {
636             Object attachment = key.attachment();
637             try
638             {
639                 if (attachment instanceof SelectableEndPoint)
640                 {
641                     ((SelectableEndPoint)attachment).onSelected();
642                 }
643                 else if (key.isConnectable())
644                 {
645                     processConnect(key, (Connect)attachment);
646                 }
647                 else if (key.isAcceptable())
648                 {
649                     processAccept(key);
650                 }
651                 else
652                 {
653                     throw new IllegalStateException();
654                 }
655             }
656             catch (CancelledKeyException x)
657             {
658                 LOG.debug("Ignoring cancelled key for channel {}", key.channel());
659                 if (attachment instanceof EndPoint)
660                     closeNoExceptions((EndPoint)attachment);
661             }
662             catch (Throwable x)
663             {
664                 LOG.warn("Could not process key for channel " + key.channel(), x);
665                 if (attachment instanceof EndPoint)
666                     closeNoExceptions((EndPoint)attachment);
667             }
668         }
669
670         private void processConnect(SelectionKey key, Connect connect)
671         {
672             SocketChannel channel = (SocketChannel)key.channel();
673             try
674             {
675                 key.attach(connect.attachment);
676                 boolean connected = finishConnect(channel);
677                 if (connected)
678                 {
679                     if (connect.timeout.cancel())
680                     {
681                         key.interestOps(0);
682                         EndPoint endpoint = createEndPoint(channel, key);
683                         key.attach(endpoint);
684                     }
685                     else
686                     {
687                         throw new SocketTimeoutException("Concurrent Connect Timeout");
688                     }
689                 }
690                 else
691                 {
692                     throw new ConnectException();
693                 }
694             }
695             catch (Throwable x)
696             {
697                 connect.failed(x);
698             }
699         }
700
701         private void processAccept(SelectionKey key)
702         {
703             ServerSocketChannel server = (ServerSocketChannel)key.channel();
704             SocketChannel channel = null;
705             try
706             {
707                 while ((channel = server.accept()) != null)
708                 {
709                     accepted(channel);
710                 }
711             }
712             catch (Throwable x)
713             {
714                 closeNoExceptions(channel);
715                 LOG.warn("Accept failed for channel " + channel, x);
716             }
717         }
718
719         private void closeNoExceptions(Closeable closeable)
720         {
721             try
722             {
723                 if (closeable != null)
724                     closeable.close();
725             }
726             catch (Throwable x)
727             {
728                 LOG.ignore(x);
729             }
730         }
731
732         public void wakeup()
733         {
734             _selector.wakeup();
735         }
736
737         public boolean isSelectorThread()
738         {
739             return Thread.currentThread() == _thread;
740         }
741
742         private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
743         {
744             EndPoint endPoint = newEndPoint(channel, this, selectionKey);
745             endPointOpened(endPoint);
746             Connection connection = newConnection(channel, endPoint, selectionKey.attachment());
747             endPoint.setConnection(connection);
748             connectionOpened(connection);
749             if (LOG.isDebugEnabled())
750                 LOG.debug("Created {}", endPoint);
751             return endPoint;
752         }
753
754         public void destroyEndPoint(EndPoint endPoint)
755         {
756             if (LOG.isDebugEnabled())
757                 LOG.debug("Destroyed {}", endPoint);
758             Connection connection = endPoint.getConnection();
759             if (connection != null)
760                 connectionClosed(connection);
761             endPointClosed(endPoint);
762         }
763
764         @Override
765         public String dump()
766         {
767             return ContainerLifeCycle.dump(this);
768         }
769
770         @Override
771         public void dump(Appendable out, String indent) throws IOException
772         {
773             out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n");
774
775             Thread selecting = _thread;
776
777             Object where = "not selecting";
778             StackTraceElement[] trace = selecting == null ? null : selecting.getStackTrace();
779             if (trace != null)
780             {
781                 for (StackTraceElement t : trace)
782                     if (t.getClassName().startsWith("org.eclipse.jetty."))
783                     {
784                         where = t;
785                         break;
786                     }
787             }
788
789             Selector selector = _selector;
790             if (selector != null && selector.isOpen())
791             {
792                 final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
793                 dump.add(where);
794
795                 DumpKeys dumpKeys = new DumpKeys(dump);
796                 submit(dumpKeys);
797                 dumpKeys.await(5, TimeUnit.SECONDS);
798
799                 ContainerLifeCycle.dump(out, indent, dump);
800             }
801         }
802
803         public void dumpKeysState(List<Object> dumps)
804         {
805             Selector selector = _selector;
806             Set<SelectionKey> keys = selector.keys();
807             dumps.add(selector + " keys=" + keys.size());
808             for (SelectionKey key : keys)
809             {
810                 if (key.isValid())
811                     dumps.add(key.attachment() + " iOps=" + key.interestOps() + " rOps=" + key.readyOps());
812                 else
813                     dumps.add(key.attachment() + " iOps=-1 rOps=-1");
814             }
815         }
816
817         @Override
818         public String toString()
819         {
820             Selector selector = _selector;
821             return String.format("%s keys=%d selected=%d",
822                     super.toString(),
823                     selector != null && selector.isOpen() ? selector.keys().size() : -1,
824                     selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
825         }
826
827         private class DumpKeys implements Runnable
828         {
829             private final CountDownLatch latch = new CountDownLatch(1);
830             private final List<Object> _dumps;
831
832             private DumpKeys(List<Object> dumps)
833             {
834                 this._dumps = dumps;
835             }
836
837             @Override
838             public void run()
839             {
840                 dumpKeysState(_dumps);
841                 latch.countDown();
842             }
843
844             public boolean await(long timeout, TimeUnit unit)
845             {
846                 try
847                 {
848                     return latch.await(timeout, unit);
849                 }
850                 catch (InterruptedException x)
851                 {
852                     return false;
853                 }
854             }
855         }
856
857         private class Acceptor implements Runnable
858         {
859             private final ServerSocketChannel _channel;
860
861             public Acceptor(ServerSocketChannel channel)
862             {
863                 this._channel = channel;
864             }
865
866             @Override
867             public void run()
868             {
869                 try
870                 {
871                     SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
872                     if (LOG.isDebugEnabled())
873                         LOG.debug("{} acceptor={}", this, key);
874                 }
875                 catch (Throwable x)
876                 {
877                     closeNoExceptions(_channel);
878                     LOG.warn(x);
879                 }
880             }
881         }
882
883         private class Accept implements Runnable
884         {
885             private final SocketChannel channel;
886             private final Object attachment;
887
888             private Accept(SocketChannel channel, Object attachment)
889             {
890                 this.channel = channel;
891                 this.attachment = attachment;
892             }
893
894             @Override
895             public void run()
896             {
897                 try
898                 {
899                     SelectionKey key = channel.register(_selector, 0, attachment);
900                     EndPoint endpoint = createEndPoint(channel, key);
901                     key.attach(endpoint);
902                 }
903                 catch (Throwable x)
904                 {
905                     closeNoExceptions(channel);
906                     LOG.debug(x);
907                 }
908             }
909         }
910
911         private class Connect implements Runnable
912         {
913             private final AtomicBoolean failed = new AtomicBoolean();
914             private final SocketChannel channel;
915             private final Object attachment;
916             private final Scheduler.Task timeout;
917
918             private Connect(SocketChannel channel, Object attachment)
919             {
920                 this.channel = channel;
921                 this.attachment = attachment;
922                 this.timeout = scheduler.schedule(new ConnectTimeout(this), getConnectTimeout(), TimeUnit.MILLISECONDS);
923             }
924
925             @Override
926             public void run()
927             {
928                 try
929                 {
930                     channel.register(_selector, SelectionKey.OP_CONNECT, this);
931                 }
932                 catch (Throwable x)
933                 {
934                     failed(x);
935                 }
936             }
937
938             private void failed(Throwable failure)
939             {
940                 if (failed.compareAndSet(false, true))
941                 {
942                     timeout.cancel();
943                     closeNoExceptions(channel);
944                     connectionFailed(channel, failure, attachment);
945                 }
946             }
947         }
948
949         private class ConnectTimeout implements Runnable
950         {
951             private final Connect connect;
952
953             private ConnectTimeout(Connect connect)
954             {
955                 this.connect = connect;
956             }
957
958             @Override
959             public void run()
960             {
961                 SocketChannel channel = connect.channel;
962                 if (channel.isConnectionPending())
963                 {
964                     if (LOG.isDebugEnabled())
965                         LOG.debug("Channel {} timed out while connecting, closing it", channel);
966                     connect.failed(new SocketTimeoutException("Connect Timeout"));
967                 }
968             }
969         }
970
971         private class Stop implements Runnable
972         {
973             private final CountDownLatch latch = new CountDownLatch(1);
974
975             @Override
976             public void run()
977             {
978                 try
979                 {
980                     for (SelectionKey key : _selector.keys())
981                     {
982                         Object attachment = key.attachment();
983                         if (attachment instanceof EndPoint)
984                         {
985                             EndPointCloser closer = new EndPointCloser((EndPoint)attachment);
986                             execute(closer);
987                             // We are closing the SelectorManager, so we want to block the
988                             // selector thread here until we have closed all EndPoints.
989                             // This is different than calling close() directly, because close()
990                             // can wait forever, while here we are limited by the stop timeout.
991                             closer.await(getStopTimeout());
992                         }
993                     }
994
995                     closeNoExceptions(_selector);
996                 }
997                 finally
998                 {
999                     latch.countDown();
1000                 }
1001             }
1002
1003             public boolean await(long timeout)
1004             {
1005                 try
1006                 {
1007                     return latch.await(timeout, TimeUnit.MILLISECONDS);
1008                 }
1009                 catch (InterruptedException x)
1010                 {
1011                     return false;
1012                 }
1013             }
1014         }
1015
1016         private class EndPointCloser implements Runnable
1017         {
1018             private final CountDownLatch latch = new CountDownLatch(1);
1019             private final EndPoint endPoint;
1020
1021             private EndPointCloser(EndPoint endPoint)
1022             {
1023                 this.endPoint = endPoint;
1024             }
1025
1026             @Override
1027             public void run()
1028             {
1029                 try
1030                 {
1031                     closeNoExceptions(endPoint.getConnection());
1032                 }
1033                 finally
1034                 {
1035                     latch.countDown();
1036                 }
1037             }
1038
1039             private boolean await(long timeout)
1040             {
1041                 try
1042                 {
1043                     return latch.await(timeout, TimeUnit.MILLISECONDS);
1044                 }
1045                 catch (InterruptedException x)
1046                 {
1047                     return false;
1048                 }
1049             }
1050         }
1051     }
1052
1053     /**
1054      * A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be notified of
1055      * non-blocking events by the {@link ManagedSelector}.
1056      */
1057     public interface SelectableEndPoint extends EndPoint
1058     {
1059         /**
1060          * <p>Callback method invoked when a read or write events has been detected by the {@link ManagedSelector}
1061          * for this endpoint.</p>
1062          */
1063         void onSelected();
1064     }
1065 }