X-Git-Url: https://code.wpia.club/?p=gigi.git;a=blobdiff_plain;f=lib%2Fjetty%2Forg%2Feclipse%2Fjetty%2Futil%2FBlockingArrayQueue.java;fp=lib%2Fjetty%2Forg%2Feclipse%2Fjetty%2Futil%2FBlockingArrayQueue.java;h=7acbdafcdd2648ca3604b69fe8a21edaadf07712;hp=0000000000000000000000000000000000000000;hb=73ef54a38e3930a1a789cdc6b5fa23cdd4c9d086;hpb=515007c7c1351045420669d65b59c08fa46850f2 diff --git a/lib/jetty/org/eclipse/jetty/util/BlockingArrayQueue.java b/lib/jetty/org/eclipse/jetty/util/BlockingArrayQueue.java new file mode 100644 index 00000000..7acbdafc --- /dev/null +++ b/lib/jetty/org/eclipse/jetty/util/BlockingArrayQueue.java @@ -0,0 +1,880 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util; + +import java.util.AbstractList; +import java.util.Collection; +import java.util.Iterator; +import java.util.ListIterator; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A BlockingQueue backed by a circular array capable or growing. + *

+ * This queue is uses a variant of the two lock queue algorithm to provide an efficient queue or list backed by a growable circular array. + *

+ * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is able to grow and provides a blocking put call. + *

