]> WPIA git - gigi.git/blob - lib/jetty/org/eclipse/jetty/util/ConcurrentArrayQueue.java
Importing upstream Jetty jetty-9.2.1.v20140609
[gigi.git] / lib / jetty / org / eclipse / jetty / util / ConcurrentArrayQueue.java
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18
19 package org.eclipse.jetty.util;
20
21 import java.util.AbstractQueue;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.NoSuchElementException;
27 import java.util.Objects;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.atomic.AtomicIntegerArray;
31 import java.util.concurrent.atomic.AtomicReference;
32 import java.util.concurrent.atomic.AtomicReferenceArray;
33
34 /**
35  * A concurrent, unbounded implementation of {@link Queue} that uses singly-linked array blocks
36  * to store elements.
37  * <p/>
38  * This class is a drop-in replacement for {@link ConcurrentLinkedQueue}, with similar performance
39  * but producing less garbage because arrays are used to store elements rather than nodes.
40  * <p/>
41  * The algorithm used is a variation of the algorithm from Gidenstam, Sundell and Tsigas
42  * (http://www.adm.hb.se/~AGD/Presentations/CacheAwareQueue_OPODIS.pdf).
43  *
44  * @param <T>
45  */
46 public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
47 {
48     public static final int DEFAULT_BLOCK_SIZE = 512;
49     public static final Object REMOVED_ELEMENT = new Object()
50     {
51         @Override
52         public String toString()
53         {
54             return "X";
55         }
56     };
57
58     private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
59     private static final int TAIL_OFFSET = MemoryUtils.getIntegersPerCacheLine()*2 -1;
60
61     private final AtomicReferenceArray<Block<T>> _blocks = new AtomicReferenceArray<>(TAIL_OFFSET + 1);
62     private final int _blockSize;
63
64     public ConcurrentArrayQueue()
65     {
66         this(DEFAULT_BLOCK_SIZE);
67     }
68
69     public ConcurrentArrayQueue(int blockSize)
70     {
71         _blockSize = blockSize;
72         Block<T> block = newBlock();
73         _blocks.set(HEAD_OFFSET,block);
74         _blocks.set(TAIL_OFFSET,block);
75     }
76
77     public int getBlockSize()
78     {
79         return _blockSize;
80     }
81
82     protected Block<T> getHeadBlock()
83     {
84         return _blocks.get(HEAD_OFFSET);
85     }
86
87     protected Block<T> getTailBlock()
88     {
89         return _blocks.get(TAIL_OFFSET);
90     }
91
92     @Override
93     public boolean offer(T item)
94     {
95         item = Objects.requireNonNull(item);
96
97         final Block<T> initialTailBlock = getTailBlock();
98         Block<T> currentTailBlock = initialTailBlock;
99         int tail = currentTailBlock.tail();
100         while (true)
101         {
102             if (tail == getBlockSize())
103             {
104                 Block<T> nextTailBlock = currentTailBlock.next();
105                 if (nextTailBlock == null)
106                 {
107                     nextTailBlock = newBlock();
108                     if (currentTailBlock.link(nextTailBlock))
109                     {
110                         // Linking succeeded, loop
111                         currentTailBlock = nextTailBlock;
112                     }
113                     else
114                     {
115                         // Concurrent linking, use other block and loop
116                         currentTailBlock = currentTailBlock.next();
117                     }
118                 }
119                 else
120                 {
121                     // Not at last block, loop
122                     currentTailBlock = nextTailBlock;
123                 }
124                 tail = currentTailBlock.tail();
125             }
126             else
127             {
128                 if (currentTailBlock.peek(tail) == null)
129                 {
130                     if (currentTailBlock.store(tail, item))
131                     {
132                         // Item stored
133                         break;
134                     }
135                     else
136                     {
137                         // Concurrent store, try next index
138                         ++tail;
139                     }
140                 }
141                 else
142                 {
143                     // Not free, try next index
144                     ++tail;
145                 }
146             }
147         }
148
149         updateTailBlock(initialTailBlock, currentTailBlock);
150
151         return true;
152     }
153
154     private void updateTailBlock(Block<T> oldTailBlock, Block<T> newTailBlock)
155     {
156         // Update the tail block pointer if needs to
157         if (oldTailBlock != newTailBlock)
158         {
159             // The tail block pointer is allowed to lag behind.
160             // If this update fails, it means that other threads
161             // have filled this block and installed a new one.
162             casTailBlock(oldTailBlock, newTailBlock);
163         }
164     }
165
166     protected boolean casTailBlock(Block<T> current, Block<T> update)
167     {
168         return _blocks.compareAndSet(TAIL_OFFSET,current,update);
169     }
170
171     @Override
172     public T poll()
173     {
174         final Block<T> initialHeadBlock = getHeadBlock();
175         Block<T> currentHeadBlock = initialHeadBlock;
176         int head = currentHeadBlock.head();
177         T result = null;
178         while (true)
179         {
180             if (head == getBlockSize())
181             {
182                 Block<T> nextHeadBlock = currentHeadBlock.next();
183                 if (nextHeadBlock == null)
184                 {
185                     // We could have read that the next head block was null
186                     // but another thread allocated a new block and stored a
187                     // new item. This thread could not detect this, but that
188                     // is ok, otherwise we would not be able to exit this loop.
189
190                     // Queue is empty
191                     break;
192                 }
193                 else
194                 {
195                     // Use next block and loop
196                     currentHeadBlock = nextHeadBlock;
197                     head = currentHeadBlock.head();
198                 }
199             }
200             else
201             {
202                 Object element = currentHeadBlock.peek(head);
203                 if (element == REMOVED_ELEMENT)
204                 {
205                     // Already removed, try next index
206                     ++head;
207                 }
208                 else
209                 {
210                     result = (T)element;
211                     if (result != null)
212                     {
213                         if (currentHeadBlock.remove(head, result, true))
214                         {
215                             // Item removed
216                             break;
217                         }
218                         else
219                         {
220                             // Concurrent remove, try next index
221                             ++head;
222                         }
223                     }
224                     else
225                     {
226                         // Queue is empty
227                         break;
228                     }
229                 }
230             }
231         }
232
233         updateHeadBlock(initialHeadBlock, currentHeadBlock);
234
235         return result;
236     }
237
238     private void updateHeadBlock(Block<T> oldHeadBlock, Block<T> newHeadBlock)
239     {
240         // Update the head block pointer if needs to
241         if (oldHeadBlock != newHeadBlock)
242         {
243             // The head block pointer lagged behind.
244             // If this update fails, it means that other threads
245             // have emptied this block and pointed to a new one.
246             casHeadBlock(oldHeadBlock, newHeadBlock);
247         }
248     }
249
250     protected boolean casHeadBlock(Block<T> current, Block<T> update)
251     {
252         return _blocks.compareAndSet(HEAD_OFFSET,current,update);
253     }
254
255     @Override
256     public T peek()
257     {
258         Block<T> currentHeadBlock = getHeadBlock();
259         int head = currentHeadBlock.head();
260         while (true)
261         {
262             if (head == getBlockSize())
263             {
264                 Block<T> nextHeadBlock = currentHeadBlock.next();
265                 if (nextHeadBlock == null)
266                 {
267                     // Queue is empty
268                     return null;
269                 }
270                 else
271                 {
272                     // Use next block and loop
273                     currentHeadBlock = nextHeadBlock;
274                     head = currentHeadBlock.head();
275                 }
276             }
277             else
278             {
279                 Object element = currentHeadBlock.peek(head);
280                 if (element == REMOVED_ELEMENT)
281                 {
282                     // Already removed, try next index
283                     ++head;
284                 }
285                 else
286                 {
287                     return (T)element;
288                 }
289             }
290         }
291     }
292
293     @Override
294     public boolean remove(Object o)
295     {
296         Block<T> currentHeadBlock = getHeadBlock();
297         int head = currentHeadBlock.head();
298         boolean result = false;
299         while (true)
300         {
301             if (head == getBlockSize())
302             {
303                 Block<T> nextHeadBlock = currentHeadBlock.next();
304                 if (nextHeadBlock == null)
305                 {
306                     // Not found
307                     break;
308                 }
309                 else
310                 {
311                     // Use next block and loop
312                     currentHeadBlock = nextHeadBlock;
313                     head = currentHeadBlock.head();
314                 }
315             }
316             else
317             {
318                 Object element = currentHeadBlock.peek(head);
319                 if (element == REMOVED_ELEMENT)
320                 {
321                     // Removed, try next index
322                     ++head;
323                 }
324                 else
325                 {
326                     if (element == null)
327                     {
328                         // Not found
329                         break;
330                     }
331                     else
332                     {
333                         if (element.equals(o))
334                         {
335                             // Found
336                             if (currentHeadBlock.remove(head, o, false))
337                             {
338                                 result = true;
339                                 break;
340                             }
341                             else
342                             {
343                                 ++head;
344                             }
345                         }
346                         else
347                         {
348                             // Not the one we're looking for
349                             ++head;
350                         }
351                     }
352                 }
353             }
354         }
355
356         return result;
357     }
358
359     @Override
360     public boolean removeAll(Collection<?> c)
361     {
362         // TODO: super invocations are based on iterator.remove(), which throws
363         return super.removeAll(c);
364     }
365
366     @Override
367     public boolean retainAll(Collection<?> c)
368     {
369         // TODO: super invocations are based on iterator.remove(), which throws
370         return super.retainAll(c);
371     }
372
373     @Override
374     public Iterator<T> iterator()
375     {
376         final List<Object[]> blocks = new ArrayList<>();
377         Block<T> currentHeadBlock = getHeadBlock();
378         while (currentHeadBlock != null)
379         {
380             Object[] elements = currentHeadBlock.arrayCopy();
381             blocks.add(elements);
382             currentHeadBlock = currentHeadBlock.next();
383         }
384         return new Iterator<T>()
385         {
386             private int blockIndex;
387             private int index;
388
389             @Override
390             public boolean hasNext()
391             {
392                 while (true)
393                 {
394                     if (blockIndex == blocks.size())
395                         return false;
396
397                     Object element = blocks.get(blockIndex)[index];
398
399                     if (element == null)
400                         return false;
401
402                     if (element != REMOVED_ELEMENT)
403                         return true;
404
405                     advance();
406                 }
407             }
408
409             @Override
410             public T next()
411             {
412                 while (true)
413                 {
414                     if (blockIndex == blocks.size())
415                         throw new NoSuchElementException();
416
417                     Object element = blocks.get(blockIndex)[index];
418
419                     if (element == null)
420                         throw new NoSuchElementException();
421
422                     advance();
423
424                     if (element != REMOVED_ELEMENT)
425                         return (T)element;
426                 }
427             }
428
429             private void advance()
430             {
431                 if (++index == getBlockSize())
432                 {
433                     index = 0;
434                     ++blockIndex;
435                 }
436             }
437
438             @Override
439             public void remove()
440             {
441                 throw new UnsupportedOperationException();
442             }
443         };
444     }
445
446     @Override
447     public int size()
448     {
449         Block<T> currentHeadBlock = getHeadBlock();
450         int head = currentHeadBlock.head();
451         int size = 0;
452         while (true)
453         {
454             if (head == getBlockSize())
455             {
456                 Block<T> nextHeadBlock = currentHeadBlock.next();
457                 if (nextHeadBlock == null)
458                 {
459                     break;
460                 }
461                 else
462                 {
463                     // Use next block and loop
464                     currentHeadBlock = nextHeadBlock;
465                     head = currentHeadBlock.head();
466                 }
467             }
468             else
469             {
470                 Object element = currentHeadBlock.peek(head);
471                 if (element == REMOVED_ELEMENT)
472                 {
473                     // Already removed, try next index
474                     ++head;
475                 }
476                 else if (element != null)
477                 {
478                     ++size;
479                     ++head;
480                 }
481                 else
482                 {
483                     break;
484                 }
485             }
486         }
487         return size;
488     }
489
490     protected Block<T> newBlock()
491     {
492         return new Block<>(getBlockSize());
493     }
494
495     protected int getBlockCount()
496     {
497         int result = 0;
498         Block<T> headBlock = getHeadBlock();
499         while (headBlock != null)
500         {
501             ++result;
502             headBlock = headBlock.next();
503         }
504         return result;
505     }
506
507     protected static final class Block<E>
508     {
509         private static final int headOffset = MemoryUtils.getIntegersPerCacheLine()-1;
510         private static final int tailOffset = MemoryUtils.getIntegersPerCacheLine()*2-1;
511
512         private final AtomicReferenceArray<Object> elements;
513         private final AtomicReference<Block<E>> next = new AtomicReference<>();
514         private final AtomicIntegerArray indexes = new AtomicIntegerArray(TAIL_OFFSET+1);
515
516         protected Block(int blockSize)
517         {
518             elements = new AtomicReferenceArray<>(blockSize);
519         }
520
521         public Object peek(int index)
522         {
523             return elements.get(index);
524         }
525
526         public boolean store(int index, E item)
527         {
528             boolean result = elements.compareAndSet(index, null, item);
529             if (result)
530                 indexes.incrementAndGet(tailOffset);
531             return result;
532         }
533
534         public boolean remove(int index, Object item, boolean updateHead)
535         {
536             boolean result = elements.compareAndSet(index, item, REMOVED_ELEMENT);
537             if (result && updateHead)
538                 indexes.incrementAndGet(headOffset);
539             return result;
540         }
541
542         public Block<E> next()
543         {
544             return next.get();
545         }
546
547         public boolean link(Block<E> nextBlock)
548         {
549             return next.compareAndSet(null, nextBlock);
550         }
551
552         public int head()
553         {
554             return indexes.get(headOffset);
555         }
556
557         public int tail()
558         {
559             return indexes.get(tailOffset);
560         }
561
562         public Object[] arrayCopy()
563         {
564             Object[] result = new Object[elements.length()];
565             for (int i = 0; i < result.length; ++i)
566                 result[i] = elements.get(i);
567             return result;
568         }
569     }
570 }