2 // ========================================================================
3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
19 package org.eclipse.jetty.util;
21 import java.util.AbstractQueue;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.NoSuchElementException;
27 import java.util.Objects;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.atomic.AtomicIntegerArray;
31 import java.util.concurrent.atomic.AtomicReference;
32 import java.util.concurrent.atomic.AtomicReferenceArray;
35 * A concurrent, unbounded implementation of {@link Queue} that uses singly-linked array blocks
38 * This class is a drop-in replacement for {@link ConcurrentLinkedQueue}, with similar performance
39 * but producing less garbage because arrays are used to store elements rather than nodes.
41 * The algorithm used is a variation of the algorithm from Gidenstam, Sundell and Tsigas
42 * (http://www.adm.hb.se/~AGD/Presentations/CacheAwareQueue_OPODIS.pdf).
46 public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
48 public static final int DEFAULT_BLOCK_SIZE = 512;
49 public static final Object REMOVED_ELEMENT = new Object()
52 public String toString()
58 private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
59 private static final int TAIL_OFFSET = MemoryUtils.getIntegersPerCacheLine()*2 -1;
61 private final AtomicReferenceArray<Block<T>> _blocks = new AtomicReferenceArray<>(TAIL_OFFSET + 1);
62 private final int _blockSize;
64 public ConcurrentArrayQueue()
66 this(DEFAULT_BLOCK_SIZE);
69 public ConcurrentArrayQueue(int blockSize)
71 _blockSize = blockSize;
72 Block<T> block = newBlock();
73 _blocks.set(HEAD_OFFSET,block);
74 _blocks.set(TAIL_OFFSET,block);
77 public int getBlockSize()
82 protected Block<T> getHeadBlock()
84 return _blocks.get(HEAD_OFFSET);
87 protected Block<T> getTailBlock()
89 return _blocks.get(TAIL_OFFSET);
93 public boolean offer(T item)
95 item = Objects.requireNonNull(item);
97 final Block<T> initialTailBlock = getTailBlock();
98 Block<T> currentTailBlock = initialTailBlock;
99 int tail = currentTailBlock.tail();
102 if (tail == getBlockSize())
104 Block<T> nextTailBlock = currentTailBlock.next();
105 if (nextTailBlock == null)
107 nextTailBlock = newBlock();
108 if (currentTailBlock.link(nextTailBlock))
110 // Linking succeeded, loop
111 currentTailBlock = nextTailBlock;
115 // Concurrent linking, use other block and loop
116 currentTailBlock = currentTailBlock.next();
121 // Not at last block, loop
122 currentTailBlock = nextTailBlock;
124 tail = currentTailBlock.tail();
128 if (currentTailBlock.peek(tail) == null)
130 if (currentTailBlock.store(tail, item))
137 // Concurrent store, try next index
143 // Not free, try next index
149 updateTailBlock(initialTailBlock, currentTailBlock);
154 private void updateTailBlock(Block<T> oldTailBlock, Block<T> newTailBlock)
156 // Update the tail block pointer if needs to
157 if (oldTailBlock != newTailBlock)
159 // The tail block pointer is allowed to lag behind.
160 // If this update fails, it means that other threads
161 // have filled this block and installed a new one.
162 casTailBlock(oldTailBlock, newTailBlock);
166 protected boolean casTailBlock(Block<T> current, Block<T> update)
168 return _blocks.compareAndSet(TAIL_OFFSET,current,update);
174 final Block<T> initialHeadBlock = getHeadBlock();
175 Block<T> currentHeadBlock = initialHeadBlock;
176 int head = currentHeadBlock.head();
180 if (head == getBlockSize())
182 Block<T> nextHeadBlock = currentHeadBlock.next();
183 if (nextHeadBlock == null)
185 // We could have read that the next head block was null
186 // but another thread allocated a new block and stored a
187 // new item. This thread could not detect this, but that
188 // is ok, otherwise we would not be able to exit this loop.
195 // Use next block and loop
196 currentHeadBlock = nextHeadBlock;
197 head = currentHeadBlock.head();
202 Object element = currentHeadBlock.peek(head);
203 if (element == REMOVED_ELEMENT)
205 // Already removed, try next index
213 if (currentHeadBlock.remove(head, result, true))
220 // Concurrent remove, try next index
233 updateHeadBlock(initialHeadBlock, currentHeadBlock);
238 private void updateHeadBlock(Block<T> oldHeadBlock, Block<T> newHeadBlock)
240 // Update the head block pointer if needs to
241 if (oldHeadBlock != newHeadBlock)
243 // The head block pointer lagged behind.
244 // If this update fails, it means that other threads
245 // have emptied this block and pointed to a new one.
246 casHeadBlock(oldHeadBlock, newHeadBlock);
250 protected boolean casHeadBlock(Block<T> current, Block<T> update)
252 return _blocks.compareAndSet(HEAD_OFFSET,current,update);
258 Block<T> currentHeadBlock = getHeadBlock();
259 int head = currentHeadBlock.head();
262 if (head == getBlockSize())
264 Block<T> nextHeadBlock = currentHeadBlock.next();
265 if (nextHeadBlock == null)
272 // Use next block and loop
273 currentHeadBlock = nextHeadBlock;
274 head = currentHeadBlock.head();
279 Object element = currentHeadBlock.peek(head);
280 if (element == REMOVED_ELEMENT)
282 // Already removed, try next index
294 public boolean remove(Object o)
296 Block<T> currentHeadBlock = getHeadBlock();
297 int head = currentHeadBlock.head();
298 boolean result = false;
301 if (head == getBlockSize())
303 Block<T> nextHeadBlock = currentHeadBlock.next();
304 if (nextHeadBlock == null)
311 // Use next block and loop
312 currentHeadBlock = nextHeadBlock;
313 head = currentHeadBlock.head();
318 Object element = currentHeadBlock.peek(head);
319 if (element == REMOVED_ELEMENT)
321 // Removed, try next index
333 if (element.equals(o))
336 if (currentHeadBlock.remove(head, o, false))
348 // Not the one we're looking for
360 public boolean removeAll(Collection<?> c)
362 // TODO: super invocations are based on iterator.remove(), which throws
363 return super.removeAll(c);
367 public boolean retainAll(Collection<?> c)
369 // TODO: super invocations are based on iterator.remove(), which throws
370 return super.retainAll(c);
374 public Iterator<T> iterator()
376 final List<Object[]> blocks = new ArrayList<>();
377 Block<T> currentHeadBlock = getHeadBlock();
378 while (currentHeadBlock != null)
380 Object[] elements = currentHeadBlock.arrayCopy();
381 blocks.add(elements);
382 currentHeadBlock = currentHeadBlock.next();
384 return new Iterator<T>()
386 private int blockIndex;
390 public boolean hasNext()
394 if (blockIndex == blocks.size())
397 Object element = blocks.get(blockIndex)[index];
402 if (element != REMOVED_ELEMENT)
414 if (blockIndex == blocks.size())
415 throw new NoSuchElementException();
417 Object element = blocks.get(blockIndex)[index];
420 throw new NoSuchElementException();
424 if (element != REMOVED_ELEMENT)
429 private void advance()
431 if (++index == getBlockSize())
441 throw new UnsupportedOperationException();
449 Block<T> currentHeadBlock = getHeadBlock();
450 int head = currentHeadBlock.head();
454 if (head == getBlockSize())
456 Block<T> nextHeadBlock = currentHeadBlock.next();
457 if (nextHeadBlock == null)
463 // Use next block and loop
464 currentHeadBlock = nextHeadBlock;
465 head = currentHeadBlock.head();
470 Object element = currentHeadBlock.peek(head);
471 if (element == REMOVED_ELEMENT)
473 // Already removed, try next index
476 else if (element != null)
490 protected Block<T> newBlock()
492 return new Block<>(getBlockSize());
495 protected int getBlockCount()
498 Block<T> headBlock = getHeadBlock();
499 while (headBlock != null)
502 headBlock = headBlock.next();
507 protected static final class Block<E>
509 private static final int headOffset = MemoryUtils.getIntegersPerCacheLine()-1;
510 private static final int tailOffset = MemoryUtils.getIntegersPerCacheLine()*2-1;
512 private final AtomicReferenceArray<Object> elements;
513 private final AtomicReference<Block<E>> next = new AtomicReference<>();
514 private final AtomicIntegerArray indexes = new AtomicIntegerArray(TAIL_OFFSET+1);
516 protected Block(int blockSize)
518 elements = new AtomicReferenceArray<>(blockSize);
521 public Object peek(int index)
523 return elements.get(index);
526 public boolean store(int index, E item)
528 boolean result = elements.compareAndSet(index, null, item);
530 indexes.incrementAndGet(tailOffset);
534 public boolean remove(int index, Object item, boolean updateHead)
536 boolean result = elements.compareAndSet(index, item, REMOVED_ELEMENT);
537 if (result && updateHead)
538 indexes.incrementAndGet(headOffset);
542 public Block<E> next()
547 public boolean link(Block<E> nextBlock)
549 return next.compareAndSet(null, nextBlock);
554 return indexes.get(headOffset);
559 return indexes.get(tailOffset);
562 public Object[] arrayCopy()
564 Object[] result = new Object[elements.length()];
565 for (int i = 0; i < result.length; ++i)
566 result[i] = elements.get(i);