]> WPIA git - gigi.git/blob - lib/jetty/org/eclipse/jetty/util/BlockingArrayQueue.java
updating jetty to jetty-9.2.16.v2016040
[gigi.git] / lib / jetty / org / eclipse / jetty / util / BlockingArrayQueue.java
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18
19 package org.eclipse.jetty.util;
20
21 import java.util.AbstractList;
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.ListIterator;
25 import java.util.NoSuchElementException;
26 import java.util.Objects;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.concurrent.locks.Condition;
31 import java.util.concurrent.locks.Lock;
32 import java.util.concurrent.locks.ReentrantLock;
33
34 /**
35  * A BlockingQueue backed by a circular array capable or growing.
36  * <p/>
37  * This queue is uses a variant of the two lock queue algorithm to provide an efficient queue or list backed by a growable circular array.
38  * <p/>
39  * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is able to grow and provides a blocking put call.
40  * <p/>
41  * The queue has both a capacity (the size of the array currently allocated) and a max capacity (the maximum size that may be allocated), which defaults to
42  * {@link Integer#MAX_VALUE}.
43  * 
44  * @param <E>
45  *            The element type
46  */
47 public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
48 {
49     /**
50      * The head offset in the {@link #_indexes} array, displaced by 15 slots to avoid false sharing with the array length (stored before the first element of
51      * the array itself).
52      */
53     private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
54     /**
55      * The tail offset in the {@link #_indexes} array, displaced by 16 slots from the head to avoid false sharing with it.
56      */
57     private static final int TAIL_OFFSET = HEAD_OFFSET + MemoryUtils.getIntegersPerCacheLine();
58     /**
59      * Default initial capacity, 128.
60      */
61     public static final int DEFAULT_CAPACITY = 128;
62     /**
63      * Default growth factor, 64.
64      */
65     public static final int DEFAULT_GROWTH = 64;
66
67     private final int _maxCapacity;
68     private final int _growCapacity;
69     /**
70      * Array that holds the head and tail indexes, separated by a cache line to avoid false sharing
71      */
72     private final int[] _indexes = new int[TAIL_OFFSET + 1];
73     private final Lock _tailLock = new ReentrantLock();
74     private final AtomicInteger _size = new AtomicInteger();
75     private final Lock _headLock = new ReentrantLock();
76     private final Condition _notEmpty = _headLock.newCondition();
77     private Object[] _elements;
78
79     /**
80      * Creates an unbounded {@link BlockingArrayQueue} with default initial capacity and grow factor.
81      * 
82      * @see #DEFAULT_CAPACITY
83      * @see #DEFAULT_GROWTH
84      */
85     public BlockingArrayQueue()
86     {
87         _elements = new Object[DEFAULT_CAPACITY];
88         _growCapacity = DEFAULT_GROWTH;
89         _maxCapacity = Integer.MAX_VALUE;
90     }
91
92     /**
93      * Creates a bounded {@link BlockingArrayQueue} that does not grow. The capacity of the queue is fixed and equal to the given parameter.
94      * 
95      * @param maxCapacity
96      *            the maximum capacity
97      */
98     public BlockingArrayQueue(int maxCapacity)
99     {
100         _elements = new Object[maxCapacity];
101         _growCapacity = -1;
102         _maxCapacity = maxCapacity;
103     }
104
105     /**
106      * Creates an unbounded {@link BlockingArrayQueue} that grows by the given parameter.
107      * 
108      * @param capacity
109      *            the initial capacity
110      * @param growBy
111      *            the growth factor
112      */
113     public BlockingArrayQueue(int capacity, int growBy)
114     {
115         _elements = new Object[capacity];
116         _growCapacity = growBy;
117         _maxCapacity = Integer.MAX_VALUE;
118     }
119
120     /**
121      * Create a bounded {@link BlockingArrayQueue} that grows by the given parameter.
122      * 
123      * @param capacity
124      *            the initial capacity
125      * @param growBy
126      *            the growth factor
127      * @param maxCapacity
128      *            the maximum capacity
129      */
130     public BlockingArrayQueue(int capacity, int growBy, int maxCapacity)
131     {
132         if (capacity > maxCapacity)
133             throw new IllegalArgumentException();
134         _elements = new Object[capacity];
135         _growCapacity = growBy;
136         _maxCapacity = maxCapacity;
137     }
138
139     /*----------------------------------------------------------------------------*/
140     /* Collection methods */
141     /*----------------------------------------------------------------------------*/
142
143     @Override
144     public void clear()
145     {
146
147         _tailLock.lock();
148         try
149         {
150
151             _headLock.lock();
152             try
153             {
154                 _indexes[HEAD_OFFSET] = 0;
155                 _indexes[TAIL_OFFSET] = 0;
156                 _size.set(0);
157             }
158             finally
159             {
160                 _headLock.unlock();
161             }
162         }
163         finally
164         {
165             _tailLock.unlock();
166         }
167     }
168
169     @Override
170     public int size()
171     {
172         return _size.get();
173     }
174
175     @Override
176     public Iterator<E> iterator()
177     {
178         return listIterator();
179     }
180
181     /*----------------------------------------------------------------------------*/
182     /* Queue methods */
183     /*----------------------------------------------------------------------------*/
184
185     @SuppressWarnings("unchecked")
186     @Override
187     public E poll()
188     {
189         if (_size.get() == 0)
190             return null;
191
192         E e = null;
193
194         _headLock.lock(); // Size cannot shrink
195         try
196         {
197             if (_size.get() > 0)
198             {
199                 final int head = _indexes[HEAD_OFFSET];
200                 e = (E)_elements[head];
201                 _elements[head] = null;
202                 _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
203                 if (_size.decrementAndGet() > 0)
204                     _notEmpty.signal();
205             }
206         }
207         finally
208         {
209             _headLock.unlock();
210         }
211         return e;
212     }
213
214     @SuppressWarnings("unchecked")
215     @Override
216     public E peek()
217     {
218         if (_size.get() == 0)
219             return null;
220
221         E e = null;
222
223         _headLock.lock(); // Size cannot shrink
224         try
225         {
226             if (_size.get() > 0)
227                 e = (E)_elements[_indexes[HEAD_OFFSET]];
228         }
229         finally
230         {
231             _headLock.unlock();
232         }
233         return e;
234     }
235
236     @Override
237     public E remove()
238     {
239         E e = poll();
240         if (e == null)
241             throw new NoSuchElementException();
242         return e;
243     }
244
245     @Override
246     public E element()
247     {
248         E e = peek();
249         if (e == null)
250             throw new NoSuchElementException();
251         return e;
252     }
253
254     /*----------------------------------------------------------------------------*/
255     /* BlockingQueue methods */
256     /*----------------------------------------------------------------------------*/
257
258     @Override
259     public boolean offer(E e)
260     {
261         Objects.requireNonNull(e);
262
263         boolean notEmpty = false;
264         _tailLock.lock(); // Size cannot grow... only shrink
265         try
266         {
267             int size = _size.get();
268             if (size >= _maxCapacity)
269                 return false;
270
271             // Should we expand array?
272             if (size == _elements.length)
273             {
274                 _headLock.lock();
275                 try
276                 {
277                     if (!grow())
278                         return false;
279                 }
280                 finally
281                 {
282                     _headLock.unlock();
283                 }
284             }
285
286             // Re-read head and tail after a possible grow
287             int tail = _indexes[TAIL_OFFSET];
288             _elements[tail] = e;
289             _indexes[TAIL_OFFSET] = (tail + 1) % _elements.length;
290             notEmpty = _size.getAndIncrement() == 0;
291         }
292         finally
293         {
294             _tailLock.unlock();
295         }
296
297         if (notEmpty)
298         {
299             _headLock.lock();
300             try
301             {
302                 _notEmpty.signal();
303             }
304             finally
305             {
306                 _headLock.unlock();
307             }
308         }
309
310         return true;
311     }
312
313     @Override
314     public boolean add(E e)
315     {
316         if (offer(e))
317             return true;
318         else
319             throw new IllegalStateException();
320     }
321
322     @Override
323     public void put(E o) throws InterruptedException
324     {
325         // The mechanism to await and signal when the queue is full is not implemented
326         throw new UnsupportedOperationException();
327     }
328
329     @Override
330     public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
331     {
332         // The mechanism to await and signal when the queue is full is not implemented
333         throw new UnsupportedOperationException();
334     }
335
336     @SuppressWarnings("unchecked")
337     @Override
338     public E take() throws InterruptedException
339     {
340         E e = null;
341
342         _headLock.lockInterruptibly(); // Size cannot shrink
343         try
344         {
345             try
346             {
347                 while (_size.get() == 0)
348                 {
349                     _notEmpty.await();
350                 }
351             }
352             catch (InterruptedException ie)
353             {
354                 _notEmpty.signal();
355                 throw ie;
356             }
357
358             final int head = _indexes[HEAD_OFFSET];
359             e = (E)_elements[head];
360             _elements[head] = null;
361             _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
362
363             if (_size.decrementAndGet() > 0)
364                 _notEmpty.signal();
365         }
366         finally
367         {
368             _headLock.unlock();
369         }
370         return e;
371     }
372
373     @SuppressWarnings("unchecked")
374     @Override
375     public E poll(long time, TimeUnit unit) throws InterruptedException
376     {
377         long nanos = unit.toNanos(time);
378         E e = null;
379
380         _headLock.lockInterruptibly(); // Size cannot shrink
381         try
382         {
383             try
384             {
385                 while (_size.get() == 0)
386                 {
387                     if (nanos <= 0)
388                         return null;
389                     nanos = _notEmpty.awaitNanos(nanos);
390                 }
391             }
392             catch (InterruptedException x)
393             {
394                 _notEmpty.signal();
395                 throw x;
396             }
397
398             int head = _indexes[HEAD_OFFSET];
399             e = (E)_elements[head];
400             _elements[head] = null;
401             _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
402
403             if (_size.decrementAndGet() > 0)
404                 _notEmpty.signal();
405         }
406         finally
407         {
408             _headLock.unlock();
409         }
410         return e;
411     }
412
413     @Override
414     public boolean remove(Object o)
415     {
416
417         _tailLock.lock();
418         try
419         {
420
421             _headLock.lock();
422             try
423             {
424                 if (isEmpty())
425                     return false;
426
427                 final int head = _indexes[HEAD_OFFSET];
428                 final int tail = _indexes[TAIL_OFFSET];
429                 final int capacity = _elements.length;
430
431                 int i = head;
432                 while (true)
433                 {
434                     if (Objects.equals(_elements[i],o))
435                     {
436                         remove(i >= head?i - head:capacity - head + i);
437                         return true;
438                     }
439                     ++i;
440                     if (i == capacity)
441                         i = 0;
442                     if (i == tail)
443                         return false;
444                 }
445             }
446             finally
447             {
448                 _headLock.unlock();
449             }
450         }
451         finally
452         {
453             _tailLock.unlock();
454         }
455     }
456
457     @Override
458     public int remainingCapacity()
459     {
460
461         _tailLock.lock();
462         try
463         {
464
465             _headLock.lock();
466             try
467             {
468                 return getCapacity() - size();
469             }
470             finally
471             {
472                 _headLock.unlock();
473             }
474         }
475         finally
476         {
477             _tailLock.unlock();
478         }
479     }
480
481     @Override
482     public int drainTo(Collection<? super E> c)
483     {
484         throw new UnsupportedOperationException();
485     }
486
487     @Override
488     public int drainTo(Collection<? super E> c, int maxElements)
489     {
490         throw new UnsupportedOperationException();
491     }
492
493     /*----------------------------------------------------------------------------*/
494     /* List methods */
495     /*----------------------------------------------------------------------------*/
496
497     @SuppressWarnings("unchecked")
498     @Override
499     public E get(int index)
500     {
501
502         _tailLock.lock();
503         try
504         {
505
506             _headLock.lock();
507             try
508             {
509                 if (index < 0 || index >= _size.get())
510                     throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
511                 int i = _indexes[HEAD_OFFSET] + index;
512                 int capacity = _elements.length;
513                 if (i >= capacity)
514                     i -= capacity;
515                 return (E)_elements[i];
516             }
517             finally
518             {
519                 _headLock.unlock();
520             }
521         }
522         finally
523         {
524             _tailLock.unlock();
525         }
526     }
527
528     @Override
529     public void add(int index, E e)
530     {
531         if (e == null)
532             throw new NullPointerException();
533
534         _tailLock.lock();
535         try
536         {
537
538             _headLock.lock();
539             try
540             {
541                 final int size = _size.get();
542
543                 if (index < 0 || index > size)
544                     throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
545
546                 if (index == size)
547                 {
548                     add(e);
549                 }
550                 else
551                 {
552                     if (_indexes[TAIL_OFFSET] == _indexes[HEAD_OFFSET])
553                         if (!grow())
554                             throw new IllegalStateException("full");
555
556                     // Re-read head and tail after a possible grow
557                     int i = _indexes[HEAD_OFFSET] + index;
558                     int capacity = _elements.length;
559
560                     if (i >= capacity)
561                         i -= capacity;
562
563                     _size.incrementAndGet();
564                     int tail = _indexes[TAIL_OFFSET];
565                     _indexes[TAIL_OFFSET] = tail = (tail + 1) % capacity;
566
567                     if (i < tail)
568                     {
569                         System.arraycopy(_elements,i,_elements,i + 1,tail - i);
570                         _elements[i] = e;
571                     }
572                     else
573                     {
574                         if (tail > 0)
575                         {
576                             System.arraycopy(_elements,0,_elements,1,tail);
577                             _elements[0] = _elements[capacity - 1];
578                         }
579
580                         System.arraycopy(_elements,i,_elements,i + 1,capacity - i - 1);
581                         _elements[i] = e;
582                     }
583                 }
584             }
585             finally
586             {
587                 _headLock.unlock();
588             }
589         }
590         finally
591         {
592             _tailLock.unlock();
593         }
594     }
595
596     @SuppressWarnings("unchecked")
597     @Override
598     public E set(int index, E e)
599     {
600         Objects.requireNonNull(e);
601
602         _tailLock.lock();
603         try
604         {
605
606             _headLock.lock();
607             try
608             {
609                 if (index < 0 || index >= _size.get())
610                     throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
611
612                 int i = _indexes[HEAD_OFFSET] + index;
613                 int capacity = _elements.length;
614                 if (i >= capacity)
615                     i -= capacity;
616                 E old = (E)_elements[i];
617                 _elements[i] = e;
618                 return old;
619             }
620             finally
621             {
622                 _headLock.unlock();
623             }
624         }
625         finally
626         {
627             _tailLock.unlock();
628         }
629     }
630
631     @SuppressWarnings("unchecked")
632     @Override
633     public E remove(int index)
634     {
635
636         _tailLock.lock();
637         try
638         {
639
640             _headLock.lock();
641             try
642             {
643                 if (index < 0 || index >= _size.get())
644                     throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
645
646                 int i = _indexes[HEAD_OFFSET] + index;
647                 int capacity = _elements.length;
648                 if (i >= capacity)
649                     i -= capacity;
650                 E old = (E)_elements[i];
651
652                 int tail = _indexes[TAIL_OFFSET];
653                 if (i < tail)
654                 {
655                     System.arraycopy(_elements,i + 1,_elements,i,tail - i);
656                     --_indexes[TAIL_OFFSET];
657                 }
658                 else
659                 {
660                     System.arraycopy(_elements,i + 1,_elements,i,capacity - i - 1);
661                     _elements[capacity - 1] = _elements[0];
662                     if (tail > 0)
663                     {
664                         System.arraycopy(_elements,1,_elements,0,tail);
665                         --_indexes[TAIL_OFFSET];
666                     }
667                     else
668                     {
669                         _indexes[TAIL_OFFSET] = capacity - 1;
670                     }
671                     _elements[_indexes[TAIL_OFFSET]] = null;
672                 }
673
674                 _size.decrementAndGet();
675
676                 return old;
677             }
678             finally
679             {
680                 _headLock.unlock();
681             }
682         }
683         finally
684         {
685             _tailLock.unlock();
686         }
687     }
688
689     @Override
690     public ListIterator<E> listIterator(int index)
691     {
692
693         _tailLock.lock();
694         try
695         {
696
697             _headLock.lock();
698             try
699             {
700                 Object[] elements = new Object[size()];
701                 if (size() > 0)
702                 {
703                     int head = _indexes[HEAD_OFFSET];
704                     int tail = _indexes[TAIL_OFFSET];
705                     if (head < tail)
706                     {
707                         System.arraycopy(_elements,head,elements,0,tail - head);
708                     }
709                     else
710                     {
711                         int chunk = _elements.length - head;
712                         System.arraycopy(_elements,head,elements,0,chunk);
713                         System.arraycopy(_elements,0,elements,chunk,tail);
714                     }
715                 }
716                 return new Itr(elements,index);
717             }
718             finally
719             {
720                 _headLock.unlock();
721             }
722         }
723         finally
724         {
725             _tailLock.unlock();
726         }
727     }
728
729     /*----------------------------------------------------------------------------*/
730     /* Additional methods */
731     /*----------------------------------------------------------------------------*/
732
733     /**
734      * @return the current capacity of this queue
735      */
736     public int getCapacity()
737     {
738         _tailLock.lock();
739         try
740         {
741             return _elements.length;
742         }
743         finally
744         {
745             _tailLock.unlock();
746         }
747     }
748
749     /**
750      * @return the max capacity of this queue, or -1 if this queue is unbounded
751      */
752     public int getMaxCapacity()
753     {
754         return _maxCapacity;
755     }
756
757     /*----------------------------------------------------------------------------*/
758     /* Implementation methods */
759     /*----------------------------------------------------------------------------*/
760
761     private boolean grow()
762     {
763         if (_growCapacity <= 0)
764             return false;
765
766         _tailLock.lock();
767         try
768         {
769
770             _headLock.lock();
771             try
772             {
773                 final int head = _indexes[HEAD_OFFSET];
774                 final int tail = _indexes[TAIL_OFFSET];
775                 final int newTail;
776                 final int capacity = _elements.length;
777
778                 Object[] elements = new Object[capacity + _growCapacity];
779
780                 if (head < tail)
781                 {
782                     newTail = tail - head;
783                     System.arraycopy(_elements,head,elements,0,newTail);
784                 }
785                 else if (head > tail || _size.get() > 0)
786                 {
787                     newTail = capacity + tail - head;
788                     int cut = capacity - head;
789                     System.arraycopy(_elements,head,elements,0,cut);
790                     System.arraycopy(_elements,0,elements,cut,tail);
791                 }
792                 else
793                 {
794                     newTail = 0;
795                 }
796
797                 _elements = elements;
798                 _indexes[HEAD_OFFSET] = 0;
799                 _indexes[TAIL_OFFSET] = newTail;
800                 return true;
801             }
802             finally
803             {
804                 _headLock.unlock();
805             }
806         }
807         finally
808         {
809             _tailLock.unlock();
810         }
811     }
812
813     private class Itr implements ListIterator<E>
814     {
815         private final Object[] _elements;
816         private int _cursor;
817
818         public Itr(Object[] elements, int offset)
819         {
820             _elements = elements;
821             _cursor = offset;
822         }
823
824         @Override
825         public boolean hasNext()
826         {
827             return _cursor < _elements.length;
828         }
829
830         @SuppressWarnings("unchecked")
831         @Override
832         public E next()
833         {
834             return (E)_elements[_cursor++];
835         }
836
837         @Override
838         public boolean hasPrevious()
839         {
840             return _cursor > 0;
841         }
842
843         @SuppressWarnings("unchecked")
844         @Override
845         public E previous()
846         {
847             return (E)_elements[--_cursor];
848         }
849
850         @Override
851         public int nextIndex()
852         {
853             return _cursor + 1;
854         }
855
856         @Override
857         public int previousIndex()
858         {
859             return _cursor - 1;
860         }
861
862         @Override
863         public void remove()
864         {
865             throw new UnsupportedOperationException();
866         }
867
868         @Override
869         public void set(E e)
870         {
871             throw new UnsupportedOperationException();
872         }
873
874         @Override
875         public void add(E e)
876         {
877             throw new UnsupportedOperationException();
878         }
879     }
880 }