--- /dev/null
+//
+// ========================================================================
+// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.util;
+
+import java.util.AbstractList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A BlockingQueue backed by a circular array capable or growing.
+ * <p/>
+ * This queue is uses a variant of the two lock queue algorithm to provide an efficient queue or list backed by a growable circular array.
+ * <p/>
+ * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is able to grow and provides a blocking put call.
+ * <p/>
+ * The queue has both a capacity (the size of the array currently allocated) and a max capacity (the maximum size that may be allocated), which defaults to
+ * {@link Integer#MAX_VALUE}.
+ *
+ * @param <E>
+ * The element type
+ */
+public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
+{
+ /**
+ * The head offset in the {@link #_indexes} array, displaced by 15 slots to avoid false sharing with the array length (stored before the first element of
+ * the array itself).
+ */
+ private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
+ /**
+ * The tail offset in the {@link #_indexes} array, displaced by 16 slots from the head to avoid false sharing with it.
+ */
+ private static final int TAIL_OFFSET = HEAD_OFFSET + MemoryUtils.getIntegersPerCacheLine();
+ /**
+ * Default initial capacity, 128.
+ */
+ public static final int DEFAULT_CAPACITY = 128;
+ /**
+ * Default growth factor, 64.
+ */
+ public static final int DEFAULT_GROWTH = 64;
+
+ private final int _maxCapacity;
+ private final int _growCapacity;
+ /**
+ * Array that holds the head and tail indexes, separated by a cache line to avoid false sharing
+ */
+ private final int[] _indexes = new int[TAIL_OFFSET + 1];
+ private final Lock _tailLock = new ReentrantLock();
+ private final AtomicInteger _size = new AtomicInteger();
+ private final Lock _headLock = new ReentrantLock();
+ private final Condition _notEmpty = _headLock.newCondition();
+ private Object[] _elements;
+
+ /**
+ * Creates an unbounded {@link BlockingArrayQueue} with default initial capacity and grow factor.
+ *
+ * @see #DEFAULT_CAPACITY
+ * @see #DEFAULT_GROWTH
+ */
+ public BlockingArrayQueue()
+ {
+ _elements = new Object[DEFAULT_CAPACITY];
+ _growCapacity = DEFAULT_GROWTH;
+ _maxCapacity = Integer.MAX_VALUE;
+ }
+
+ /**
+ * Creates a bounded {@link BlockingArrayQueue} that does not grow. The capacity of the queue is fixed and equal to the given parameter.
+ *
+ * @param maxCapacity
+ * the maximum capacity
+ */
+ public BlockingArrayQueue(int maxCapacity)
+ {
+ _elements = new Object[maxCapacity];
+ _growCapacity = -1;
+ _maxCapacity = maxCapacity;
+ }
+
+ /**
+ * Creates an unbounded {@link BlockingArrayQueue} that grows by the given parameter.
+ *
+ * @param capacity
+ * the initial capacity
+ * @param growBy
+ * the growth factor
+ */
+ public BlockingArrayQueue(int capacity, int growBy)
+ {
+ _elements = new Object[capacity];
+ _growCapacity = growBy;
+ _maxCapacity = Integer.MAX_VALUE;
+ }
+
+ /**
+ * Create a bounded {@link BlockingArrayQueue} that grows by the given parameter.
+ *
+ * @param capacity
+ * the initial capacity
+ * @param growBy
+ * the growth factor
+ * @param maxCapacity
+ * the maximum capacity
+ */
+ public BlockingArrayQueue(int capacity, int growBy, int maxCapacity)
+ {
+ if (capacity > maxCapacity)
+ throw new IllegalArgumentException();
+ _elements = new Object[capacity];
+ _growCapacity = growBy;
+ _maxCapacity = maxCapacity;
+ }
+
+ /*----------------------------------------------------------------------------*/
+ /* Collection methods */
+ /*----------------------------------------------------------------------------*/
+
+ @Override
+ public void clear()
+ {
+
+ _tailLock.lock();
+ try
+ {
+
+ _headLock.lock();
+ try
+ {
+ _indexes[HEAD_OFFSET] = 0;
+ _indexes[TAIL_OFFSET] = 0;
+ _size.set(0);
+ }
+ finally
+ {
+ _headLock.unlock();
+ }
+ }
+ finally
+ {
+ _tailLock.unlock();
+ }
+ }
+
+ @Override
+ public int size()
+ {
+ return _size.get();
+ }
+
+ @Override
+ public Iterator<E> iterator()
+ {
+ return listIterator();
+ }
+
+ /*----------------------------------------------------------------------------*/
+ /* Queue methods */
+ /*----------------------------------------------------------------------------*/
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public E poll()
+ {
+ if (_size.get() == 0)
+ return null;
+
+ E e = null;
+
+ _headLock.lock(); // Size cannot shrink
+ try
+ {
+ if (_size.get() > 0)
+ {
+ final int head = _indexes[HEAD_OFFSET];
+ e = (E)_elements[head];
+ _elements[head] = null;
+ _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
+ if (_size.decrementAndGet() > 0)
+ _notEmpty.signal();
+ }
+ }
+ finally
+ {
+ _headLock.unlock();
+ }
+ return e;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public E peek()
+ {
+ if (_size.get() == 0)
+ return null;
+
+ E e = null;
+
+ _headLock.lock(); // Size cannot shrink
+ try
+ {
+ if (_size.get() > 0)
+ e = (E)_elements[_indexes[HEAD_OFFSET]];
+ }
+ finally
+ {
+ _headLock.unlock();
+ }
+ return e;
+ }
+
+ @Override
+ public E remove()
+ {
+ E e = poll();
+ if (e == null)
+ throw new NoSuchElementException();
+ return e;
+ }
+
+ @Override
+ public E element()
+ {
+ E e = peek();
+ if (e == null)
+ throw new NoSuchElementException();
+ return e;
+ }
+
+ /*----------------------------------------------------------------------------*/
+ /* BlockingQueue methods */
+ /*----------------------------------------------------------------------------*/
+
+ @Override
+ public boolean offer(E e)
+ {
+ Objects.requireNonNull(e);
+
+ boolean notEmpty = false;
+ _tailLock.lock(); // Size cannot grow... only shrink
+ try
+ {
+ int size = _size.get();
+ if (size >= _maxCapacity)
+ return false;
+
+ // Should we expand array?
+ if (size == _elements.length)
+ {
+ _headLock.lock();
+ try
+ {
+ if (!grow())
+ return false;
+ }
+ finally
+ {
+ _headLock.unlock();
+ }
+ }
+
+ // Re-read head and tail after a possible grow
+ int tail = _indexes[TAIL_OFFSET];
+ _elements[tail] = e;
+ _indexes[TAIL_OFFSET] = (tail + 1) % _elements.length;
+ notEmpty = _size.getAndIncrement() == 0;
+ }
+ finally
+ {
+ _tailLock.unlock();
+ }
+
+ if (notEmpty)
+ {
+ _headLock.lock();
+ try
+ {
+ _notEmpty.signal();
+ }
+ finally
+ {
+ _headLock.unlock();
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean add(E e)
+ {
+ if (offer(e))
+ return true;
+ else
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public void put(E o) throws InterruptedException
+ {
+ // The mechanism to await and signal when the queue is full is not implemented
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
+ {
+ // The mechanism to await and signal when the queue is full is not implemented
+ throw new UnsupportedOperationException();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public E take() throws InterruptedException
+ {
+ E e = null;
+
+ _headLock.lockInterruptibly(); // Size cannot shrink
+ try
+ {
+ try
+ {
+ while (_size.get() == 0)
+ {
+ _notEmpty.await();
+ }
+ }
+ catch (InterruptedException ie)
+ {
+ _notEmpty.signal();
+ throw ie;
+ }
+
+ final int head = _indexes[HEAD_OFFSET];
+ e = (E)_elements[head];
+ _elements[head] = null;
+ _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
+
+ if (_size.decrementAndGet() > 0)
+ _notEmpty.signal();
+ }
+ finally
+ {
+ _headLock.unlock();
+ }
+ return e;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public E poll(long time, TimeUnit unit) throws InterruptedException
+ {
+ long nanos = unit.toNanos(time);
+ E e = null;
+
+ _headLock.lockInterruptibly(); // Size cannot shrink
+ try
+ {
+ try
+ {
+ while (_size.get() == 0)
+ {
+ if (nanos <= 0)
+ return null;
+ nanos = _notEmpty.awaitNanos(nanos);
+ }
+ }
+ catch (InterruptedException x)
+ {
+ _notEmpty.signal();
+ throw x;
+ }
+
+ int head = _indexes[HEAD_OFFSET];
+ e = (E)_elements[head];
+ _elements[head] = null;
+ _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
+
+ if (_size.decrementAndGet() > 0)
+ _notEmpty.signal();
+ }
+ finally
+ {
+ _headLock.unlock();
+ }
+ return e;
+ }
+
+ @Override
+ public boolean remove(Object o)
+ {
+
+ _tailLock.lock();
+ try
+ {
+
+ _headLock.lock();
+ try
+ {
+ if (isEmpty())
+ return false;
+
+ final int head = _indexes[HEAD_OFFSET];
+ final int tail = _indexes[TAIL_OFFSET];
+ final int capacity = _elements.length;
+
+ int i = head;
+ while (true)
+ {
+ if (Objects.equals(_elements[i],o))
+ {
+ remove(i >= head?i - head:capacity - head + i);
+ return true;
+ }
+ ++i;
+ if (i == capacity)
+ i = 0;
+ if (i == tail)
+ return false;
+ }
+ }
+ finally
+ {
+ _headLock.unlock();
+ }
+ }
+ finally
+ {
+ _tailLock.unlock();
+ }
+ }
+
+ @Override
+ public int remainingCapacity()
+ {
+
+ _tailLock.lock();
+ try
+ {
+
+ _headLock.lock();
+ try
+ {
+ return getCapacity() - size();
+ }
+ finally
+ {
+ _headLock.unlock();
+ }
+ }
+ finally
+ {
+ _tailLock.unlock();
+ }
+ }
+
+ @Override
+ public int drainTo(Collection<? super E> c)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int drainTo(Collection<? super E> c, int maxElements)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /*----------------------------------------------------------------------------*/
+ /* List methods */
+ /*----------------------------------------------------------------------------*/
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public E get(int index)
+ {
+
+ _tailLock.lock();
+ try
+ {
+
+ _headLock.lock();
+ try
+ {
+ if (index < 0 || index >= _size.get())
+ throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
+ int i = _indexes[HEAD_OFFSET] + index;
+ int capacity = _elements.length;
+ if (i >= capacity)
+ i -= capacity;
+ return (E)_elements[i];
+ }
+ finally
+ {
+ _headLock.unlock();
+ }
+ }
+ finally
+ {
+ _tailLock.unlock();
+ }
+ }
+
+ @Override
+ public void add(int index, E e)
+ {
+ if (e == null)
+ throw new NullPointerException();
+
+ _tailLock.lock();
+ try
+ {
+
+ _headLock.lock();
+ try
+ {
+ final int size = _size.get();
+
+ if (index < 0 || index > size)
+ throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
+
+ if (index == size)
+ {
+ add(e);
+ }
+ else
+ {
+ if (_indexes[TAIL_OFFSET] == _indexes[HEAD_OFFSET])
+ if (!grow())
+ throw new IllegalStateException("full");
+
+ // Re-read head and tail after a possible grow
+ int i = _indexes[HEAD_OFFSET] + index;
+ int capacity = _elements.length;
+
+ if (i >= capacity)
+ i -= capacity;
+
+ _size.incrementAndGet();
+ int tail = _indexes[TAIL_OFFSET];
+ _indexes[TAIL_OFFSET] = tail = (tail + 1) % capacity;
+
+ if (i < tail)
+ {
+ System.arraycopy(_elements,i,_elements,i + 1,tail - i);
+ _elements[i] = e;
+ }
+ else
+ {
+ if (tail > 0)
+ {
+ System.arraycopy(_elements,0,_elements,1,tail);
+ _elements[0] = _elements[capacity - 1];
+ }
+
+ System.arraycopy(_elements,i,_elements,i + 1,capacity - i - 1);
+ _elements[i] = e;
+ }
+ }
+ }
+ finally
+ {
+ _headLock.unlock();
+ }
+ }
+ finally
+ {
+ _tailLock.unlock();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public E set(int index, E e)
+ {
+ Objects.requireNonNull(e);
+
+ _tailLock.lock();
+ try
+ {
+
+ _headLock.lock();
+ try
+ {
+ if (index < 0 || index >= _size.get())
+ throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
+
+ int i = _indexes[HEAD_OFFSET] + index;
+ int capacity = _elements.length;
+ if (i >= capacity)
+ i -= capacity;
+ E old = (E)_elements[i];
+ _elements[i] = e;
+ return old;
+ }
+ finally
+ {
+ _headLock.unlock();
+ }
+ }
+ finally
+ {
+ _tailLock.unlock();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public E remove(int index)
+ {
+
+ _tailLock.lock();
+ try
+ {
+
+ _headLock.lock();
+ try
+ {
+ if (index < 0 || index >= _size.get())
+ throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
+
+ int i = _indexes[HEAD_OFFSET] + index;
+ int capacity = _elements.length;
+ if (i >= capacity)
+ i -= capacity;
+ E old = (E)_elements[i];
+
+ int tail = _indexes[TAIL_OFFSET];
+ if (i < tail)
+ {
+ System.arraycopy(_elements,i + 1,_elements,i,tail - i);
+ --_indexes[TAIL_OFFSET];
+ }
+ else
+ {
+ System.arraycopy(_elements,i + 1,_elements,i,capacity - i - 1);
+ _elements[capacity - 1] = _elements[0];
+ if (tail > 0)
+ {
+ System.arraycopy(_elements,1,_elements,0,tail);
+ --_indexes[TAIL_OFFSET];
+ }
+ else
+ {
+ _indexes[TAIL_OFFSET] = capacity - 1;
+ }
+ _elements[_indexes[TAIL_OFFSET]] = null;
+ }
+
+ _size.decrementAndGet();
+
+ return old;
+ }
+ finally
+ {
+ _headLock.unlock();
+ }
+ }
+ finally
+ {
+ _tailLock.unlock();
+ }
+ }
+
+ @Override
+ public ListIterator<E> listIterator(int index)
+ {
+
+ _tailLock.lock();
+ try
+ {
+
+ _headLock.lock();
+ try
+ {
+ Object[] elements = new Object[size()];
+ if (size() > 0)
+ {
+ int head = _indexes[HEAD_OFFSET];
+ int tail = _indexes[TAIL_OFFSET];
+ if (head < tail)
+ {
+ System.arraycopy(_elements,head,elements,0,tail - head);
+ }
+ else
+ {
+ int chunk = _elements.length - head;
+ System.arraycopy(_elements,head,elements,0,chunk);
+ System.arraycopy(_elements,0,elements,chunk,tail);
+ }
+ }
+ return new Itr(elements,index);
+ }
+ finally
+ {
+ _headLock.unlock();
+ }
+ }
+ finally
+ {
+ _tailLock.unlock();
+ }
+ }
+
+ /*----------------------------------------------------------------------------*/
+ /* Additional methods */
+ /*----------------------------------------------------------------------------*/
+
+ /**
+ * @return the current capacity of this queue
+ */
+ public int getCapacity()
+ {
+ _tailLock.lock();
+ try
+ {
+ return _elements.length;
+ }
+ finally
+ {
+ _tailLock.unlock();
+ }
+ }
+
+ /**
+ * @return the max capacity of this queue, or -1 if this queue is unbounded
+ */
+ public int getMaxCapacity()
+ {
+ return _maxCapacity;
+ }
+
+ /*----------------------------------------------------------------------------*/
+ /* Implementation methods */
+ /*----------------------------------------------------------------------------*/
+
+ private boolean grow()
+ {
+ if (_growCapacity <= 0)
+ return false;
+
+ _tailLock.lock();
+ try
+ {
+
+ _headLock.lock();
+ try
+ {
+ final int head = _indexes[HEAD_OFFSET];
+ final int tail = _indexes[TAIL_OFFSET];
+ final int newTail;
+ final int capacity = _elements.length;
+
+ Object[] elements = new Object[capacity + _growCapacity];
+
+ if (head < tail)
+ {
+ newTail = tail - head;
+ System.arraycopy(_elements,head,elements,0,newTail);
+ }
+ else if (head > tail || _size.get() > 0)
+ {
+ newTail = capacity + tail - head;
+ int cut = capacity - head;
+ System.arraycopy(_elements,head,elements,0,cut);
+ System.arraycopy(_elements,0,elements,cut,tail);
+ }
+ else
+ {
+ newTail = 0;
+ }
+
+ _elements = elements;
+ _indexes[HEAD_OFFSET] = 0;
+ _indexes[TAIL_OFFSET] = newTail;
+ return true;
+ }
+ finally
+ {
+ _headLock.unlock();
+ }
+ }
+ finally
+ {
+ _tailLock.unlock();
+ }
+ }
+
+ private class Itr implements ListIterator<E>
+ {
+ private final Object[] _elements;
+ private int _cursor;
+
+ public Itr(Object[] elements, int offset)
+ {
+ _elements = elements;
+ _cursor = offset;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return _cursor < _elements.length;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public E next()
+ {
+ return (E)_elements[_cursor++];
+ }
+
+ @Override
+ public boolean hasPrevious()
+ {
+ return _cursor > 0;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public E previous()
+ {
+ return (E)_elements[--_cursor];
+ }
+
+ @Override
+ public int nextIndex()
+ {
+ return _cursor + 1;
+ }
+
+ @Override
+ public int previousIndex()
+ {
+ return _cursor - 1;
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void set(E e)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void add(E e)
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+}