]> WPIA git - gigi.git/blob - lib/jetty/org/eclipse/jetty/util/ConcurrentArrayQueue.java
Merge "Update notes about password security"
[gigi.git] / lib / jetty / org / eclipse / jetty / util / ConcurrentArrayQueue.java
1 //
2 //  ========================================================================
3 //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4 //  ------------------------------------------------------------------------
5 //  All rights reserved. This program and the accompanying materials
6 //  are made available under the terms of the Eclipse Public License v1.0
7 //  and Apache License v2.0 which accompanies this distribution.
8 //
9 //      The Eclipse Public License is available at
10 //      http://www.eclipse.org/legal/epl-v10.html
11 //
12 //      The Apache License v2.0 is available at
13 //      http://www.opensource.org/licenses/apache2.0.php
14 //
15 //  You may elect to redistribute this code under either of these licenses.
16 //  ========================================================================
17 //
18
19 package org.eclipse.jetty.util;
20
21 import java.util.AbstractQueue;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.NoSuchElementException;
27 import java.util.Objects;
28 import java.util.concurrent.atomic.AtomicIntegerArray;
29 import java.util.concurrent.atomic.AtomicReference;
30 import java.util.concurrent.atomic.AtomicReferenceArray;
31
32 /**
33  * A concurrent, unbounded implementation of {@link Queue} that uses singly-linked array blocks
34  * to store elements.
35  * <p/>
36  * This class is a drop-in replacement for {@link ConcurrentLinkedQueue}, with similar performance
37  * but producing less garbage because arrays are used to store elements rather than nodes.
38  * <p/>
39  * The algorithm used is a variation of the algorithm from Gidenstam, Sundell and Tsigas
40  * (http://www.adm.hb.se/~AGD/Presentations/CacheAwareQueue_OPODIS.pdf).
41  *
42  * @param <T>
43  */
44 public class ConcurrentArrayQueue<T> extends AbstractQueue<T>
45 {
46     public static final int DEFAULT_BLOCK_SIZE = 512;
47     public static final Object REMOVED_ELEMENT = new Object()
48     {
49         @Override
50         public String toString()
51         {
52             return "X";
53         }
54     };
55
56     private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
57     private static final int TAIL_OFFSET = MemoryUtils.getIntegersPerCacheLine()*2 -1;
58
59     private final AtomicReferenceArray<Block<T>> _blocks = new AtomicReferenceArray<>(TAIL_OFFSET + 1);
60     private final int _blockSize;
61
62     public ConcurrentArrayQueue()
63     {
64         this(DEFAULT_BLOCK_SIZE);
65     }
66
67     public ConcurrentArrayQueue(int blockSize)
68     {
69         _blockSize = blockSize;
70         Block<T> block = newBlock();
71         _blocks.set(HEAD_OFFSET,block);
72         _blocks.set(TAIL_OFFSET,block);
73     }
74
75     public int getBlockSize()
76     {
77         return _blockSize;
78     }
79
80     protected Block<T> getHeadBlock()
81     {
82         return _blocks.get(HEAD_OFFSET);
83     }
84
85     protected Block<T> getTailBlock()
86     {
87         return _blocks.get(TAIL_OFFSET);
88     }
89
90     @Override
91     public boolean offer(T item)
92     {
93         item = Objects.requireNonNull(item);
94
95         final Block<T> initialTailBlock = getTailBlock();
96         Block<T> currentTailBlock = initialTailBlock;
97         int tail = currentTailBlock.tail();
98         while (true)
99         {
100             if (tail == getBlockSize())
101             {
102                 Block<T> nextTailBlock = currentTailBlock.next();
103                 if (nextTailBlock == null)
104                 {
105                     nextTailBlock = newBlock();
106                     if (currentTailBlock.link(nextTailBlock))
107                     {
108                         // Linking succeeded, loop
109                         currentTailBlock = nextTailBlock;
110                     }
111                     else
112                     {
113                         // Concurrent linking, use other block and loop
114                         currentTailBlock = currentTailBlock.next();
115                     }
116                 }
117                 else
118                 {
119                     // Not at last block, loop
120                     currentTailBlock = nextTailBlock;
121                 }
122                 tail = currentTailBlock.tail();
123             }
124             else
125             {
126                 if (currentTailBlock.peek(tail) == null)
127                 {
128                     if (currentTailBlock.store(tail, item))
129                     {
130                         // Item stored
131                         break;
132                     }
133                     else
134                     {
135                         // Concurrent store, try next index
136                         ++tail;
137                     }
138                 }
139                 else
140                 {
141                     // Not free, try next index
142                     ++tail;
143                 }
144             }
145         }
146
147         updateTailBlock(initialTailBlock, currentTailBlock);
148
149         return true;
150     }
151
152     private void updateTailBlock(Block<T> oldTailBlock, Block<T> newTailBlock)
153     {
154         // Update the tail block pointer if needs to
155         if (oldTailBlock != newTailBlock)
156         {
157             // The tail block pointer is allowed to lag behind.
158             // If this update fails, it means that other threads
159             // have filled this block and installed a new one.
160             casTailBlock(oldTailBlock, newTailBlock);
161         }
162     }
163
164     protected boolean casTailBlock(Block<T> current, Block<T> update)
165     {
166         return _blocks.compareAndSet(TAIL_OFFSET,current,update);
167     }
168
169     @SuppressWarnings("unchecked")
170     @Override
171     public T poll()
172     {
173         final Block<T> initialHeadBlock = getHeadBlock();
174         Block<T> currentHeadBlock = initialHeadBlock;
175         int head = currentHeadBlock.head();
176         T result = null;
177         while (true)
178         {
179             if (head == getBlockSize())
180             {
181                 Block<T> nextHeadBlock = currentHeadBlock.next();
182                 if (nextHeadBlock == null)
183                 {
184                     // We could have read that the next head block was null
185                     // but another thread allocated a new block and stored a
186                     // new item. This thread could not detect this, but that
187                     // is ok, otherwise we would not be able to exit this loop.
188
189                     // Queue is empty
190                     break;
191                 }
192                 else
193                 {
194                     // Use next block and loop
195                     currentHeadBlock = nextHeadBlock;
196                     head = currentHeadBlock.head();
197                 }
198             }
199             else
200             {
201                 Object element = currentHeadBlock.peek(head);
202                 if (element == REMOVED_ELEMENT)
203                 {
204                     // Already removed, try next index
205                     ++head;
206                 }
207                 else
208                 {
209                     result = (T)element;
210                     if (result != null)
211                     {
212                         if (currentHeadBlock.remove(head, result, true))
213                         {
214                             // Item removed
215                             break;
216                         }
217                         else
218                         {
219                             // Concurrent remove, try next index
220                             ++head;
221                         }
222                     }
223                     else
224                     {
225                         // Queue is empty
226                         break;
227                     }
228                 }
229             }
230         }
231
232         updateHeadBlock(initialHeadBlock, currentHeadBlock);
233
234         return result;
235     }
236
237     private void updateHeadBlock(Block<T> oldHeadBlock, Block<T> newHeadBlock)
238     {
239         // Update the head block pointer if needs to
240         if (oldHeadBlock != newHeadBlock)
241         {
242             // The head block pointer lagged behind.
243             // If this update fails, it means that other threads
244             // have emptied this block and pointed to a new one.
245             casHeadBlock(oldHeadBlock, newHeadBlock);
246         }
247     }
248
249     protected boolean casHeadBlock(Block<T> current, Block<T> update)
250     {
251         return _blocks.compareAndSet(HEAD_OFFSET,current,update);
252     }
253
254     @Override
255     public T peek()
256     {
257         Block<T> currentHeadBlock = getHeadBlock();
258         int head = currentHeadBlock.head();
259         while (true)
260         {
261             if (head == getBlockSize())
262             {
263                 Block<T> nextHeadBlock = currentHeadBlock.next();
264                 if (nextHeadBlock == null)
265                 {
266                     // Queue is empty
267                     return null;
268                 }
269                 else
270                 {
271                     // Use next block and loop
272                     currentHeadBlock = nextHeadBlock;
273                     head = currentHeadBlock.head();
274                 }
275             }
276             else
277             {
278                 T element = currentHeadBlock.peek(head);
279                 if (element == REMOVED_ELEMENT)
280                 {
281                     // Already removed, try next index
282                     ++head;
283                 }
284                 else
285                 {
286                     return element;
287                 }
288             }
289         }
290     }
291
292     @Override
293     public boolean remove(Object o)
294     {
295         Block<T> currentHeadBlock = getHeadBlock();
296         int head = currentHeadBlock.head();
297         boolean result = false;
298         while (true)
299         {
300             if (head == getBlockSize())
301             {
302                 Block<T> nextHeadBlock = currentHeadBlock.next();
303                 if (nextHeadBlock == null)
304                 {
305                     // Not found
306                     break;
307                 }
308                 else
309                 {
310                     // Use next block and loop
311                     currentHeadBlock = nextHeadBlock;
312                     head = currentHeadBlock.head();
313                 }
314             }
315             else
316             {
317                 Object element = currentHeadBlock.peek(head);
318                 if (element == REMOVED_ELEMENT)
319                 {
320                     // Removed, try next index
321                     ++head;
322                 }
323                 else
324                 {
325                     if (element == null)
326                     {
327                         // Not found
328                         break;
329                     }
330                     else
331                     {
332                         if (element.equals(o))
333                         {
334                             // Found
335                             if (currentHeadBlock.remove(head, o, false))
336                             {
337                                 result = true;
338                                 break;
339                             }
340                             else
341                             {
342                                 ++head;
343                             }
344                         }
345                         else
346                         {
347                             // Not the one we're looking for
348                             ++head;
349                         }
350                     }
351                 }
352             }
353         }
354
355         return result;
356     }
357
358     @Override
359     public boolean removeAll(Collection<?> c)
360     {
361         // TODO: super invocations are based on iterator.remove(), which throws
362         return super.removeAll(c);
363     }
364
365     @Override
366     public boolean retainAll(Collection<?> c)
367     {
368         // TODO: super invocations are based on iterator.remove(), which throws
369         return super.retainAll(c);
370     }
371
372     @Override
373     public Iterator<T> iterator()
374     {
375         final List<Object[]> blocks = new ArrayList<>();
376         Block<T> currentHeadBlock = getHeadBlock();
377         while (currentHeadBlock != null)
378         {
379             Object[] elements = currentHeadBlock.arrayCopy();
380             blocks.add(elements);
381             currentHeadBlock = currentHeadBlock.next();
382         }
383         return new Iterator<T>()
384         {
385             private int blockIndex;
386             private int index;
387
388             @Override
389             public boolean hasNext()
390             {
391                 while (true)
392                 {
393                     if (blockIndex == blocks.size())
394                         return false;
395
396                     Object element = blocks.get(blockIndex)[index];
397
398                     if (element == null)
399                         return false;
400
401                     if (element != REMOVED_ELEMENT)
402                         return true;
403
404                     advance();
405                 }
406             }
407
408             @Override
409             public T next()
410             {
411                 while (true)
412                 {
413                     if (blockIndex == blocks.size())
414                         throw new NoSuchElementException();
415
416                     Object element = blocks.get(blockIndex)[index];
417
418                     if (element == null)
419                         throw new NoSuchElementException();
420
421                     advance();
422
423                     if (element != REMOVED_ELEMENT) {
424                         @SuppressWarnings("unchecked")
425                         T e = (T)element;
426                         return e;
427                     }
428                 }
429             }
430
431             private void advance()
432             {
433                 if (++index == getBlockSize())
434                 {
435                     index = 0;
436                     ++blockIndex;
437                 }
438             }
439
440             @Override
441             public void remove()
442             {
443                 throw new UnsupportedOperationException();
444             }
445         };
446     }
447
448     @Override
449     public int size()
450     {
451         Block<T> currentHeadBlock = getHeadBlock();
452         int head = currentHeadBlock.head();
453         int size = 0;
454         while (true)
455         {
456             if (head == getBlockSize())
457             {
458                 Block<T> nextHeadBlock = currentHeadBlock.next();
459                 if (nextHeadBlock == null)
460                 {
461                     break;
462                 }
463                 else
464                 {
465                     // Use next block and loop
466                     currentHeadBlock = nextHeadBlock;
467                     head = currentHeadBlock.head();
468                 }
469             }
470             else
471             {
472                 Object element = currentHeadBlock.peek(head);
473                 if (element == REMOVED_ELEMENT)
474                 {
475                     // Already removed, try next index
476                     ++head;
477                 }
478                 else if (element != null)
479                 {
480                     ++size;
481                     ++head;
482                 }
483                 else
484                 {
485                     break;
486                 }
487             }
488         }
489         return size;
490     }
491
492     protected Block<T> newBlock()
493     {
494         return new Block<>(getBlockSize());
495     }
496
497     protected int getBlockCount()
498     {
499         int result = 0;
500         Block<T> headBlock = getHeadBlock();
501         while (headBlock != null)
502         {
503             ++result;
504             headBlock = headBlock.next();
505         }
506         return result;
507     }
508
509     protected static final class Block<E>
510     {
511         private static final int headOffset = MemoryUtils.getIntegersPerCacheLine()-1;
512         private static final int tailOffset = MemoryUtils.getIntegersPerCacheLine()*2-1;
513
514         private final AtomicReferenceArray<Object> elements;
515         private final AtomicReference<Block<E>> next = new AtomicReference<>();
516         private final AtomicIntegerArray indexes = new AtomicIntegerArray(TAIL_OFFSET+1);
517
518         protected Block(int blockSize)
519         {
520             elements = new AtomicReferenceArray<>(blockSize);
521         }
522
523         @SuppressWarnings("unchecked")
524         public E peek(int index)
525         {
526             return (E)elements.get(index);
527         }
528
529         public boolean store(int index, E item)
530         {
531             boolean result = elements.compareAndSet(index, null, item);
532             if (result)
533                 indexes.incrementAndGet(tailOffset);
534             return result;
535         }
536
537         public boolean remove(int index, Object item, boolean updateHead)
538         {
539             boolean result = elements.compareAndSet(index, item, REMOVED_ELEMENT);
540             if (result && updateHead)
541                 indexes.incrementAndGet(headOffset);
542             return result;
543         }
544
545         public Block<E> next()
546         {
547             return next.get();
548         }
549
550         public boolean link(Block<E> nextBlock)
551         {
552             return next.compareAndSet(null, nextBlock);
553         }
554
555         public int head()
556         {
557             return indexes.get(headOffset);
558         }
559
560         public int tail()
561         {
562             return indexes.get(tailOffset);
563         }
564
565         public Object[] arrayCopy()
566         {
567             Object[] result = new Object[elements.length()];
568             for (int i = 0; i < result.length; ++i)
569                 result[i] = elements.get(i);
570             return result;
571         }
572     }
573 }