]> WPIA git - gigi.git/blobdiff - lib/jetty/org/eclipse/jetty/util/BlockingArrayQueue.java
Importing upstream Jetty jetty-9.2.1.v20140609
[gigi.git] / lib / jetty / org / eclipse / jetty / util / BlockingArrayQueue.java
diff --git a/lib/jetty/org/eclipse/jetty/util/BlockingArrayQueue.java b/lib/jetty/org/eclipse/jetty/util/BlockingArrayQueue.java
new file mode 100644 (file)
index 0000000..7acbdaf
--- /dev/null
@@ -0,0 +1,880 @@
+//
+//  ========================================================================
+//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+//  ------------------------------------------------------------------------
+//  All rights reserved. This program and the accompanying materials
+//  are made available under the terms of the Eclipse Public License v1.0
+//  and Apache License v2.0 which accompanies this distribution.
+//
+//      The Eclipse Public License is available at
+//      http://www.eclipse.org/legal/epl-v10.html
+//
+//      The Apache License v2.0 is available at
+//      http://www.opensource.org/licenses/apache2.0.php
+//
+//  You may elect to redistribute this code under either of these licenses.
+//  ========================================================================
+//
+
+package org.eclipse.jetty.util;
+
+import java.util.AbstractList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A BlockingQueue backed by a circular array capable or growing.
+ * <p/>
+ * This queue is uses a variant of the two lock queue algorithm to provide an efficient queue or list backed by a growable circular array.
+ * <p/>
+ * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is able to grow and provides a blocking put call.
+ * <p/>
+ * The queue has both a capacity (the size of the array currently allocated) and a max capacity (the maximum size that may be allocated), which defaults to
+ * {@link Integer#MAX_VALUE}.
+ * 
+ * @param <E>
+ *            The element type
+ */
+public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
+{
+    /**
+     * The head offset in the {@link #_indexes} array, displaced by 15 slots to avoid false sharing with the array length (stored before the first element of
+     * the array itself).
+     */
+    private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
+    /**
+     * The tail offset in the {@link #_indexes} array, displaced by 16 slots from the head to avoid false sharing with it.
+     */
+    private static final int TAIL_OFFSET = HEAD_OFFSET + MemoryUtils.getIntegersPerCacheLine();
+    /**
+     * Default initial capacity, 128.
+     */
+    public static final int DEFAULT_CAPACITY = 128;
+    /**
+     * Default growth factor, 64.
+     */
+    public static final int DEFAULT_GROWTH = 64;
+
+    private final int _maxCapacity;
+    private final int _growCapacity;
+    /**
+     * Array that holds the head and tail indexes, separated by a cache line to avoid false sharing
+     */
+    private final int[] _indexes = new int[TAIL_OFFSET + 1];
+    private final Lock _tailLock = new ReentrantLock();
+    private final AtomicInteger _size = new AtomicInteger();
+    private final Lock _headLock = new ReentrantLock();
+    private final Condition _notEmpty = _headLock.newCondition();
+    private Object[] _elements;
+
+    /**
+     * Creates an unbounded {@link BlockingArrayQueue} with default initial capacity and grow factor.
+     * 
+     * @see #DEFAULT_CAPACITY
+     * @see #DEFAULT_GROWTH
+     */
+    public BlockingArrayQueue()
+    {
+        _elements = new Object[DEFAULT_CAPACITY];
+        _growCapacity = DEFAULT_GROWTH;
+        _maxCapacity = Integer.MAX_VALUE;
+    }
+
+    /**
+     * Creates a bounded {@link BlockingArrayQueue} that does not grow. The capacity of the queue is fixed and equal to the given parameter.
+     * 
+     * @param maxCapacity
+     *            the maximum capacity
+     */
+    public BlockingArrayQueue(int maxCapacity)
+    {
+        _elements = new Object[maxCapacity];
+        _growCapacity = -1;
+        _maxCapacity = maxCapacity;
+    }
+
+    /**
+     * Creates an unbounded {@link BlockingArrayQueue} that grows by the given parameter.
+     * 
+     * @param capacity
+     *            the initial capacity
+     * @param growBy
+     *            the growth factor
+     */
+    public BlockingArrayQueue(int capacity, int growBy)
+    {
+        _elements = new Object[capacity];
+        _growCapacity = growBy;
+        _maxCapacity = Integer.MAX_VALUE;
+    }
+
+    /**
+     * Create a bounded {@link BlockingArrayQueue} that grows by the given parameter.
+     * 
+     * @param capacity
+     *            the initial capacity
+     * @param growBy
+     *            the growth factor
+     * @param maxCapacity
+     *            the maximum capacity
+     */
+    public BlockingArrayQueue(int capacity, int growBy, int maxCapacity)
+    {
+        if (capacity > maxCapacity)
+            throw new IllegalArgumentException();
+        _elements = new Object[capacity];
+        _growCapacity = growBy;
+        _maxCapacity = maxCapacity;
+    }
+
+    /*----------------------------------------------------------------------------*/
+    /* Collection methods */
+    /*----------------------------------------------------------------------------*/
+
+    @Override
+    public void clear()
+    {
+
+        _tailLock.lock();
+        try
+        {
+
+            _headLock.lock();
+            try
+            {
+                _indexes[HEAD_OFFSET] = 0;
+                _indexes[TAIL_OFFSET] = 0;
+                _size.set(0);
+            }
+            finally
+            {
+                _headLock.unlock();
+            }
+        }
+        finally
+        {
+            _tailLock.unlock();
+        }
+    }
+
+    @Override
+    public int size()
+    {
+        return _size.get();
+    }
+
+    @Override
+    public Iterator<E> iterator()
+    {
+        return listIterator();
+    }
+
+    /*----------------------------------------------------------------------------*/
+    /* Queue methods */
+    /*----------------------------------------------------------------------------*/
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public E poll()
+    {
+        if (_size.get() == 0)
+            return null;
+
+        E e = null;
+
+        _headLock.lock(); // Size cannot shrink
+        try
+        {
+            if (_size.get() > 0)
+            {
+                final int head = _indexes[HEAD_OFFSET];
+                e = (E)_elements[head];
+                _elements[head] = null;
+                _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
+                if (_size.decrementAndGet() > 0)
+                    _notEmpty.signal();
+            }
+        }
+        finally
+        {
+            _headLock.unlock();
+        }
+        return e;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public E peek()
+    {
+        if (_size.get() == 0)
+            return null;
+
+        E e = null;
+
+        _headLock.lock(); // Size cannot shrink
+        try
+        {
+            if (_size.get() > 0)
+                e = (E)_elements[_indexes[HEAD_OFFSET]];
+        }
+        finally
+        {
+            _headLock.unlock();
+        }
+        return e;
+    }
+
+    @Override
+    public E remove()
+    {
+        E e = poll();
+        if (e == null)
+            throw new NoSuchElementException();
+        return e;
+    }
+
+    @Override
+    public E element()
+    {
+        E e = peek();
+        if (e == null)
+            throw new NoSuchElementException();
+        return e;
+    }
+
+    /*----------------------------------------------------------------------------*/
+    /* BlockingQueue methods */
+    /*----------------------------------------------------------------------------*/
+
+    @Override
+    public boolean offer(E e)
+    {
+        Objects.requireNonNull(e);
+
+        boolean notEmpty = false;
+        _tailLock.lock(); // Size cannot grow... only shrink
+        try
+        {
+            int size = _size.get();
+            if (size >= _maxCapacity)
+                return false;
+
+            // Should we expand array?
+            if (size == _elements.length)
+            {
+                _headLock.lock();
+                try
+                {
+                    if (!grow())
+                        return false;
+                }
+                finally
+                {
+                    _headLock.unlock();
+                }
+            }
+
+            // Re-read head and tail after a possible grow
+            int tail = _indexes[TAIL_OFFSET];
+            _elements[tail] = e;
+            _indexes[TAIL_OFFSET] = (tail + 1) % _elements.length;
+            notEmpty = _size.getAndIncrement() == 0;
+        }
+        finally
+        {
+            _tailLock.unlock();
+        }
+
+        if (notEmpty)
+        {
+            _headLock.lock();
+            try
+            {
+                _notEmpty.signal();
+            }
+            finally
+            {
+                _headLock.unlock();
+            }
+        }
+
+        return true;
+    }
+
+    @Override
+    public boolean add(E e)
+    {
+        if (offer(e))
+            return true;
+        else
+            throw new IllegalStateException();
+    }
+
+    @Override
+    public void put(E o) throws InterruptedException
+    {
+        // The mechanism to await and signal when the queue is full is not implemented
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
+    {
+        // The mechanism to await and signal when the queue is full is not implemented
+        throw new UnsupportedOperationException();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public E take() throws InterruptedException
+    {
+        E e = null;
+
+        _headLock.lockInterruptibly(); // Size cannot shrink
+        try
+        {
+            try
+            {
+                while (_size.get() == 0)
+                {
+                    _notEmpty.await();
+                }
+            }
+            catch (InterruptedException ie)
+            {
+                _notEmpty.signal();
+                throw ie;
+            }
+
+            final int head = _indexes[HEAD_OFFSET];
+            e = (E)_elements[head];
+            _elements[head] = null;
+            _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
+
+            if (_size.decrementAndGet() > 0)
+                _notEmpty.signal();
+        }
+        finally
+        {
+            _headLock.unlock();
+        }
+        return e;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public E poll(long time, TimeUnit unit) throws InterruptedException
+    {
+        long nanos = unit.toNanos(time);
+        E e = null;
+
+        _headLock.lockInterruptibly(); // Size cannot shrink
+        try
+        {
+            try
+            {
+                while (_size.get() == 0)
+                {
+                    if (nanos <= 0)
+                        return null;
+                    nanos = _notEmpty.awaitNanos(nanos);
+                }
+            }
+            catch (InterruptedException x)
+            {
+                _notEmpty.signal();
+                throw x;
+            }
+
+            int head = _indexes[HEAD_OFFSET];
+            e = (E)_elements[head];
+            _elements[head] = null;
+            _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
+
+            if (_size.decrementAndGet() > 0)
+                _notEmpty.signal();
+        }
+        finally
+        {
+            _headLock.unlock();
+        }
+        return e;
+    }
+
+    @Override
+    public boolean remove(Object o)
+    {
+
+        _tailLock.lock();
+        try
+        {
+
+            _headLock.lock();
+            try
+            {
+                if (isEmpty())
+                    return false;
+
+                final int head = _indexes[HEAD_OFFSET];
+                final int tail = _indexes[TAIL_OFFSET];
+                final int capacity = _elements.length;
+
+                int i = head;
+                while (true)
+                {
+                    if (Objects.equals(_elements[i],o))
+                    {
+                        remove(i >= head?i - head:capacity - head + i);
+                        return true;
+                    }
+                    ++i;
+                    if (i == capacity)
+                        i = 0;
+                    if (i == tail)
+                        return false;
+                }
+            }
+            finally
+            {
+                _headLock.unlock();
+            }
+        }
+        finally
+        {
+            _tailLock.unlock();
+        }
+    }
+
+    @Override
+    public int remainingCapacity()
+    {
+
+        _tailLock.lock();
+        try
+        {
+
+            _headLock.lock();
+            try
+            {
+                return getCapacity() - size();
+            }
+            finally
+            {
+                _headLock.unlock();
+            }
+        }
+        finally
+        {
+            _tailLock.unlock();
+        }
+    }
+
+    @Override
+    public int drainTo(Collection<? super E> c)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int drainTo(Collection<? super E> c, int maxElements)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /*----------------------------------------------------------------------------*/
+    /* List methods */
+    /*----------------------------------------------------------------------------*/
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public E get(int index)
+    {
+
+        _tailLock.lock();
+        try
+        {
+
+            _headLock.lock();
+            try
+            {
+                if (index < 0 || index >= _size.get())
+                    throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
+                int i = _indexes[HEAD_OFFSET] + index;
+                int capacity = _elements.length;
+                if (i >= capacity)
+                    i -= capacity;
+                return (E)_elements[i];
+            }
+            finally
+            {
+                _headLock.unlock();
+            }
+        }
+        finally
+        {
+            _tailLock.unlock();
+        }
+    }
+
+    @Override
+    public void add(int index, E e)
+    {
+        if (e == null)
+            throw new NullPointerException();
+
+        _tailLock.lock();
+        try
+        {
+
+            _headLock.lock();
+            try
+            {
+                final int size = _size.get();
+
+                if (index < 0 || index > size)
+                    throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
+
+                if (index == size)
+                {
+                    add(e);
+                }
+                else
+                {
+                    if (_indexes[TAIL_OFFSET] == _indexes[HEAD_OFFSET])
+                        if (!grow())
+                            throw new IllegalStateException("full");
+
+                    // Re-read head and tail after a possible grow
+                    int i = _indexes[HEAD_OFFSET] + index;
+                    int capacity = _elements.length;
+
+                    if (i >= capacity)
+                        i -= capacity;
+
+                    _size.incrementAndGet();
+                    int tail = _indexes[TAIL_OFFSET];
+                    _indexes[TAIL_OFFSET] = tail = (tail + 1) % capacity;
+
+                    if (i < tail)
+                    {
+                        System.arraycopy(_elements,i,_elements,i + 1,tail - i);
+                        _elements[i] = e;
+                    }
+                    else
+                    {
+                        if (tail > 0)
+                        {
+                            System.arraycopy(_elements,0,_elements,1,tail);
+                            _elements[0] = _elements[capacity - 1];
+                        }
+
+                        System.arraycopy(_elements,i,_elements,i + 1,capacity - i - 1);
+                        _elements[i] = e;
+                    }
+                }
+            }
+            finally
+            {
+                _headLock.unlock();
+            }
+        }
+        finally
+        {
+            _tailLock.unlock();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public E set(int index, E e)
+    {
+        Objects.requireNonNull(e);
+
+        _tailLock.lock();
+        try
+        {
+
+            _headLock.lock();
+            try
+            {
+                if (index < 0 || index >= _size.get())
+                    throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
+
+                int i = _indexes[HEAD_OFFSET] + index;
+                int capacity = _elements.length;
+                if (i >= capacity)
+                    i -= capacity;
+                E old = (E)_elements[i];
+                _elements[i] = e;
+                return old;
+            }
+            finally
+            {
+                _headLock.unlock();
+            }
+        }
+        finally
+        {
+            _tailLock.unlock();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public E remove(int index)
+    {
+
+        _tailLock.lock();
+        try
+        {
+
+            _headLock.lock();
+            try
+            {
+                if (index < 0 || index >= _size.get())
+                    throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
+
+                int i = _indexes[HEAD_OFFSET] + index;
+                int capacity = _elements.length;
+                if (i >= capacity)
+                    i -= capacity;
+                E old = (E)_elements[i];
+
+                int tail = _indexes[TAIL_OFFSET];
+                if (i < tail)
+                {
+                    System.arraycopy(_elements,i + 1,_elements,i,tail - i);
+                    --_indexes[TAIL_OFFSET];
+                }
+                else
+                {
+                    System.arraycopy(_elements,i + 1,_elements,i,capacity - i - 1);
+                    _elements[capacity - 1] = _elements[0];
+                    if (tail > 0)
+                    {
+                        System.arraycopy(_elements,1,_elements,0,tail);
+                        --_indexes[TAIL_OFFSET];
+                    }
+                    else
+                    {
+                        _indexes[TAIL_OFFSET] = capacity - 1;
+                    }
+                    _elements[_indexes[TAIL_OFFSET]] = null;
+                }
+
+                _size.decrementAndGet();
+
+                return old;
+            }
+            finally
+            {
+                _headLock.unlock();
+            }
+        }
+        finally
+        {
+            _tailLock.unlock();
+        }
+    }
+
+    @Override
+    public ListIterator<E> listIterator(int index)
+    {
+
+        _tailLock.lock();
+        try
+        {
+
+            _headLock.lock();
+            try
+            {
+                Object[] elements = new Object[size()];
+                if (size() > 0)
+                {
+                    int head = _indexes[HEAD_OFFSET];
+                    int tail = _indexes[TAIL_OFFSET];
+                    if (head < tail)
+                    {
+                        System.arraycopy(_elements,head,elements,0,tail - head);
+                    }
+                    else
+                    {
+                        int chunk = _elements.length - head;
+                        System.arraycopy(_elements,head,elements,0,chunk);
+                        System.arraycopy(_elements,0,elements,chunk,tail);
+                    }
+                }
+                return new Itr(elements,index);
+            }
+            finally
+            {
+                _headLock.unlock();
+            }
+        }
+        finally
+        {
+            _tailLock.unlock();
+        }
+    }
+
+    /*----------------------------------------------------------------------------*/
+    /* Additional methods */
+    /*----------------------------------------------------------------------------*/
+
+    /**
+     * @return the current capacity of this queue
+     */
+    public int getCapacity()
+    {
+        _tailLock.lock();
+        try
+        {
+            return _elements.length;
+        }
+        finally
+        {
+            _tailLock.unlock();
+        }
+    }
+
+    /**
+     * @return the max capacity of this queue, or -1 if this queue is unbounded
+     */
+    public int getMaxCapacity()
+    {
+        return _maxCapacity;
+    }
+
+    /*----------------------------------------------------------------------------*/
+    /* Implementation methods */
+    /*----------------------------------------------------------------------------*/
+
+    private boolean grow()
+    {
+        if (_growCapacity <= 0)
+            return false;
+
+        _tailLock.lock();
+        try
+        {
+
+            _headLock.lock();
+            try
+            {
+                final int head = _indexes[HEAD_OFFSET];
+                final int tail = _indexes[TAIL_OFFSET];
+                final int newTail;
+                final int capacity = _elements.length;
+
+                Object[] elements = new Object[capacity + _growCapacity];
+
+                if (head < tail)
+                {
+                    newTail = tail - head;
+                    System.arraycopy(_elements,head,elements,0,newTail);
+                }
+                else if (head > tail || _size.get() > 0)
+                {
+                    newTail = capacity + tail - head;
+                    int cut = capacity - head;
+                    System.arraycopy(_elements,head,elements,0,cut);
+                    System.arraycopy(_elements,0,elements,cut,tail);
+                }
+                else
+                {
+                    newTail = 0;
+                }
+
+                _elements = elements;
+                _indexes[HEAD_OFFSET] = 0;
+                _indexes[TAIL_OFFSET] = newTail;
+                return true;
+            }
+            finally
+            {
+                _headLock.unlock();
+            }
+        }
+        finally
+        {
+            _tailLock.unlock();
+        }
+    }
+
+    private class Itr implements ListIterator<E>
+    {
+        private final Object[] _elements;
+        private int _cursor;
+
+        public Itr(Object[] elements, int offset)
+        {
+            _elements = elements;
+            _cursor = offset;
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            return _cursor < _elements.length;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public E next()
+        {
+            return (E)_elements[_cursor++];
+        }
+
+        @Override
+        public boolean hasPrevious()
+        {
+            return _cursor > 0;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public E previous()
+        {
+            return (E)_elements[--_cursor];
+        }
+
+        @Override
+        public int nextIndex()
+        {
+            return _cursor + 1;
+        }
+
+        @Override
+        public int previousIndex()
+        {
+            return _cursor - 1;
+        }
+
+        @Override
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void set(E e)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void add(E e)
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+}