Home | History | Annotate | Download | only in collect
      1 /*
      2  * Copyright (C) 2011 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
      5  * in compliance with the License. You may obtain a copy of the License at
      6  *
      7  * http://www.apache.org/licenses/LICENSE-2.0
      8  *
      9  * Unless required by applicable law or agreed to in writing, software distributed under the License
     10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
     11  * or implied. See the License for the specific language governing permissions and limitations under
     12  * the License.
     13  */
     14 
     15 package com.google.common.collect;
     16 
     17 import com.google.common.annotations.Beta;
     18 import com.google.common.base.Preconditions;
     19 
     20 import java.util.ArrayDeque;
     21 import java.util.Collection;
     22 import java.util.PriorityQueue;
     23 import java.util.Queue;
     24 import java.util.concurrent.ArrayBlockingQueue;
     25 import java.util.concurrent.BlockingQueue;
     26 import java.util.concurrent.ConcurrentLinkedQueue;
     27 import java.util.concurrent.LinkedBlockingDeque;
     28 import java.util.concurrent.LinkedBlockingQueue;
     29 import java.util.concurrent.PriorityBlockingQueue;
     30 import java.util.concurrent.SynchronousQueue;
     31 import java.util.concurrent.TimeUnit;
     32 
     33 /**
     34  * Static utility methods pertaining to {@link Queue}
     35  * instances. Also see this class's counterparts
     36  * {@link Lists}, {@link Sets}, and {@link Maps}.
     37  *
     38  * @author Kurt Alfred Kluever
     39  * @since 11.0
     40  */
     41 @Beta
     42 public final class Queues {
     43   private Queues() {}
     44 
     45   // ArrayBlockingQueue
     46 
     47   /**
     48    * Creates an empty {@code ArrayBlockingQueue} instance.
     49    *
     50    * @return a new, empty {@code ArrayBlockingQueue}
     51    */
     52   public static <E> ArrayBlockingQueue<E> newArrayBlockingQueue(int capacity) {
     53     return new ArrayBlockingQueue<E>(capacity);
     54   }
     55 
     56   // ArrayDeque
     57 
     58   // ConcurrentLinkedQueue
     59 
     60   /**
     61    * Creates an empty {@code ConcurrentLinkedQueue} instance.
     62    *
     63    * @return a new, empty {@code ConcurrentLinkedQueue}
     64    */
     65   public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue() {
     66     return new ConcurrentLinkedQueue<E>();
     67   }
     68 
     69   /**
     70    * Creates an {@code ConcurrentLinkedQueue} instance containing the given elements.
     71    *
     72    * @param elements the elements that the queue should contain, in order
     73    * @return a new {@code ConcurrentLinkedQueue} containing those elements
     74    */
     75   public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue(
     76       Iterable<? extends E> elements) {
     77     if (elements instanceof Collection) {
     78       return new ConcurrentLinkedQueue<E>(Collections2.cast(elements));
     79     }
     80     ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<E>();
     81     Iterables.addAll(queue, elements);
     82     return queue;
     83   }
     84 
     85   // LinkedBlockingDeque
     86 
     87   // LinkedBlockingQueue
     88 
     89   /**
     90    * Creates an empty {@code LinkedBlockingQueue} instance.
     91    *
     92    * @return a new, empty {@code LinkedBlockingQueue}
     93    */
     94   public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue() {
     95     return new LinkedBlockingQueue<E>();
     96   }
     97 
     98   /**
     99    * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
    100    *
    101    * @param capacity the capacity of this queue
    102    * @return a new, empty {@code LinkedBlockingQueue}
    103    * @throws IllegalArgumentException if {@code capacity} is less than 1
    104    */
    105   public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(int capacity) {
    106     return new LinkedBlockingQueue<E>(capacity);
    107   }
    108 
    109   /**
    110    * Creates an {@code LinkedBlockingQueue} instance containing the given elements.
    111    *
    112    * @param elements the elements that the queue should contain, in order
    113    * @return a new {@code LinkedBlockingQueue} containing those elements
    114    */
    115   public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(Iterable<? extends E> elements) {
    116     if (elements instanceof Collection) {
    117       return new LinkedBlockingQueue<E>(Collections2.cast(elements));
    118     }
    119     LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<E>();
    120     Iterables.addAll(queue, elements);
    121     return queue;
    122   }
    123 
    124   // LinkedList: see {@link com.google.common.collect.Lists}
    125 
    126   // PriorityBlockingQueue
    127 
    128   /**
    129    * Creates an empty {@code PriorityBlockingQueue} instance.
    130    *
    131    * @return a new, empty {@code PriorityBlockingQueue}
    132    */
    133   public static <E> PriorityBlockingQueue<E> newPriorityBlockingQueue() {
    134     return new PriorityBlockingQueue<E>();
    135   }
    136 
    137   /**
    138    * Creates an {@code PriorityBlockingQueue} instance containing the given elements.
    139    *
    140    * @param elements the elements that the queue should contain, in order
    141    * @return a new {@code PriorityBlockingQueue} containing those elements
    142    */
    143   public static <E> PriorityBlockingQueue<E> newPriorityBlockingQueue(
    144       Iterable<? extends E> elements) {
    145     if (elements instanceof Collection) {
    146       return new PriorityBlockingQueue<E>(Collections2.cast(elements));
    147     }
    148     PriorityBlockingQueue<E> queue = new PriorityBlockingQueue<E>();
    149     Iterables.addAll(queue, elements);
    150     return queue;
    151   }
    152 
    153   // PriorityQueue
    154 
    155   /**
    156    * Creates an empty {@code PriorityQueue} instance.
    157    *
    158    * @return a new, empty {@code PriorityQueue}
    159    */
    160   public static <E> PriorityQueue<E> newPriorityQueue() {
    161     return new PriorityQueue<E>();
    162   }
    163 
    164   /**
    165    * Creates an {@code PriorityQueue} instance containing the given elements.
    166    *
    167    * @param elements the elements that the queue should contain, in order
    168    * @return a new {@code PriorityQueue} containing those elements
    169    */
    170   public static <E> PriorityQueue<E> newPriorityQueue(Iterable<? extends E> elements) {
    171     if (elements instanceof Collection) {
    172       return new PriorityQueue<E>(Collections2.cast(elements));
    173     }
    174     PriorityQueue<E> queue = new PriorityQueue<E>();
    175     Iterables.addAll(queue, elements);
    176     return queue;
    177   }
    178 
    179   // SynchronousQueue
    180 
    181   /**
    182    * Creates an empty {@code SynchronousQueue} instance.
    183    *
    184    * @return a new, empty {@code SynchronousQueue}
    185    */
    186   public static <E> SynchronousQueue<E> newSynchronousQueue() {
    187     return new SynchronousQueue<E>();
    188   }
    189 
    190   /**
    191    * Drains the queue as {@link BlockingQueue#drainTo(Collection, int)}, but if the requested
    192    * {@code numElements} elements are not available, it will wait for them up to the specified
    193    * timeout.
    194    *
    195    * @param q the blocking queue to be drained
    196    * @param buffer where to add the transferred elements
    197    * @param numElements the number of elements to be waited for
    198    * @param timeout how long to wait before giving up, in units of {@code unit}
    199    * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
    200    * @return the number of elements transferred
    201    * @throws InterruptedException if interrupted while waiting
    202    */
    203   public static <E> int drain(BlockingQueue<E> q, Collection<? super E> buffer, int numElements,
    204       long timeout, TimeUnit unit) throws InterruptedException {
    205     Preconditions.checkNotNull(buffer);
    206     /*
    207      * This code performs one System.nanoTime() more than necessary, and in return, the time to
    208      * execute Queue#drainTo is not added *on top* of waiting for the timeout (which could make
    209      * the timeout arbitrarily inaccurate, given a queue that is slow to drain).
    210      */
    211     long deadline = System.nanoTime() + unit.toNanos(timeout);
    212     int added = 0;
    213     while (added < numElements) {
    214       // we could rely solely on #poll, but #drainTo might be more efficient when there are multiple
    215       // elements already available (e.g. LinkedBlockingQueue#drainTo locks only once)
    216       added += q.drainTo(buffer, numElements - added);
    217       if (added < numElements) { // not enough elements immediately available; will have to poll
    218         E e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
    219         if (e == null) {
    220           break; // we already waited enough, and there are no more elements in sight
    221         }
    222         buffer.add(e);
    223         added++;
    224       }
    225     }
    226     return added;
    227   }
    228 
    229   /**
    230    * Drains the queue as {@linkplain #drain(BlockingQueue, Collection, int, long, TimeUnit)},
    231    * but with a different behavior in case it is interrupted while waiting. In that case, the
    232    * operation will continue as usual, and in the end the thread's interruption status will be set
    233    * (no {@code InterruptedException} is thrown).
    234    *
    235    * @param q the blocking queue to be drained
    236    * @param buffer where to add the transferred elements
    237    * @param numElements the number of elements to be waited for
    238    * @param timeout how long to wait before giving up, in units of {@code unit}
    239    * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
    240    * @return the number of elements transferred
    241    */
    242   public static <E> int drainUninterruptibly(BlockingQueue<E> q, Collection<? super E> buffer,
    243       int numElements, long timeout, TimeUnit unit) {
    244     Preconditions.checkNotNull(buffer);
    245     long deadline = System.nanoTime() + unit.toNanos(timeout);
    246     int added = 0;
    247     boolean interrupted = false;
    248     try {
    249       while (added < numElements) {
    250         // we could rely solely on #poll, but #drainTo might be more efficient when there are
    251         // multiple elements already available (e.g. LinkedBlockingQueue#drainTo locks only once)
    252         added += q.drainTo(buffer, numElements - added);
    253         if (added < numElements) { // not enough elements immediately available; will have to poll
    254           E e; // written exactly once, by a successful (uninterrupted) invocation of #poll
    255           while (true) {
    256             try {
    257               e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
    258               break;
    259             } catch (InterruptedException ex) {
    260               interrupted = true; // note interruption and retry
    261             }
    262           }
    263           if (e == null) {
    264             break; // we already waited enough, and there are no more elements in sight
    265           }
    266           buffer.add(e);
    267           added++;
    268         }
    269       }
    270     } finally {
    271       if (interrupted) {
    272         Thread.currentThread().interrupt();
    273       }
    274     }
    275     return added;
    276   }
    277 }
    278