2 // ========================================================================
3 // Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
19 package org.eclipse.jetty.util;
21 import java.util.AbstractQueue;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.NoSuchElementException;
27 import java.util.Objects;
28 import java.util.concurrent.atomic.AtomicIntegerArray;
29 import java.util.concurrent.atomic.AtomicReference;
30 import java.util.concurrent.atomic.AtomicReferenceArray;
33 * A concurrent, unbounded implementation of {@link Queue} that uses singly-linked array blocks
36 * This class is a drop-in replacement for {@link ConcurrentLinkedQueue}, with similar performance
37 * but producing less garbage because arrays are used to store elements rather than nodes.
39 * The algorithm used is a variation of the algorithm from Gidenstam, Sundell and Tsigas
40 * (http://www.adm.hb.se/~AGD/Presentations/CacheAwareQueue_OPODIS.pdf).
// Queue implementation layered on AbstractQueue. The code visible here keeps its
// elements in linked Block instances and records the head and tail blocks in _blocks.
44 public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
// Block size used by the no-argument constructor.
46 public static final int DEFAULT_BLOCK_SIZE = 512;
// Sentinel written into a slot by Block.remove(); code that reads a slot and finds
// this value treats the slot as already removed and moves on to the next index.
47 public static final Object REMOVED_ELEMENT = new Object()
// toString() override for the sentinel object; its body is not visible here.
50 public String toString()
// Index of the slot in _blocks that holds the head block reference.
// NOTE(review): both offsets are derived from MemoryUtils.getIntegersPerCacheLine(),
// presumably to keep the head and tail slots apart in memory - confirm against MemoryUtils.
56 private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
// Index of the slot in _blocks that holds the tail block reference.
57 private static final int TAIL_OFFSET = MemoryUtils.getIntegersPerCacheLine()*2 -1;
// Sized so that TAIL_OFFSET is the last valid index; only the HEAD_OFFSET and
// TAIL_OFFSET slots are read or written by the code visible here.
59 private final AtomicReferenceArray<Block<T>> _blocks = new AtomicReferenceArray<>(TAIL_OFFSET + 1);
// Block size passed to the constructor.
60 private final int _blockSize;
// Creates a queue that uses DEFAULT_BLOCK_SIZE as its block size.
62 public ConcurrentArrayQueue()
64 this(DEFAULT_BLOCK_SIZE);
67 public ConcurrentArrayQueue(int blockSize)
69 _blockSize = blockSize;
70 Block<T> block = newBlock();
71 _blocks.set(HEAD_OFFSET,block);
72 _blocks.set(TAIL_OFFSET,block);
// Accessor for the block size; callers in this class compare a block's head or
// tail index against its result to detect that the end of the block was reached.
// NOTE(review): the body is not visible here - presumably it returns _blockSize; confirm.
75 public int getBlockSize()
// Returns the block currently recorded as the head block, read from the
// HEAD_OFFSET slot of _blocks.
80 protected Block<T> getHeadBlock()
82 return _blocks.get(HEAD_OFFSET);
// Returns the block currently recorded as the tail block, read from the
// TAIL_OFFSET slot of _blocks.
85 protected Block<T> getTailBlock()
87 return _blocks.get(TAIL_OFFSET);
// Stores the given item into the first free slot found from the tail block onwards.
// Throws NullPointerException (via Objects.requireNonNull) when item is null.
// NOTE(review): the enclosing loop, the index increments and the return statement are
// not visible here; presumably the method retries until a store succeeds - confirm.
91 public boolean offer(T item)
93 item = Objects.requireNonNull(item);
// Remember the tail block seen on entry so that the shared tail pointer can be
// advanced at the end if this call ended up on a different block.
95 final Block<T> initialTailBlock = getTailBlock();
96 Block<T> currentTailBlock = initialTailBlock;
97 int tail = currentTailBlock.tail();
// The tail index reached the block size: continue in the next block,
// creating and linking a new one when there is none yet.
100 if (tail == getBlockSize())
102 Block<T> nextTailBlock = currentTailBlock.next();
103 if (nextTailBlock == null)
105 nextTailBlock = newBlock();
// link() only succeeds if no other block has been linked after the current one.
106 if (currentTailBlock.link(nextTailBlock))
108 // Linking succeeded, loop
109 currentTailBlock = nextTailBlock;
113 // Concurrent linking, use other block and loop
114 currentTailBlock = currentTailBlock.next();
119 // Not at last block, loop
120 currentTailBlock = nextTailBlock;
// Re-read the tail index of the block that is now in use.
122 tail = currentTailBlock.tail();
// A null slot has not been written yet; store() claims it with a compare-and-set
// from null to the item, so it fails if another thread filled the slot first.
126 if (currentTailBlock.peek(tail) == null)
128 if (currentTailBlock.store(tail, item))
135 // Concurrent store, try next index
141 // Not free, try next index
// Advance the shared tail block pointer if this call moved past the initial block.
147 updateTailBlock(initialTailBlock, currentTailBlock);
// Moves the shared tail block pointer from oldTailBlock to newTailBlock when the
// two differ. The outcome of the compare-and-set is deliberately ignored.
152 private void updateTailBlock(Block<T> oldTailBlock, Block<T> newTailBlock)
154 // Update the tail block pointer if needed
155 if (oldTailBlock != newTailBlock)
157 // The tail block pointer is allowed to lag behind.
158 // If this update fails, it means that other threads
159 // have filled this block and installed a new one.
160 casTailBlock(oldTailBlock, newTailBlock);
// Atomically replaces the tail block reference with update, provided it is still
// current; returns whether the replacement took place.
164 protected boolean casTailBlock(Block<T> current, Block<T> update)
166 return _blocks.compareAndSet(TAIL_OFFSET,current,update);
// NOTE(review): the method signature is not visible here. The visible body walks from the
// head block and removes an element while advancing the block's head index, which
// looks like the queue's poll() - confirm.
169 @SuppressWarnings("unchecked")
// Remember the head block seen on entry so that the shared head pointer can be
// advanced at the end if this call ended up on a different block.
173 final Block<T> initialHeadBlock = getHeadBlock();
174 Block<T> currentHeadBlock = initialHeadBlock;
175 int head = currentHeadBlock.head();
// The head index reached the block size: continue in the next block, if any.
179 if (head == getBlockSize())
181 Block<T> nextHeadBlock = currentHeadBlock.next();
182 if (nextHeadBlock == null)
184 // We could have read that the next head block was null
185 // but another thread allocated a new block and stored a
186 // new item. This thread could not detect this, but that
187 // is ok, otherwise we would not be able to exit this loop.
194 // Use next block and loop
195 currentHeadBlock = nextHeadBlock;
196 head = currentHeadBlock.head();
201 Object element = currentHeadBlock.peek(head);
202 if (element == REMOVED_ELEMENT)
204 // Already removed, try next index
// Swaps the slot to REMOVED_ELEMENT; the "true" flag also makes the block increment
// its head index on success. The declaration of "result" is not visible here.
212 if (currentHeadBlock.remove(head, result, true))
219 // Concurrent remove, try next index
// Advance the shared head block pointer if this call moved past the initial block.
232 updateHeadBlock(initialHeadBlock, currentHeadBlock);
// Moves the shared head block pointer from oldHeadBlock to newHeadBlock when the
// two differ. The outcome of the compare-and-set is deliberately ignored.
237 private void updateHeadBlock(Block<T> oldHeadBlock, Block<T> newHeadBlock)
239 // Update the head block pointer if needed
240 if (oldHeadBlock != newHeadBlock)
242 // The head block pointer lagged behind.
243 // If this update fails, it means that other threads
244 // have emptied this block and pointed to a new one.
245 casHeadBlock(oldHeadBlock, newHeadBlock);
// Atomically replaces the head block reference with update, provided it is still
// current; returns whether the replacement took place.
249 protected boolean casHeadBlock(Block<T> current, Block<T> update)
251 return _blocks.compareAndSet(HEAD_OFFSET,current,update);
// NOTE(review): the method signature, loop and return statements are not visible here.
// The visible body reads slots from the head block onwards without modifying them,
// which looks like the queue's peek() - confirm.
257 Block<T> currentHeadBlock = getHeadBlock();
258 int head = currentHeadBlock.head();
// The head index reached the block size: continue in the next block, if any.
261 if (head == getBlockSize())
263 Block<T> nextHeadBlock = currentHeadBlock.next();
264 if (nextHeadBlock == null)
271 // Use next block and loop
272 currentHeadBlock = nextHeadBlock;
273 head = currentHeadBlock.head();
// Read-only look at the slot; nothing is written back by the code visible here.
278 T element = currentHeadBlock.peek(head);
279 if (element == REMOVED_ELEMENT)
281 // Already removed, try next index
293 public boolean remove(Object o)
295 Block<T> currentHeadBlock = getHeadBlock();
296 int head = currentHeadBlock.head();
297 boolean result = false;
300 if (head == getBlockSize())
302 Block<T> nextHeadBlock = currentHeadBlock.next();
303 if (nextHeadBlock == null)
310 // Use next block and loop
311 currentHeadBlock = nextHeadBlock;
312 head = currentHeadBlock.head();
317 Object element = currentHeadBlock.peek(head);
318 if (element == REMOVED_ELEMENT)
320 // Removed, try next index
332 if (element.equals(o))
335 if (currentHeadBlock.remove(head, o, false))
347 // Not the one we're looking for
359 public boolean removeAll(Collection<?> c)
361 // TODO: super invocations are based on iterator.remove(), which throws
362 return super.removeAll(c);
366 public boolean retainAll(Collection<?> c)
368 // TODO: super invocations are based on iterator.remove(), which throws
369 return super.retainAll(c);
// Returns an iterator over a snapshot of the queue: every block from the current head
// block onwards is copied into a plain array up front, and the iterator only reads
// those copies, so later changes to the queue are not reflected.
373 public Iterator<T> iterator()
375 final List<Object[]> blocks = new ArrayList<>();
376 Block<T> currentHeadBlock = getHeadBlock();
// Follow the block links until there is no next block, copying each block's slots.
377 while (currentHeadBlock != null)
379 Object[] elements = currentHeadBlock.arrayCopy();
380 blocks.add(elements);
381 currentHeadBlock = currentHeadBlock.next();
383 return new Iterator<T>()
// Position of the array currently being read within "blocks"; the companion
// "index" field used below is declared on a line not visible here.
385 private int blockIndex;
389 public boolean hasNext()
// All copied arrays have been consumed.
393 if (blockIndex == blocks.size())
396 Object element = blocks.get(blockIndex)[index];
// Slots holding the REMOVED_ELEMENT sentinel are not treated as elements.
401 if (element != REMOVED_ELEMENT)
// NOTE(review): the following lines presumably belong to next(), whose signature is not visible here.
413 if (blockIndex == blocks.size())
414 throw new NoSuchElementException();
416 Object element = blocks.get(blockIndex)[index];
419 throw new NoSuchElementException();
423 if (element != REMOVED_ELEMENT) {
424 @SuppressWarnings("unchecked")
// Steps to the next slot; the branch taken when the end of a copied array is
// reached is not visible here.
431 private void advance()
433 if (++index == getBlockSize())
// NOTE(review): presumably the body of the iterator's remove() - removal through
// the iterator is not supported.
443 throw new UnsupportedOperationException();
// NOTE(review): the method signature, loop, counter and return statement are not visible
// here. The visible body walks every slot from the head block onwards and
// distinguishes removed slots from non-null ones, which looks like size() - confirm.
451 Block<T> currentHeadBlock = getHeadBlock();
452 int head = currentHeadBlock.head();
// The head index reached the block size: continue in the next block, if any.
456 if (head == getBlockSize())
458 Block<T> nextHeadBlock = currentHeadBlock.next();
459 if (nextHeadBlock == null)
465 // Use next block and loop
466 currentHeadBlock = nextHeadBlock;
467 head = currentHeadBlock.head();
472 Object element = currentHeadBlock.peek(head);
473 if (element == REMOVED_ELEMENT)
475 // Already removed, try next index
// A slot that is neither removed nor null holds a live element.
478 else if (element != null)
// Allocates a new, empty block sized by getBlockSize(). Called by the constructor
// for the initial block and by offer() when a further block has to be linked.
492 protected Block<T> newBlock()
494 return new Block<>(getBlockSize());
// Walks the chain of blocks starting at the current head block until there is no
// next block.
// NOTE(review): the counter and the return statement are not visible here;
// presumably the number of blocks visited is returned - confirm.
497 protected int getBlockCount()
500 Block<T> headBlock = getHeadBlock();
501 while (headBlock != null)
504 headBlock = headBlock.next();
// One fixed-size segment of the queue: an array of element slots, a reference to
// the next block, and a head and a tail index kept in an AtomicIntegerArray.
509 protected static final class Block<E>
// Positions of the head and tail index inside "indexes".
// NOTE(review): derived from MemoryUtils.getIntegersPerCacheLine(), presumably to keep
// the two counters apart in memory - confirm against MemoryUtils.
511 private static final int headOffset = MemoryUtils.getIntegersPerCacheLine()-1;
512 private static final int tailOffset = MemoryUtils.getIntegersPerCacheLine()*2-1;
// Element slots: null until stored, REMOVED_ELEMENT once removed.
514 private final AtomicReferenceArray<Object> elements;
// Link to the following block; stays null until link() succeeds.
515 private final AtomicReference<Block<E>> next = new AtomicReference<>();
// NOTE(review): sized with the outer class's TAIL_OFFSET rather than this class's own
// tailOffset; both are computed by the same expression, so the size is the same.
516 private final AtomicIntegerArray indexes = new AtomicIntegerArray(TAIL_OFFSET+1);
// Creates a block with blockSize element slots, all initially null.
518 protected Block(int blockSize)
520 elements = new AtomicReferenceArray<>(blockSize);
// Returns the current content of the slot at index without modifying it. The cast
// is unchecked, so the REMOVED_ELEMENT sentinel is returned as-is for removed slots.
523 @SuppressWarnings("unchecked")
524 public E peek(int index)
526 return (E)elements.get(index);
// Tries to claim the slot at index by swapping null for item.
// NOTE(review): whether the tail increment below is guarded by "result", and the
// return statement, are not visible here - confirm.
529 public boolean store(int index, E item)
531 boolean result = elements.compareAndSet(index, null, item);
533 indexes.incrementAndGet(tailOffset);
// Tries to swap the slot at index from item to REMOVED_ELEMENT. The compare-and-set
// matches the expected value by reference identity, so item must be the very
// instance stored in the slot. The head index is incremented only when the swap
// succeeded and updateHead is true.
537 public boolean remove(int index, Object item, boolean updateHead)
539 boolean result = elements.compareAndSet(index, item, REMOVED_ELEMENT);
540 if (result && updateHead)
541 indexes.incrementAndGet(headOffset);
// Accessor for the following block; the body is not visible here.
545 public Block<E> next()
// Links nextBlock after this block, but only if no block has been linked yet;
// returns whether this call performed the link.
550 public boolean link(Block<E> nextBlock)
552 return next.compareAndSet(null, nextBlock);
// Current head index (the signature of the enclosing method is not visible here).
557 return indexes.get(headOffset);
// Current tail index (the signature of the enclosing method is not visible here).
562 return indexes.get(tailOffset);
// Copies every slot, read one by one, into a new plain array of the same length.
565 public Object[] arrayCopy()
567 Object[] result = new Object[elements.length()];
568 for (int i = 0; i < result.length; ++i)
569 result[i] = elements.get(i);