]> WPIA git - gigi.git/blobdiff - lib/jetty/org/eclipse/jetty/util/ConcurrentArrayQueue.java
Importing upstream Jetty jetty-9.2.1.v20140609
[gigi.git] / lib / jetty / org / eclipse / jetty / util / ConcurrentArrayQueue.java
diff --git a/lib/jetty/org/eclipse/jetty/util/ConcurrentArrayQueue.java b/lib/jetty/org/eclipse/jetty/util/ConcurrentArrayQueue.java
new file mode 100644 (file)
index 0000000..fc1326d
--- /dev/null
@@ -0,0 +1,570 @@
+//
+//  ========================================================================
+//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+//  ------------------------------------------------------------------------
+//  All rights reserved. This program and the accompanying materials
+//  are made available under the terms of the Eclipse Public License v1.0
+//  and Apache License v2.0 which accompanies this distribution.
+//
+//      The Eclipse Public License is available at
+//      http://www.eclipse.org/legal/epl-v10.html
+//
+//      The Apache License v2.0 is available at
+//      http://www.opensource.org/licenses/apache2.0.php
+//
+//  You may elect to redistribute this code under either of these licenses.
+//  ========================================================================
+//
+
+package org.eclipse.jetty.util;
+
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+/**
+ * A concurrent, unbounded implementation of {@link Queue} that uses singly-linked array blocks
+ * to store elements.
+ * <p/>
+ * This class is a drop-in replacement for {@link ConcurrentLinkedQueue}, with similar performance
+ * but producing less garbage because arrays are used to store elements rather than nodes.
+ * <p/>
+ * The algorithm used is a variation of the algorithm from Gidenstam, Sundell and Tsigas
+ * (http://www.adm.hb.se/~AGD/Presentations/CacheAwareQueue_OPODIS.pdf).
+ *
+ * @param <T>
+ */
+public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
+{
+    public static final int DEFAULT_BLOCK_SIZE = 512;
+    public static final Object REMOVED_ELEMENT = new Object()
+    {
+        @Override
+        public String toString()
+        {
+            return "X";
+        }
+    };
+
+    private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
+    private static final int TAIL_OFFSET = MemoryUtils.getIntegersPerCacheLine()*2 -1;
+
+    private final AtomicReferenceArray<Block<T>> _blocks = new AtomicReferenceArray<>(TAIL_OFFSET + 1);
+    private final int _blockSize;
+
+    public ConcurrentArrayQueue()
+    {
+        this(DEFAULT_BLOCK_SIZE);
+    }
+
+    public ConcurrentArrayQueue(int blockSize)
+    {
+        _blockSize = blockSize;
+        Block<T> block = newBlock();
+        _blocks.set(HEAD_OFFSET,block);
+        _blocks.set(TAIL_OFFSET,block);
+    }
+
+    public int getBlockSize()
+    {
+        return _blockSize;
+    }
+
+    protected Block<T> getHeadBlock()
+    {
+        return _blocks.get(HEAD_OFFSET);
+    }
+
+    protected Block<T> getTailBlock()
+    {
+        return _blocks.get(TAIL_OFFSET);
+    }
+
+    @Override
+    public boolean offer(T item)
+    {
+        item = Objects.requireNonNull(item);
+
+        final Block<T> initialTailBlock = getTailBlock();
+        Block<T> currentTailBlock = initialTailBlock;
+        int tail = currentTailBlock.tail();
+        while (true)
+        {
+            if (tail == getBlockSize())
+            {
+                Block<T> nextTailBlock = currentTailBlock.next();
+                if (nextTailBlock == null)
+                {
+                    nextTailBlock = newBlock();
+                    if (currentTailBlock.link(nextTailBlock))
+                    {
+                        // Linking succeeded, loop
+                        currentTailBlock = nextTailBlock;
+                    }
+                    else
+                    {
+                        // Concurrent linking, use other block and loop
+                        currentTailBlock = currentTailBlock.next();
+                    }
+                }
+                else
+                {
+                    // Not at last block, loop
+                    currentTailBlock = nextTailBlock;
+                }
+                tail = currentTailBlock.tail();
+            }
+            else
+            {
+                if (currentTailBlock.peek(tail) == null)
+                {
+                    if (currentTailBlock.store(tail, item))
+                    {
+                        // Item stored
+                        break;
+                    }
+                    else
+                    {
+                        // Concurrent store, try next index
+                        ++tail;
+                    }
+                }
+                else
+                {
+                    // Not free, try next index
+                    ++tail;
+                }
+            }
+        }
+
+        updateTailBlock(initialTailBlock, currentTailBlock);
+
+        return true;
+    }
+
+    private void updateTailBlock(Block<T> oldTailBlock, Block<T> newTailBlock)
+    {
+        // Update the tail block pointer if needs to
+        if (oldTailBlock != newTailBlock)
+        {
+            // The tail block pointer is allowed to lag behind.
+            // If this update fails, it means that other threads
+            // have filled this block and installed a new one.
+            casTailBlock(oldTailBlock, newTailBlock);
+        }
+    }
+
+    protected boolean casTailBlock(Block<T> current, Block<T> update)
+    {
+        return _blocks.compareAndSet(TAIL_OFFSET,current,update);
+    }
+
+    @Override
+    public T poll()
+    {
+        final Block<T> initialHeadBlock = getHeadBlock();
+        Block<T> currentHeadBlock = initialHeadBlock;
+        int head = currentHeadBlock.head();
+        T result = null;
+        while (true)
+        {
+            if (head == getBlockSize())
+            {
+                Block<T> nextHeadBlock = currentHeadBlock.next();
+                if (nextHeadBlock == null)
+                {
+                    // We could have read that the next head block was null
+                    // but another thread allocated a new block and stored a
+                    // new item. This thread could not detect this, but that
+                    // is ok, otherwise we would not be able to exit this loop.
+
+                    // Queue is empty
+                    break;
+                }
+                else
+                {
+                    // Use next block and loop
+                    currentHeadBlock = nextHeadBlock;
+                    head = currentHeadBlock.head();
+                }
+            }
+            else
+            {
+                Object element = currentHeadBlock.peek(head);
+                if (element == REMOVED_ELEMENT)
+                {
+                    // Already removed, try next index
+                    ++head;
+                }
+                else
+                {
+                    result = (T)element;
+                    if (result != null)
+                    {
+                        if (currentHeadBlock.remove(head, result, true))
+                        {
+                            // Item removed
+                            break;
+                        }
+                        else
+                        {
+                            // Concurrent remove, try next index
+                            ++head;
+                        }
+                    }
+                    else
+                    {
+                        // Queue is empty
+                        break;
+                    }
+                }
+            }
+        }
+
+        updateHeadBlock(initialHeadBlock, currentHeadBlock);
+
+        return result;
+    }
+
+    private void updateHeadBlock(Block<T> oldHeadBlock, Block<T> newHeadBlock)
+    {
+        // Update the head block pointer if needs to
+        if (oldHeadBlock != newHeadBlock)
+        {
+            // The head block pointer lagged behind.
+            // If this update fails, it means that other threads
+            // have emptied this block and pointed to a new one.
+            casHeadBlock(oldHeadBlock, newHeadBlock);
+        }
+    }
+
+    protected boolean casHeadBlock(Block<T> current, Block<T> update)
+    {
+        return _blocks.compareAndSet(HEAD_OFFSET,current,update);
+    }
+
+    @Override
+    public T peek()
+    {
+        Block<T> currentHeadBlock = getHeadBlock();
+        int head = currentHeadBlock.head();
+        while (true)
+        {
+            if (head == getBlockSize())
+            {
+                Block<T> nextHeadBlock = currentHeadBlock.next();
+                if (nextHeadBlock == null)
+                {
+                    // Queue is empty
+                    return null;
+                }
+                else
+                {
+                    // Use next block and loop
+                    currentHeadBlock = nextHeadBlock;
+                    head = currentHeadBlock.head();
+                }
+            }
+            else
+            {
+                Object element = currentHeadBlock.peek(head);
+                if (element == REMOVED_ELEMENT)
+                {
+                    // Already removed, try next index
+                    ++head;
+                }
+                else
+                {
+                    return (T)element;
+                }
+            }
+        }
+    }
+
+    @Override
+    public boolean remove(Object o)
+    {
+        Block<T> currentHeadBlock = getHeadBlock();
+        int head = currentHeadBlock.head();
+        boolean result = false;
+        while (true)
+        {
+            if (head == getBlockSize())
+            {
+                Block<T> nextHeadBlock = currentHeadBlock.next();
+                if (nextHeadBlock == null)
+                {
+                    // Not found
+                    break;
+                }
+                else
+                {
+                    // Use next block and loop
+                    currentHeadBlock = nextHeadBlock;
+                    head = currentHeadBlock.head();
+                }
+            }
+            else
+            {
+                Object element = currentHeadBlock.peek(head);
+                if (element == REMOVED_ELEMENT)
+                {
+                    // Removed, try next index
+                    ++head;
+                }
+                else
+                {
+                    if (element == null)
+                    {
+                        // Not found
+                        break;
+                    }
+                    else
+                    {
+                        if (element.equals(o))
+                        {
+                            // Found
+                            if (currentHeadBlock.remove(head, o, false))
+                            {
+                                result = true;
+                                break;
+                            }
+                            else
+                            {
+                                ++head;
+                            }
+                        }
+                        else
+                        {
+                            // Not the one we're looking for
+                            ++head;
+                        }
+                    }
+                }
+            }
+        }
+
+        return result;
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c)
+    {
+        // TODO: super invocations are based on iterator.remove(), which throws
+        return super.removeAll(c);
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c)
+    {
+        // TODO: super invocations are based on iterator.remove(), which throws
+        return super.retainAll(c);
+    }
+
+    @Override
+    public Iterator<T> iterator()
+    {
+        final List<Object[]> blocks = new ArrayList<>();
+        Block<T> currentHeadBlock = getHeadBlock();
+        while (currentHeadBlock != null)
+        {
+            Object[] elements = currentHeadBlock.arrayCopy();
+            blocks.add(elements);
+            currentHeadBlock = currentHeadBlock.next();
+        }
+        return new Iterator<T>()
+        {
+            private int blockIndex;
+            private int index;
+
+            @Override
+            public boolean hasNext()
+            {
+                while (true)
+                {
+                    if (blockIndex == blocks.size())
+                        return false;
+
+                    Object element = blocks.get(blockIndex)[index];
+
+                    if (element == null)
+                        return false;
+
+                    if (element != REMOVED_ELEMENT)
+                        return true;
+
+                    advance();
+                }
+            }
+
+            @Override
+            public T next()
+            {
+                while (true)
+                {
+                    if (blockIndex == blocks.size())
+                        throw new NoSuchElementException();
+
+                    Object element = blocks.get(blockIndex)[index];
+
+                    if (element == null)
+                        throw new NoSuchElementException();
+
+                    advance();
+
+                    if (element != REMOVED_ELEMENT)
+                        return (T)element;
+                }
+            }
+
+            private void advance()
+            {
+                if (++index == getBlockSize())
+                {
+                    index = 0;
+                    ++blockIndex;
+                }
+            }
+
+            @Override
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    @Override
+    public int size()
+    {
+        Block<T> currentHeadBlock = getHeadBlock();
+        int head = currentHeadBlock.head();
+        int size = 0;
+        while (true)
+        {
+            if (head == getBlockSize())
+            {
+                Block<T> nextHeadBlock = currentHeadBlock.next();
+                if (nextHeadBlock == null)
+                {
+                    break;
+                }
+                else
+                {
+                    // Use next block and loop
+                    currentHeadBlock = nextHeadBlock;
+                    head = currentHeadBlock.head();
+                }
+            }
+            else
+            {
+                Object element = currentHeadBlock.peek(head);
+                if (element == REMOVED_ELEMENT)
+                {
+                    // Already removed, try next index
+                    ++head;
+                }
+                else if (element != null)
+                {
+                    ++size;
+                    ++head;
+                }
+                else
+                {
+                    break;
+                }
+            }
+        }
+        return size;
+    }
+
+    protected Block<T> newBlock()
+    {
+        return new Block<>(getBlockSize());
+    }
+
+    protected int getBlockCount()
+    {
+        int result = 0;
+        Block<T> headBlock = getHeadBlock();
+        while (headBlock != null)
+        {
+            ++result;
+            headBlock = headBlock.next();
+        }
+        return result;
+    }
+
+    protected static final class Block<E>
+    {
+        private static final int headOffset = MemoryUtils.getIntegersPerCacheLine()-1;
+        private static final int tailOffset = MemoryUtils.getIntegersPerCacheLine()*2-1;
+
+        private final AtomicReferenceArray<Object> elements;
+        private final AtomicReference<Block<E>> next = new AtomicReference<>();
+        private final AtomicIntegerArray indexes = new AtomicIntegerArray(TAIL_OFFSET+1);
+
+        protected Block(int blockSize)
+        {
+            elements = new AtomicReferenceArray<>(blockSize);
+        }
+
+        public Object peek(int index)
+        {
+            return elements.get(index);
+        }
+
+        public boolean store(int index, E item)
+        {
+            boolean result = elements.compareAndSet(index, null, item);
+            if (result)
+                indexes.incrementAndGet(tailOffset);
+            return result;
+        }
+
+        public boolean remove(int index, Object item, boolean updateHead)
+        {
+            boolean result = elements.compareAndSet(index, item, REMOVED_ELEMENT);
+            if (result && updateHead)
+                indexes.incrementAndGet(headOffset);
+            return result;
+        }
+
+        public Block<E> next()
+        {
+            return next.get();
+        }
+
+        public boolean link(Block<E> nextBlock)
+        {
+            return next.compareAndSet(null, nextBlock);
+        }
+
+        public int head()
+        {
+            return indexes.get(headOffset);
+        }
+
+        public int tail()
+        {
+            return indexes.get(tailOffset);
+        }
+
+        public Object[] arrayCopy()
+        {
+            Object[] result = new Object[elements.length()];
+            for (int i = 0; i < result.length; ++i)
+                result[i] = elements.get(i);
+            return result;
+        }
+    }
+}