]> WPIA git - gigi.git/blob - lib/jetty/org/eclipse/jetty/io/SelectorManager.java
Importing upstream Jetty jetty-9.2.1.v20140609
[gigi.git] / lib / jetty / org / eclipse / jetty / io / SelectorManager.java
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18
19 package org.eclipse.jetty.io;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.net.ConnectException;
24 import java.net.Socket;
25 import java.net.SocketAddress;
26 import java.net.SocketTimeoutException;
27 import java.nio.channels.CancelledKeyException;
28 import java.nio.channels.SelectionKey;
29 import java.nio.channels.Selector;
30 import java.nio.channels.ServerSocketChannel;
31 import java.nio.channels.SocketChannel;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Queue;
35 import java.util.Set;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicReference;
41
42 import org.eclipse.jetty.util.ConcurrentArrayQueue;
43 import org.eclipse.jetty.util.TypeUtil;
44 import org.eclipse.jetty.util.component.AbstractLifeCycle;
45 import org.eclipse.jetty.util.component.ContainerLifeCycle;
46 import org.eclipse.jetty.util.component.Dumpable;
47 import org.eclipse.jetty.util.log.Log;
48 import org.eclipse.jetty.util.log.Logger;
49 import org.eclipse.jetty.util.thread.NonBlockingThread;
50 import org.eclipse.jetty.util.thread.Scheduler;
51
52 /**
53  * <p>{@link SelectorManager} manages a number of {@link ManagedSelector}s that
54  * simplify the non-blocking primitives provided by the JVM via the {@code java.nio} package.</p>
55  * <p>{@link SelectorManager} subclasses implement methods to return protocol-specific
56  * {@link EndPoint}s and {@link Connection}s.</p>
57  */
58 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
59 {
60     public static final String SUBMIT_KEY_UPDATES = "org.eclipse.jetty.io.SelectorManager.submitKeyUpdates";
61     public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
62     protected static final Logger LOG = Log.getLogger(SelectorManager.class);
63     private final static boolean __submitKeyUpdates = Boolean.valueOf(System.getProperty(SUBMIT_KEY_UPDATES, "false"));
64     
65     private final Executor executor;
66     private final Scheduler scheduler;
67     private final ManagedSelector[] _selectors;
68     private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
69     private long _selectorIndex;
70     
71     protected SelectorManager(Executor executor, Scheduler scheduler)
72     {
73         this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2);
74     }
75
76     protected SelectorManager(Executor executor, Scheduler scheduler, int selectors)
77     {
78         if (selectors<=0)
79             throw new IllegalArgumentException("No selectors");
80         this.executor = executor;
81         this.scheduler = scheduler;
82         _selectors = new ManagedSelector[selectors];
83     }
84
85     public Executor getExecutor()
86     {
87         return executor;
88     }
89
90     public Scheduler getScheduler()
91     {
92         return scheduler;
93     }
94
95     /**
96      * Get the connect timeout
97      *
98      * @return the connect timeout (in milliseconds)
99      */
100     public long getConnectTimeout()
101     {
102         return _connectTimeout;
103     }
104
105     /**
106      * Set the connect timeout (in milliseconds)
107      *
108      * @param milliseconds the number of milliseconds for the timeout
109      */
110     public void setConnectTimeout(long milliseconds)
111     {
112         _connectTimeout = milliseconds;
113     }
114
115     /**
116      * Executes the given task in a different thread.
117      *
118      * @param task the task to execute
119      */
120     protected void execute(Runnable task)
121     {
122         executor.execute(task);
123     }
124
125     /**
126      * @return the number of selectors in use
127      */
128     public int getSelectorCount()
129     {
130         return _selectors.length;
131     }
132
133     private ManagedSelector chooseSelector()
134     {
135         // The ++ increment here is not atomic, but it does not matter,
136         // so long as the value changes sometimes, then connections will
137         // be distributed over the available selectors.
138         long s = _selectorIndex++;
139         int index = (int)(s % getSelectorCount());
140         return _selectors[index];
141     }
142
143     /**
144      * <p>Registers a channel to perform a non-blocking connect.</p>
145      * <p>The channel must be set in non-blocking mode, and {@link SocketChannel#connect(SocketAddress)}
146      * must be called prior to calling this method.</p>
147      *
148      * @param channel    the channel to register
149      * @param attachment the attachment object
150      */
151     public void connect(SocketChannel channel, Object attachment)
152     {
153         ManagedSelector set = chooseSelector();
154         set.submit(set.new Connect(channel, attachment));
155     }
156
157     /**
158      * <p>Registers a channel to perform non-blocking read/write operations.</p>
159      * <p>This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()},
160      * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}.</p>
161      *
162      * @param channel the channel to register
163      */
164     public void accept(final SocketChannel channel)
165     {
166         final ManagedSelector selector = chooseSelector();
167         selector.submit(selector.new Accept(channel));
168     }
169     
170     /**
171      * <p>Registers a server channel for accept operations.
172      * When a {@link SocketChannel} is accepted from the given {@link ServerSocketChannel}
173      * then the {@link #accepted(SocketChannel)} method is called, which must be
174      * overridden by a derivation of this class to handle the accepted channel
175      * 
176      * @param server the server channel to register
177      */
178     public void acceptor(final ServerSocketChannel server)
179     {
180         final ManagedSelector selector = chooseSelector();
181         selector.submit(selector.new Acceptor(server));
182     }
183     
184     /**
185      * Callback method when a channel is accepted from the {@link ServerSocketChannel}
186      * passed to {@link #acceptor(ServerSocketChannel)}.
187      * The default impl throws an {@link UnsupportedOperationException}, so it must
188      * be overridden by subclasses if a server channel is provided.
189      *
190      * @param channel the
191      * @throws IOException
192      */
193     protected void accepted(SocketChannel channel) throws IOException
194     {
195         throw new UnsupportedOperationException();
196     }
197
198     @Override
199     protected void doStart() throws Exception
200     {
201         super.doStart();
202         for (int i = 0; i < _selectors.length; i++)
203         {
204             ManagedSelector selector = newSelector(i);
205             _selectors[i] = selector;
206             selector.start();
207             execute(new NonBlockingThread(selector));
208         }
209     }
210
211     /**
212      * <p>Factory method for {@link ManagedSelector}.</p>
213      *
214      * @param id an identifier for the {@link ManagedSelector to create}
215      * @return a new {@link ManagedSelector}
216      */
217     protected ManagedSelector newSelector(int id)
218     {
219         return new ManagedSelector(id);
220     }
221
222     @Override
223     protected void doStop() throws Exception
224     {
225         for (ManagedSelector selector : _selectors)
226             selector.stop();
227         super.doStop();
228     }
229
230     /**
231      * <p>Callback method invoked when an endpoint is opened.</p>
232      *
233      * @param endpoint the endpoint being opened
234      */
235     protected void endPointOpened(EndPoint endpoint)
236     {
237         endpoint.onOpen();
238     }
239
240     /**
241      * <p>Callback method invoked when an endpoint is closed.</p>
242      *
243      * @param endpoint the endpoint being closed
244      */
245     protected void endPointClosed(EndPoint endpoint)
246     {
247         endpoint.onClose();
248     }
249
250     /**
251      * <p>Callback method invoked when a connection is opened.</p>
252      *
253      * @param connection the connection just opened
254      */
255     public void connectionOpened(Connection connection)
256     {
257         try
258         {
259             connection.onOpen();
260         }
261         catch (Throwable x)
262         {
263             if (isRunning())
264                 LOG.warn("Exception while notifying connection " + connection, x);
265             else
266                 LOG.debug("Exception while notifying connection {}",connection, x);
267         }
268     }
269
270     /**
271      * <p>Callback method invoked when a connection is closed.</p>
272      *
273      * @param connection the connection just closed
274      */
275     public void connectionClosed(Connection connection)
276     {
277         try
278         {
279             connection.onClose();
280         }
281         catch (Throwable x)
282         {
283             LOG.debug("Exception while notifying connection " + connection, x);
284         }
285     }
286
287     protected boolean finishConnect(SocketChannel channel) throws IOException
288     {
289         return channel.finishConnect();
290     }
291
292     /**
293      * <p>Callback method invoked when a non-blocking connect cannot be completed.</p>
294      * <p>By default it just logs with level warning.</p>
295      *
296      * @param channel the channel that attempted the connect
297      * @param ex the exception that caused the connect to fail
298      * @param attachment the attachment object associated at registration
299      */
300     protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
301     {
302         LOG.warn(String.format("%s - %s", channel, attachment), ex);
303     }
304
305     /**
306      * <p>Factory method to create {@link EndPoint}.</p>
307      * <p>This method is invoked as a result of the registration of a channel via {@link #connect(SocketChannel, Object)}
308      * or {@link #accept(SocketChannel)}.</p>
309      *
310      * @param channel   the channel associated to the endpoint
311      * @param selector the selector the channel is registered to
312      * @param selectionKey      the selection key
313      * @return a new endpoint
314      * @throws IOException if the endPoint cannot be created
315      * @see #newConnection(SocketChannel, EndPoint, Object)
316      */
317     protected abstract EndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selector, SelectionKey selectionKey) throws IOException;
318
319     /**
320      * <p>Factory method to create {@link Connection}.</p>
321      *
322      * @param channel    the channel associated to the connection
323      * @param endpoint   the endpoint
324      * @param attachment the attachment
325      * @return a new connection
326      * @throws IOException
327      * @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey)
328      */
329     public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
330
331     @Override
332     public String dump()
333     {
334         return ContainerLifeCycle.dump(this);
335     }
336
337     @Override
338     public void dump(Appendable out, String indent) throws IOException
339     {
340         ContainerLifeCycle.dumpObject(out, this);
341         ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
342     }
343
344     private enum State
345     {
346         CHANGES, MORE_CHANGES, SELECT, WAKEUP, PROCESS
347     }
348
349     /**
350      * <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
351      * <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events
352      * happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
353      * with the channel.</p>
354      */
355     public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
356     {
357         private final AtomicReference<State> _state= new AtomicReference<>(State.PROCESS);
358         private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
359         private final int _id;
360         private Selector _selector;
361         private volatile Thread _thread;
362
363         public ManagedSelector(int id)
364         {
365             _id = id;
366             setStopTimeout(5000);
367         }
368
369         @Override
370         protected void doStart() throws Exception
371         {
372             super.doStart();
373             _selector = Selector.open();
374             _state.set(State.PROCESS);
375         }
376
377         @Override
378         protected void doStop() throws Exception
379         {
380             LOG.debug("Stopping {}", this);
381             Stop stop = new Stop();
382             submit(stop);
383             stop.await(getStopTimeout());
384             LOG.debug("Stopped {}", this);
385         }
386
387         /**
388          * Submit a task to update a selector key.  If the System property {@link SelectorManager#SUBMIT_KEY_UPDATES}
389          * is set true (default is false), the task is passed to {@link #submit(Runnable)}.   Otherwise it is run immediately and the selector 
390          * woken up if need be.   
391          * @param update the update to a key
392          */
393         public void updateKey(Runnable update)
394         {
395             if (__submitKeyUpdates)
396             {
397                 submit(update);
398             }
399             else
400             {
401                 runChange(update);
402                 if (_state.compareAndSet(State.SELECT, State.WAKEUP))
403                    wakeup();
404             }
405         }
406         
407         /**
408          * <p>Submits a change to be executed in the selector thread.</p>
409          * <p>Changes may be submitted from any thread, and the selector thread woken up
410          * (if necessary) to execute the change.</p>
411          *
412          * @param change the change to submit
413          */
414         public void submit(Runnable change)
415         {
416             // This method may be called from the selector thread, and therefore
417             // we could directly run the change without queueing, but this may
418             // lead to stack overflows on a busy server, so we always offer the
419             // change to the queue and process the state.
420
421             _changes.offer(change);
422             LOG.debug("Queued change {}", change);
423
424             out: while (true)
425             {
426                 switch (_state.get())
427                 {
428                     case SELECT:
429                         // Avoid multiple wakeup() calls if we the CAS fails
430                         if (!_state.compareAndSet(State.SELECT, State.WAKEUP))
431                             continue;
432                         wakeup();
433                         break out;
434                     case CHANGES:
435                         // Tell the selector thread that we have more changes.
436                         // If we fail to CAS, we possibly need to wakeup(), so loop.
437                         if (_state.compareAndSet(State.CHANGES, State.MORE_CHANGES))
438                             break out;
439                         continue;
440                     case WAKEUP:
441                         // Do nothing, we have already a wakeup scheduled
442                         break out;
443                     case MORE_CHANGES:
444                         // Do nothing, we already notified the selector thread of more changes
445                         break out;
446                     case PROCESS:
447                         // Do nothing, the changes will be run after the processing
448                         break out;
449                     default:
450                         throw new IllegalStateException();
451                 }
452             }
453         }
454
455         private void runChanges()
456         {
457             Runnable change;
458             while ((change = _changes.poll()) != null)
459                 runChange(change);
460         }
461
462         protected void runChange(Runnable change)
463         {
464             try
465             {
466                 LOG.debug("Running change {}", change);
467                 change.run();
468             }
469             catch (Throwable x)
470             {
471                 LOG.debug("Could not run change " + change, x);
472             }
473         }
474
475         @Override
476         public void run()
477         {
478             _thread = Thread.currentThread();
479             String name = _thread.getName();
480             try
481             {
482                 _thread.setName(name + "-selector-" + SelectorManager.this.getClass().getSimpleName()+"@"+Integer.toHexString(SelectorManager.this.hashCode())+"/"+_id);
483                 LOG.debug("Starting {} on {}", _thread, this);
484                 while (isRunning())
485                     select();
486                 runChanges();
487             }
488             finally
489             {
490                 LOG.debug("Stopped {} on {}", _thread, this);
491                 _thread.setName(name);
492             }
493         }
494
495         /**
496          * <p>Process changes and waits on {@link Selector#select()}.</p>
497          *
498          * @see #submit(Runnable)
499          */
500         public void select()
501         {
502             boolean debug = LOG.isDebugEnabled();
503             try
504             {
505                 _state.set(State.CHANGES);
506
507                 // Run the changes, and only exit if we ran all changes
508                 out: while(true)
509                 {
510                     switch (_state.get())
511                     {
512                         case CHANGES:
513                             runChanges();
514                             if (_state.compareAndSet(State.CHANGES, State.SELECT))
515                                 break out;
516                             continue;
517                         case MORE_CHANGES:
518                             runChanges();
519                             _state.set(State.CHANGES);
520                             continue;
521                         default:
522                             throw new IllegalStateException();    
523                     }
524                 }
525                 // Must check first for SELECT and *then* for WAKEUP
526                 // because we read the state twice in the assert, and
527                 // it could change from SELECT to WAKEUP in between.
528                 assert _state.get() == State.SELECT || _state.get() == State.WAKEUP;
529
530                 if (debug)
531                     LOG.debug("Selector loop waiting on select");
532                 int selected = _selector.select();
533                 if (debug)
534                     LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
535
536                 _state.set(State.PROCESS);
537
538                 Set<SelectionKey> selectedKeys = _selector.selectedKeys();
539                 for (SelectionKey key : selectedKeys)
540                 {
541                     if (key.isValid())
542                     {
543                         processKey(key);
544                     }
545                     else
546                     {
547                         if (debug)
548                             LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
549                         Object attachment = key.attachment();
550                         if (attachment instanceof EndPoint)
551                             ((EndPoint)attachment).close();
552                     }
553                 }
554                 selectedKeys.clear();
555             }
556             catch (Throwable x)
557             {
558                 if (isRunning())
559                     LOG.warn(x);
560                 else
561                     LOG.ignore(x);
562             }
563         }
564
565         private void processKey(SelectionKey key)
566         {
567             Object attachment = key.attachment();
568             try
569             {
570                 if (attachment instanceof SelectableEndPoint)
571                 {
572                     ((SelectableEndPoint)attachment).onSelected();
573                 }
574                 else if (key.isConnectable())
575                 {
576                     processConnect(key, (Connect)attachment);
577                 }
578                 else if (key.isAcceptable())
579                 {
580                     processAccept(key);
581                 }
582                 else
583                 {
584                     throw new IllegalStateException();
585                 }
586             }
587             catch (CancelledKeyException x)
588             {
589                 LOG.debug("Ignoring cancelled key for channel {}", key.channel());
590                 if (attachment instanceof EndPoint)
591                     closeNoExceptions((EndPoint)attachment);
592             }
593             catch (Throwable x)
594             {
595                 LOG.warn("Could not process key for channel " + key.channel(), x);
596                 if (attachment instanceof EndPoint)
597                     closeNoExceptions((EndPoint)attachment);
598             }
599         }
600
601         private void processConnect(SelectionKey key, Connect connect)
602         {
603             SocketChannel channel = (SocketChannel)key.channel();
604             try
605             {
606                 key.attach(connect.attachment);
607                 boolean connected = finishConnect(channel);
608                 if (connected)
609                 {
610                     connect.timeout.cancel();
611                     key.interestOps(0);
612                     EndPoint endpoint = createEndPoint(channel, key);
613                     key.attach(endpoint);
614                 }
615                 else
616                 {
617                     throw new ConnectException();
618                 }
619             }
620             catch (Throwable x)
621             {
622                 connect.failed(x);
623             }
624         }
625         
626         private void processAccept(SelectionKey key)
627         {
628             ServerSocketChannel server = (ServerSocketChannel)key.channel();
629             SocketChannel channel = null;
630             try
631             {
632                 while ((channel = server.accept()) != null)
633                 {
634                     accepted(channel);
635                 }
636             }
637             catch (Throwable x)
638             {
639                 closeNoExceptions(channel);
640                 LOG.warn("Accept failed for channel " + channel, x);
641             }
642         }
643
644         private void closeNoExceptions(Closeable closeable)
645         {
646             try
647             {
648                 if (closeable != null)
649                     closeable.close();
650             }
651             catch (Throwable x)
652             {
653                 LOG.ignore(x);
654             }
655         }
656
657         public void wakeup()
658         {
659             _selector.wakeup();
660         }
661
662         public boolean isSelectorThread()
663         {
664             return Thread.currentThread() == _thread;
665         }
666
667         private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
668         {
669             EndPoint endPoint = newEndPoint(channel, this, selectionKey);
670             endPointOpened(endPoint);
671             Connection connection = newConnection(channel, endPoint, selectionKey.attachment());
672             endPoint.setConnection(connection);
673             connectionOpened(connection);
674             LOG.debug("Created {}", endPoint);
675             return endPoint;
676         }
677
678         public void destroyEndPoint(EndPoint endPoint)
679         {
680             LOG.debug("Destroyed {}", endPoint);
681             Connection connection = endPoint.getConnection();
682             if (connection != null)
683                 connectionClosed(connection);
684             endPointClosed(endPoint);
685         }
686
687         @Override
688         public String dump()
689         {
690             return ContainerLifeCycle.dump(this);
691         }
692
693         @Override
694         public void dump(Appendable out, String indent) throws IOException
695         {
696             out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append("\n");
697
698             Thread selecting = _thread;
699
700             Object where = "not selecting";
701             StackTraceElement[] trace = selecting == null ? null : selecting.getStackTrace();
702             if (trace != null)
703             {
704                 for (StackTraceElement t : trace)
705                     if (t.getClassName().startsWith("org.eclipse.jetty."))
706                     {
707                         where = t;
708                         break;
709                     }
710             }
711
712             Selector selector = _selector;
713             if (selector != null && selector.isOpen())
714             {
715                 final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
716                 dump.add(where);
717
718                 DumpKeys dumpKeys = new DumpKeys(dump);
719                 submit(dumpKeys);
720                 dumpKeys.await(5, TimeUnit.SECONDS);
721
722                 ContainerLifeCycle.dump(out, indent, dump);
723             }
724         }
725
726         public void dumpKeysState(List<Object> dumps)
727         {
728             Selector selector = _selector;
729             Set<SelectionKey> keys = selector.keys();
730             dumps.add(selector + " keys=" + keys.size());
731             for (SelectionKey key : keys)
732             {
733                 if (key.isValid())
734                     dumps.add(key.attachment() + " iOps=" + key.interestOps() + " rOps=" + key.readyOps());
735                 else
736                     dumps.add(key.attachment() + " iOps=-1 rOps=-1");
737             }
738         }
739
740         @Override
741         public String toString()
742         {
743             Selector selector = _selector;
744             return String.format("%s keys=%d selected=%d",
745                     super.toString(),
746                     selector != null && selector.isOpen() ? selector.keys().size() : -1,
747                     selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
748         }
749
750         private class DumpKeys implements Runnable
751         {
752             private final CountDownLatch latch = new CountDownLatch(1);
753             private final List<Object> _dumps;
754
755             private DumpKeys(List<Object> dumps)
756             {
757                 this._dumps = dumps;
758             }
759
760             @Override
761             public void run()
762             {
763                 dumpKeysState(_dumps);
764                 latch.countDown();
765             }
766
767             public boolean await(long timeout, TimeUnit unit)
768             {
769                 try
770                 {
771                     return latch.await(timeout, unit);
772                 }
773                 catch (InterruptedException x)
774                 {
775                     return false;
776                 }
777             }
778         }
779
780         private class Acceptor implements Runnable
781         {
782             private final ServerSocketChannel _channel;
783
784             public Acceptor(ServerSocketChannel channel)
785             {
786                 this._channel = channel;
787             }
788
789             @Override
790             public void run()
791             {
792                 try
793                 {
794                     SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
795                     LOG.debug("{} acceptor={}", this, key);
796                 }
797                 catch (Throwable x)
798                 {
799                     closeNoExceptions(_channel);
800                     LOG.warn(x);
801                 }
802             }
803         }
804
805         private class Accept implements Runnable
806         {
807             private final SocketChannel _channel;
808
809             public Accept(SocketChannel channel)
810             {
811                 this._channel = channel;
812             }
813
814             @Override
815             public void run()
816             {
817                 try
818                 {
819                     SelectionKey key = _channel.register(_selector, 0, null);
820                     EndPoint endpoint = createEndPoint(_channel, key);
821                     key.attach(endpoint);
822                 }
823                 catch (Throwable x)
824                 {
825                     closeNoExceptions(_channel);
826                     LOG.debug(x);
827                 }
828             }
829         }
830
831         private class Connect implements Runnable
832         {
833             private final AtomicBoolean failed = new AtomicBoolean();
834             private final SocketChannel channel;
835             private final Object attachment;
836             private final Scheduler.Task timeout;
837
838             public Connect(SocketChannel channel, Object attachment)
839             {
840                 this.channel = channel;
841                 this.attachment = attachment;
842                 this.timeout = scheduler.schedule(new ConnectTimeout(this), getConnectTimeout(), TimeUnit.MILLISECONDS);
843             }
844
845             @Override
846             public void run()
847             {
848                 try
849                 {
850                     channel.register(_selector, SelectionKey.OP_CONNECT, this);
851                 }
852                 catch (Throwable x)
853                 {
854                     failed(x);
855                 }
856             }
857
858             protected void failed(Throwable failure)
859             {
860                 if (failed.compareAndSet(false, true))
861                 {
862                     timeout.cancel();
863                     closeNoExceptions(channel);
864                     connectionFailed(channel, failure, attachment);
865                 }
866             }
867         }
868
869         private class ConnectTimeout implements Runnable
870         {
871             private final Connect connect;
872
873             private ConnectTimeout(Connect connect)
874             {
875                 this.connect = connect;
876             }
877
878             @Override
879             public void run()
880             {
881                 SocketChannel channel = connect.channel;
882                 if (channel.isConnectionPending())
883                 {
884                     LOG.debug("Channel {} timed out while connecting, closing it", channel);
885                     connect.failed(new SocketTimeoutException());
886                 }
887             }
888         }
889
890         private class Stop implements Runnable
891         {
892             private final CountDownLatch latch = new CountDownLatch(1);
893
894             @Override
895             public void run()
896             {
897                 try
898                 {
899                     for (SelectionKey key : _selector.keys())
900                     {
901                         Object attachment = key.attachment();
902                         if (attachment instanceof EndPoint)
903                         {
904                             EndPointCloser closer = new EndPointCloser((EndPoint)attachment);
905                             execute(closer);
906                             // We are closing the SelectorManager, so we want to block the
907                             // selector thread here until we have closed all EndPoints.
908                             // This is different than calling close() directly, because close()
909                             // can wait forever, while here we are limited by the stop timeout.
910                             closer.await(getStopTimeout());
911                         }
912                     }
913
914                     closeNoExceptions(_selector);
915                 }
916                 finally
917                 {
918                     latch.countDown();
919                 }
920             }
921
922             public boolean await(long timeout)
923             {
924                 try
925                 {
926                     return latch.await(timeout, TimeUnit.MILLISECONDS);
927                 }
928                 catch (InterruptedException x)
929                 {
930                     return false;
931                 }
932             }
933         }
934
935         private class EndPointCloser implements Runnable
936         {
937             private final CountDownLatch latch = new CountDownLatch(1);
938             private final EndPoint endPoint;
939
940             private EndPointCloser(EndPoint endPoint)
941             {
942                 this.endPoint = endPoint;
943             }
944
945             @Override
946             public void run()
947             {
948                 try
949                 {
950                     closeNoExceptions(endPoint.getConnection());
951                 }
952                 finally
953                 {
954                     latch.countDown();
955                 }
956             }
957
958             private boolean await(long timeout)
959             {
960                 try
961                 {
962                     return latch.await(timeout, TimeUnit.MILLISECONDS);
963                 }
964                 catch (InterruptedException x)
965                 {
966                     return false;
967                 }
968             }
969         }
970     }
971
972     /**
973      * A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be notified of
974      * non-blocking events by the {@link ManagedSelector}.
975      */
976     public interface SelectableEndPoint extends EndPoint
977     {
978         /**
979          * <p>Callback method invoked when a read or write events has been detected by the {@link ManagedSelector}
980          * for this endpoint.</p>
981          */
982         void onSelected();
983     }
984 }