+ * The queue has both a capacity (the size of the array currently allocated) and a max capacity (the maximum size that may be allocated), which defaults to + * {@link Integer#MAX_VALUE}. + * + * @param + * The element type + */ +public class BlockingArrayQueue extends AbstractList implements BlockingQueue +{ + /** + * The head offset in the {@link #_indexes} array, displaced by 15 slots to avoid false sharing with the array length (stored before the first element of + * the array itself). + */ + private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1; + /** + * The tail offset in the {@link #_indexes} array, displaced by 16 slots from the head to avoid false sharing with it. + */ + private static final int TAIL_OFFSET = HEAD_OFFSET + MemoryUtils.getIntegersPerCacheLine(); + /** + * Default initial capacity, 128. + */ + public static final int DEFAULT_CAPACITY = 128; + /** + * Default growth factor, 64. + */ + public static final int DEFAULT_GROWTH = 64; + + private final int _maxCapacity; + private final int _growCapacity; + /** + * Array that holds the head and tail indexes, separated by a cache line to avoid false sharing + */ + private final int[] _indexes = new int[TAIL_OFFSET + 1]; + private final Lock _tailLock = new ReentrantLock(); + private final AtomicInteger _size = new AtomicInteger(); + private final Lock _headLock = new ReentrantLock(); + private final Condition _notEmpty = _headLock.newCondition(); + private Object[] _elements; + + /** + * Creates an unbounded {@link BlockingArrayQueue} with default initial capacity and grow factor. + * + * @see #DEFAULT_CAPACITY + * @see #DEFAULT_GROWTH + */ + public BlockingArrayQueue() + { + _elements = new Object[DEFAULT_CAPACITY]; + _growCapacity = DEFAULT_GROWTH; + _maxCapacity = Integer.MAX_VALUE; + } + + /** + * Creates a bounded {@link BlockingArrayQueue} that does not grow. The capacity of the queue is fixed and equal to the given parameter. + * + * @param maxCapacity + * the maximum capacity + */ + public BlockingArrayQueue(int maxCapacity) + { + _elements = new Object[maxCapacity]; + _growCapacity = -1; + _maxCapacity = maxCapacity; + } + + /** + * Creates an unbounded {@link BlockingArrayQueue} that grows by the given parameter. + * + * @param capacity + * the initial capacity + * @param growBy + * the growth factor + */ + public BlockingArrayQueue(int capacity, int growBy) + { + _elements = new Object[capacity]; + _growCapacity = growBy; + _maxCapacity = Integer.MAX_VALUE; + } + + /** + * Create a bounded {@link BlockingArrayQueue} that grows by the given parameter. + * + * @param capacity + * the initial capacity + * @param growBy + * the growth factor + * @param maxCapacity + * the maximum capacity + */ + public BlockingArrayQueue(int capacity, int growBy, int maxCapacity) + { + if (capacity > maxCapacity) + throw new IllegalArgumentException(); + _elements = new Object[capacity]; + _growCapacity = growBy; + _maxCapacity = maxCapacity; + } + + /*----------------------------------------------------------------------------*/ + /* Collection methods */ + /*----------------------------------------------------------------------------*/ + + @Override + public void clear() + { + + _tailLock.lock(); + try + { + + _headLock.lock(); + try + { + _indexes[HEAD_OFFSET] = 0; + _indexes[TAIL_OFFSET] = 0; + _size.set(0); + } + finally + { + _headLock.unlock(); + } + } + finally + { + _tailLock.unlock(); + } + } + + @Override + public int size() + { + return _size.get(); + } + + @Override + public Iterator iterator() + { + return listIterator(); + } + + /*----------------------------------------------------------------------------*/ + /* Queue methods */ + /*----------------------------------------------------------------------------*/ + + @SuppressWarnings("unchecked") + @Override + public E poll() + { + if (_size.get() == 0) + return null; + + E e = null; + + _headLock.lock(); // Size cannot shrink + try + { + if (_size.get() > 0) + { + final int head = _indexes[HEAD_OFFSET]; + e = (E)_elements[head]; + _elements[head] = null; + _indexes[HEAD_OFFSET] = (head + 1) % _elements.length; + if (_size.decrementAndGet() > 0) + _notEmpty.signal(); + } + } + finally + { + _headLock.unlock(); + } + return e; + } + + @SuppressWarnings("unchecked") + @Override + public E peek() + { + if (_size.get() == 0) + return null; + + E e = null; + + _headLock.lock(); // Size cannot shrink + try + { + if (_size.get() > 0) + e = (E)_elements[_indexes[HEAD_OFFSET]]; + } + finally + { + _headLock.unlock(); + } + return e; + } + + @Override + public E remove() + { + E e = poll(); + if (e == null) + throw new NoSuchElementException(); + return e; + } + + @Override + public E element() + { + E e = peek(); + if (e == null) + throw new NoSuchElementException(); + return e; + } + + /*----------------------------------------------------------------------------*/ + /* BlockingQueue methods */ + /*----------------------------------------------------------------------------*/ + + @Override + public boolean offer(E e) + { + Objects.requireNonNull(e); + + boolean notEmpty = false; + _tailLock.lock(); // Size cannot grow... only shrink + try + { + int size = _size.get(); + if (size >= _maxCapacity) + return false; + + // Should we expand array? + if (size == _elements.length) + { + _headLock.lock(); + try + { + if (!grow()) + return false; + } + finally + { + _headLock.unlock(); + } + } + + // Re-read head and tail after a possible grow + int tail = _indexes[TAIL_OFFSET]; + _elements[tail] = e; + _indexes[TAIL_OFFSET] = (tail + 1) % _elements.length; + notEmpty = _size.getAndIncrement() == 0; + } + finally + { + _tailLock.unlock(); + } + + if (notEmpty) + { + _headLock.lock(); + try + { + _notEmpty.signal(); + } + finally + { + _headLock.unlock(); + } + } + + return true; + } + + @Override + public boolean add(E e) + { + if (offer(e)) + return true; + else + throw new IllegalStateException(); + } + + @Override + public void put(E o) throws InterruptedException + { + // The mechanism to await and signal when the queue is full is not implemented + throw new UnsupportedOperationException(); + } + + @Override + public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException + { + // The mechanism to await and signal when the queue is full is not implemented + throw new UnsupportedOperationException(); + } + + @SuppressWarnings("unchecked") + @Override + public E take() throws InterruptedException + { + E e = null; + + _headLock.lockInterruptibly(); // Size cannot shrink + try + { + try + { + while (_size.get() == 0) + { + _notEmpty.await(); + } + } + catch (InterruptedException ie) + { + _notEmpty.signal(); + throw ie; + } + + final int head = _indexes[HEAD_OFFSET]; + e = (E)_elements[head]; + _elements[head] = null; + _indexes[HEAD_OFFSET] = (head + 1) % _elements.length; + + if (_size.decrementAndGet() > 0) + _notEmpty.signal(); + } + finally + { + _headLock.unlock(); + } + return e; + } + + @SuppressWarnings("unchecked") + @Override + public E poll(long time, TimeUnit unit) throws InterruptedException + { + long nanos = unit.toNanos(time); + E e = null; + + _headLock.lockInterruptibly(); // Size cannot shrink + try + { + try + { + while (_size.get() == 0) + { + if (nanos <= 0) + return null; + nanos = _notEmpty.awaitNanos(nanos); + } + } + catch (InterruptedException x) + { + _notEmpty.signal(); + throw x; + } + + int head = _indexes[HEAD_OFFSET]; + e = (E)_elements[head]; + _elements[head] = null; + _indexes[HEAD_OFFSET] = (head + 1) % _elements.length; + + if (_size.decrementAndGet() > 0) + _notEmpty.signal(); + } + finally + { + _headLock.unlock(); + } + return e; + } + + @Override + public boolean remove(Object o) + { + + _tailLock.lock(); + try + { + + _headLock.lock(); + try + { + if (isEmpty()) + return false; + + final int head = _indexes[HEAD_OFFSET]; + final int tail = _indexes[TAIL_OFFSET]; + final int capacity = _elements.length; + + int i = head; + while (true) + { + if (Objects.equals(_elements[i],o)) + { + remove(i >= head?i - head:capacity - head + i); + return true; + } + ++i; + if (i == capacity) + i = 0; + if (i == tail) + return false; + } + } + finally + { + _headLock.unlock(); + } + } + finally + { + _tailLock.unlock(); + } + } + + @Override + public int remainingCapacity() + { + + _tailLock.lock(); + try + { + + _headLock.lock(); + try + { + return getCapacity() - size(); + } + finally + { + _headLock.unlock(); + } + } + finally + { + _tailLock.unlock(); + } + } + + @Override + public int drainTo(Collection c) + { + throw new UnsupportedOperationException(); + } + + @Override + public int drainTo(Collection c, int maxElements) + { + throw new UnsupportedOperationException(); + } + + /*----------------------------------------------------------------------------*/ + /* List methods */ + /*----------------------------------------------------------------------------*/ + + @SuppressWarnings("unchecked") + @Override + public E get(int index) + { + + _tailLock.lock(); + try + { + + _headLock.lock(); + try + { + if (index < 0 || index >= _size.get()) + throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")"); + int i = _indexes[HEAD_OFFSET] + index; + int capacity = _elements.length; + if (i >= capacity) + i -= capacity; + return (E)_elements[i]; + } + finally + { + _headLock.unlock(); + } + } + finally + { + _tailLock.unlock(); + } + } + + @Override + public void add(int index, E e) + { + if (e == null) + throw new NullPointerException(); + + _tailLock.lock(); + try + { + + _headLock.lock(); + try + { + final int size = _size.get(); + + if (index < 0 || index > size) + throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")"); + + if (index == size) + { + add(e); + } + else + { + if (_indexes[TAIL_OFFSET] == _indexes[HEAD_OFFSET]) + if (!grow()) + throw new IllegalStateException("full"); + + // Re-read head and tail after a possible grow + int i = _indexes[HEAD_OFFSET] + index; + int capacity = _elements.length; + + if (i >= capacity) + i -= capacity; + + _size.incrementAndGet(); + int tail = _indexes[TAIL_OFFSET]; + _indexes[TAIL_OFFSET] = tail = (tail + 1) % capacity; + + if (i < tail) + { + System.arraycopy(_elements,i,_elements,i + 1,tail - i); + _elements[i] = e; + } + else + { + if (tail > 0) + { + System.arraycopy(_elements,0,_elements,1,tail); + _elements[0] = _elements[capacity - 1]; + } + + System.arraycopy(_elements,i,_elements,i + 1,capacity - i - 1); + _elements[i] = e; + } + } + } + finally + { + _headLock.unlock(); + } + } + finally + { + _tailLock.unlock(); + } + } + + @SuppressWarnings("unchecked") + @Override + public E set(int index, E e) + { + Objects.requireNonNull(e); + + _tailLock.lock(); + try + { + + _headLock.lock(); + try + { + if (index < 0 || index >= _size.get()) + throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")"); + + int i = _indexes[HEAD_OFFSET] + index; + int capacity = _elements.length; + if (i >= capacity) + i -= capacity; + E old = (E)_elements[i]; + _elements[i] = e; + return old; + } + finally + { + _headLock.unlock(); + } + } + finally + { + _tailLock.unlock(); + } + } + + @SuppressWarnings("unchecked") + @Override + public E remove(int index) + { + + _tailLock.lock(); + try + { + + _headLock.lock(); + try + { + if (index < 0 || index >= _size.get()) + throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")"); + + int i = _indexes[HEAD_OFFSET] + index; + int capacity = _elements.length; + if (i >= capacity) + i -= capacity; + E old = (E)_elements[i]; + + int tail = _indexes[TAIL_OFFSET]; + if (i < tail) + { + System.arraycopy(_elements,i + 1,_elements,i,tail - i); + --_indexes[TAIL_OFFSET]; + } + else + { + System.arraycopy(_elements,i + 1,_elements,i,capacity - i - 1); + _elements[capacity - 1] = _elements[0]; + if (tail > 0) + { + System.arraycopy(_elements,1,_elements,0,tail); + --_indexes[TAIL_OFFSET]; + } + else + { + _indexes[TAIL_OFFSET] = capacity - 1; + } + _elements[_indexes[TAIL_OFFSET]] = null; + } + + _size.decrementAndGet(); + + return old; + } + finally + { + _headLock.unlock(); + } + } + finally + { + _tailLock.unlock(); + } + } + + @Override + public ListIterator listIterator(int index) + { + + _tailLock.lock(); + try + { + + _headLock.lock(); + try + { + Object[] elements = new Object[size()]; + if (size() > 0) + { + int head = _indexes[HEAD_OFFSET]; + int tail = _indexes[TAIL_OFFSET]; + if (head < tail) + { + System.arraycopy(_elements,head,elements,0,tail - head); + } + else + { + int chunk = _elements.length - head; + System.arraycopy(_elements,head,elements,0,chunk); + System.arraycopy(_elements,0,elements,chunk,tail); + } + } + return new Itr(elements,index); + } + finally + { + _headLock.unlock(); + } + } + finally + { + _tailLock.unlock(); + } + } + + /*----------------------------------------------------------------------------*/ + /* Additional methods */ + /*----------------------------------------------------------------------------*/ + + /** + * @return the current capacity of this queue + */ + public int getCapacity() + { + _tailLock.lock(); + try + { + return _elements.length; + } + finally + { + _tailLock.unlock(); + } + } + + /** + * @return the max capacity of this queue, or -1 if this queue is unbounded + */ + public int getMaxCapacity() + { + return _maxCapacity; + } + + /*----------------------------------------------------------------------------*/ + /* Implementation methods */ + /*----------------------------------------------------------------------------*/ + + private boolean grow() + { + if (_growCapacity <= 0) + return false; + + _tailLock.lock(); + try + { + + _headLock.lock(); + try + { + final int head = _indexes[HEAD_OFFSET]; + final int tail = _indexes[TAIL_OFFSET]; + final int newTail; + final int capacity = _elements.length; + + Object[] elements = new Object[capacity + _growCapacity]; + + if (head < tail) + { + newTail = tail - head; + System.arraycopy(_elements,head,elements,0,newTail); + } + else if (head > tail || _size.get() > 0) + { + newTail = capacity + tail - head; + int cut = capacity - head; + System.arraycopy(_elements,head,elements,0,cut); + System.arraycopy(_elements,0,elements,cut,tail); + } + else + { + newTail = 0; + } + + _elements = elements; + _indexes[HEAD_OFFSET] = 0; + _indexes[TAIL_OFFSET] = newTail; + return true; + } + finally + { + _headLock.unlock(); + } + } + finally + { + _tailLock.unlock(); + } + } + + private class Itr implements ListIterator + { + private final Object[] _elements; + private int _cursor; + + public Itr(Object[] elements, int offset) + { + _elements = elements; + _cursor = offset; + } + + @Override + public boolean hasNext() + { + return _cursor < _elements.length; + } + + @SuppressWarnings("unchecked") + @Override + public E next() + { + return (E)_elements[_cursor++]; + } + + @Override + public boolean hasPrevious() + { + return _cursor > 0; + } + + @SuppressWarnings("unchecked") + @Override + public E previous() + { + return (E)_elements[--_cursor]; + } + + @Override + public int nextIndex() + { + return _cursor + 1; + } + + @Override + public int previousIndex() + { + return _cursor - 1; + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + + @Override + public void set(E e) + { + throw new UnsupportedOperationException(); + } + + @Override + public void add(E e) + { + throw new UnsupportedOperationException(); + } + } +}