2 // ========================================================================
3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
19 package org.eclipse.jetty.util;
21 import java.util.AbstractList;
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.ListIterator;
25 import java.util.NoSuchElementException;
26 import java.util.Objects;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.concurrent.locks.Condition;
31 import java.util.concurrent.locks.Lock;
32 import java.util.concurrent.locks.ReentrantLock;
35 * A BlockingQueue backed by a circular array capable of growing.
37 * This queue uses a variant of the two lock queue algorithm to provide an efficient queue or list backed by a growable circular array.
39 * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is able to grow and provides a blocking put call.
41 * The queue has both a capacity (the size of the array currently allocated) and a max capacity (the maximum size that may be allocated), which defaults to
42 * {@link Integer#MAX_VALUE}.
47 public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
50 * The head offset in the {@link #_indexes} array, displaced by {@code MemoryUtils.getIntegersPerCacheLine() - 1} slots to avoid false sharing with the array length (stored before the first element of
53 private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
55 * The tail offset in the {@link #_indexes} array, displaced by {@code MemoryUtils.getIntegersPerCacheLine()} slots (one cache line of ints) from the head to avoid false sharing with it.
57 private static final int TAIL_OFFSET = HEAD_OFFSET + MemoryUtils.getIntegersPerCacheLine();
59 * Default initial capacity, 128.
61 public static final int DEFAULT_CAPACITY = 128;
63 * Default growth factor, 64.
65 public static final int DEFAULT_GROWTH = 64;
67 private final int _maxCapacity;
68 private final int _growCapacity;
70 * Array that holds the head and tail indexes, separated by a cache line to avoid false sharing
72 private final int[] _indexes = new int[TAIL_OFFSET + 1];
73 private final Lock _tailLock = new ReentrantLock();
74 private final AtomicInteger _size = new AtomicInteger();
75 private final Lock _headLock = new ReentrantLock();
76 private final Condition _notEmpty = _headLock.newCondition();
77 private Object[] _elements;
80 * Creates an unbounded {@link BlockingArrayQueue} with default initial capacity and grow factor.
82 * @see #DEFAULT_CAPACITY
83 * @see #DEFAULT_GROWTH
85 public BlockingArrayQueue()
87 _elements = new Object[DEFAULT_CAPACITY];
88 _growCapacity = DEFAULT_GROWTH;
89 _maxCapacity = Integer.MAX_VALUE;
93 * Creates a bounded {@link BlockingArrayQueue} that does not grow. The capacity of the queue is fixed and equal to the given parameter.
96 * the maximum capacity
98 public BlockingArrayQueue(int maxCapacity)
100 _elements = new Object[maxCapacity];
102 _maxCapacity = maxCapacity;
106 * Creates an unbounded {@link BlockingArrayQueue} that grows by the given parameter.
109 * the initial capacity
113 public BlockingArrayQueue(int capacity, int growBy)
115 _elements = new Object[capacity];
116 _growCapacity = growBy;
117 _maxCapacity = Integer.MAX_VALUE;
121 * Creates a bounded {@link BlockingArrayQueue} that grows by the given parameter.
124 * the initial capacity
128 * the maximum capacity
130 public BlockingArrayQueue(int capacity, int growBy, int maxCapacity)
132 if (capacity > maxCapacity)
133 throw new IllegalArgumentException();
134 _elements = new Object[capacity];
135 _growCapacity = growBy;
136 _maxCapacity = maxCapacity;
139 /*----------------------------------------------------------------------------*/
140 /* Collection methods */
141 /*----------------------------------------------------------------------------*/
154 _indexes[HEAD_OFFSET] = 0;
155 _indexes[TAIL_OFFSET] = 0;
176 public Iterator<E> iterator()
178 return listIterator();
181 /*----------------------------------------------------------------------------*/
183 /*----------------------------------------------------------------------------*/
185 @SuppressWarnings("unchecked")
189 if (_size.get() == 0)
194 _headLock.lock(); // Size cannot shrink
199 final int head = _indexes[HEAD_OFFSET];
200 e = (E)_elements[head];
201 _elements[head] = null;
202 _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
203 if (_size.decrementAndGet() > 0)
214 @SuppressWarnings("unchecked")
218 if (_size.get() == 0)
223 _headLock.lock(); // Size cannot shrink
227 e = (E)_elements[_indexes[HEAD_OFFSET]];
241 throw new NoSuchElementException();
250 throw new NoSuchElementException();
254 /*----------------------------------------------------------------------------*/
255 /* BlockingQueue methods */
256 /*----------------------------------------------------------------------------*/
259 public boolean offer(E e)
261 Objects.requireNonNull(e);
263 boolean notEmpty = false;
264 _tailLock.lock(); // Size cannot grow... only shrink
267 int size = _size.get();
268 if (size >= _maxCapacity)
271 // Should we expand array?
272 if (size == _elements.length)
286 // Re-read head and tail after a possible grow
287 int tail = _indexes[TAIL_OFFSET];
289 _indexes[TAIL_OFFSET] = (tail + 1) % _elements.length;
290 notEmpty = _size.getAndIncrement() == 0;
314 public boolean add(E e)
319 throw new IllegalStateException();
323 public void put(E o) throws InterruptedException
325 // The mechanism to await and signal when the queue is full is not implemented
326 throw new UnsupportedOperationException();
330 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
332 // The mechanism to await and signal when the queue is full is not implemented
333 throw new UnsupportedOperationException();
336 @SuppressWarnings("unchecked")
338 public E take() throws InterruptedException
342 _headLock.lockInterruptibly(); // Size cannot shrink
347 while (_size.get() == 0)
352 catch (InterruptedException ie)
358 final int head = _indexes[HEAD_OFFSET];
359 e = (E)_elements[head];
360 _elements[head] = null;
361 _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
363 if (_size.decrementAndGet() > 0)
373 @SuppressWarnings("unchecked")
375 public E poll(long time, TimeUnit unit) throws InterruptedException
377 long nanos = unit.toNanos(time);
380 _headLock.lockInterruptibly(); // Size cannot shrink
385 while (_size.get() == 0)
389 nanos = _notEmpty.awaitNanos(nanos);
392 catch (InterruptedException x)
398 int head = _indexes[HEAD_OFFSET];
399 e = (E)_elements[head];
400 _elements[head] = null;
401 _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
403 if (_size.decrementAndGet() > 0)
414 public boolean remove(Object o)
427 final int head = _indexes[HEAD_OFFSET];
428 final int tail = _indexes[TAIL_OFFSET];
429 final int capacity = _elements.length;
434 if (Objects.equals(_elements[i],o))
436 remove(i >= head?i - head:capacity - head + i);
458 public int remainingCapacity()
468 return getCapacity() - size();
482 public int drainTo(Collection<? super E> c)
484 throw new UnsupportedOperationException();
488 public int drainTo(Collection<? super E> c, int maxElements)
490 throw new UnsupportedOperationException();
493 /*----------------------------------------------------------------------------*/
495 /*----------------------------------------------------------------------------*/
497 @SuppressWarnings("unchecked")
499 public E get(int index)
509 if (index < 0 || index >= _size.get())
510 throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
511 int i = _indexes[HEAD_OFFSET] + index;
512 int capacity = _elements.length;
515 return (E)_elements[i];
529 public void add(int index, E e)
532 throw new NullPointerException();
541 final int size = _size.get();
543 if (index < 0 || index > size)
544 throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
552 if (_indexes[TAIL_OFFSET] == _indexes[HEAD_OFFSET])
554 throw new IllegalStateException("full");
556 // Re-read head and tail after a possible grow
557 int i = _indexes[HEAD_OFFSET] + index;
558 int capacity = _elements.length;
563 _size.incrementAndGet();
564 int tail = _indexes[TAIL_OFFSET];
565 _indexes[TAIL_OFFSET] = tail = (tail + 1) % capacity;
569 System.arraycopy(_elements,i,_elements,i + 1,tail - i);
576 System.arraycopy(_elements,0,_elements,1,tail);
577 _elements[0] = _elements[capacity - 1];
580 System.arraycopy(_elements,i,_elements,i + 1,capacity - i - 1);
596 @SuppressWarnings("unchecked")
598 public E set(int index, E e)
600 Objects.requireNonNull(e);
609 if (index < 0 || index >= _size.get())
610 throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
612 int i = _indexes[HEAD_OFFSET] + index;
613 int capacity = _elements.length;
616 E old = (E)_elements[i];
631 @SuppressWarnings("unchecked")
633 public E remove(int index)
643 if (index < 0 || index >= _size.get())
644 throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
646 int i = _indexes[HEAD_OFFSET] + index;
647 int capacity = _elements.length;
650 E old = (E)_elements[i];
652 int tail = _indexes[TAIL_OFFSET];
655 System.arraycopy(_elements,i + 1,_elements,i,tail - i);
656 --_indexes[TAIL_OFFSET];
660 System.arraycopy(_elements,i + 1,_elements,i,capacity - i - 1);
661 _elements[capacity - 1] = _elements[0];
664 System.arraycopy(_elements,1,_elements,0,tail);
665 --_indexes[TAIL_OFFSET];
669 _indexes[TAIL_OFFSET] = capacity - 1;
671 _elements[_indexes[TAIL_OFFSET]] = null;
674 _size.decrementAndGet();
690 public ListIterator<E> listIterator(int index)
700 Object[] elements = new Object[size()];
703 int head = _indexes[HEAD_OFFSET];
704 int tail = _indexes[TAIL_OFFSET];
707 System.arraycopy(_elements,head,elements,0,tail - head);
711 int chunk = _elements.length - head;
712 System.arraycopy(_elements,head,elements,0,chunk);
713 System.arraycopy(_elements,0,elements,chunk,tail);
716 return new Itr(elements,index);
729 /*----------------------------------------------------------------------------*/
730 /* Additional methods */
731 /*----------------------------------------------------------------------------*/
734 * @return the current capacity of this queue
736 public int getCapacity()
741 return _elements.length;
750 * @return the max capacity of this queue, or -1 if this queue is unbounded
752 public int getMaxCapacity()
757 /*----------------------------------------------------------------------------*/
758 /* Implementation methods */
759 /*----------------------------------------------------------------------------*/
761 private boolean grow()
763 if (_growCapacity <= 0)
773 final int head = _indexes[HEAD_OFFSET];
774 final int tail = _indexes[TAIL_OFFSET];
776 final int capacity = _elements.length;
778 Object[] elements = new Object[capacity + _growCapacity];
782 newTail = tail - head;
783 System.arraycopy(_elements,head,elements,0,newTail);
785 else if (head > tail || _size.get() > 0)
787 newTail = capacity + tail - head;
788 int cut = capacity - head;
789 System.arraycopy(_elements,head,elements,0,cut);
790 System.arraycopy(_elements,0,elements,cut,tail);
797 _elements = elements;
798 _indexes[HEAD_OFFSET] = 0;
799 _indexes[TAIL_OFFSET] = newTail;
813 private class Itr implements ListIterator<E>
815 private final Object[] _elements;
818 public Itr(Object[] elements, int offset)
820 _elements = elements;
825 public boolean hasNext()
827 return _cursor < _elements.length;
830 @SuppressWarnings("unchecked")
834 return (E)_elements[_cursor++];
838 public boolean hasPrevious()
843 @SuppressWarnings("unchecked")
847 return (E)_elements[--_cursor];
851 public int nextIndex()
857 public int previousIndex()
865 throw new UnsupportedOperationException();
871 throw new UnsupportedOperationException();
877 throw new UnsupportedOperationException();