Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Written by Doug Lea with assistance from members of JCP JSR-166
      3  * Expert Group and released to the public domain, as explained at
      4  * http://creativecommons.org/publicdomain/zero/1.0/
      5  */
      6 
      7 package java.util.concurrent;
      8 import static java.util.concurrent.TimeUnit.NANOSECONDS;
      9 import java.util.concurrent.atomic.AtomicLong;
     10 import java.util.concurrent.locks.Condition;
     11 import java.util.concurrent.locks.ReentrantLock;
     12 import java.util.*;
     13 
     14 // BEGIN android-note
     15 // omit class-level docs on setRemoveOnCancelPolicy()
     16 // END android-note
     17 
     18 /**
     19  * A {@link ThreadPoolExecutor} that can additionally schedule
     20  * commands to run after a given delay, or to execute
     21  * periodically. This class is preferable to {@link java.util.Timer}
     22  * when multiple worker threads are needed, or when the additional
     23  * flexibility or capabilities of {@link ThreadPoolExecutor} (which
     24  * this class extends) are required.
     25  *
     26  * <p>Delayed tasks execute no sooner than they are enabled, but
     27  * without any real-time guarantees about when, after they are
     28  * enabled, they will commence. Tasks scheduled for exactly the same
     29  * execution time are enabled in first-in-first-out (FIFO) order of
     30  * submission.
     31  *
     32  * <p>When a submitted task is cancelled before it is run, execution
     33  * is suppressed. By default, such a cancelled task is not
     34  * automatically removed from the work queue until its delay
     35  * elapses. While this enables further inspection and monitoring, it
     36  * may also cause unbounded retention of cancelled tasks.
     37  *
     38  * <p>Successive executions of a task scheduled via
     39  * {@code scheduleAtFixedRate} or
     40  * {@code scheduleWithFixedDelay} do not overlap. While different
     41  * executions may be performed by different threads, the effects of
     42  * prior executions <a
     43  * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
     44  * those of subsequent ones.
     45  *
     46  * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
     47  * of the inherited tuning methods are not useful for it. In
     48  * particular, because it acts as a fixed-sized pool using
     49  * {@code corePoolSize} threads and an unbounded queue, adjustments
     50  * to {@code maximumPoolSize} have no useful effect. Additionally, it
     51  * is almost never a good idea to set {@code corePoolSize} to zero or
     52  * use {@code allowCoreThreadTimeOut} because this may leave the pool
     53  * without threads to handle tasks once they become eligible to run.
     54  *
     55  * <p><b>Extension notes:</b> This class overrides the
     56  * {@link ThreadPoolExecutor#execute(Runnable) execute} and
     57  * {@link AbstractExecutorService#submit(Runnable) submit}
     58  * methods to generate internal {@link ScheduledFuture} objects to
     59  * control per-task delays and scheduling.  To preserve
     60  * functionality, any further overrides of these methods in
     61  * subclasses must invoke superclass versions, which effectively
     62  * disables additional task customization.  However, this class
     63  * provides an alternative protected extension method
     64  * {@code decorateTask} (one version each for {@code Runnable} and
     65  * {@code Callable}) that can be used to customize the concrete task
     66  * types used to execute commands entered via {@code execute},
     67  * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
     68  * and {@code scheduleWithFixedDelay}.  By default, a
     69  * {@code ScheduledThreadPoolExecutor} uses a task type extending
     70  * {@link FutureTask}. However, this may be modified or replaced using
     71  * subclasses of the form:
     72  *
     73  *  <pre> {@code
     74  * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
     75  *
     76  *   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
     77  *
     78  *   protected <V> RunnableScheduledFuture<V> decorateTask(
     79  *                Runnable r, RunnableScheduledFuture<V> task) {
     80  *       return new CustomTask<V>(r, task);
     81  *   }
     82  *
     83  *   protected <V> RunnableScheduledFuture<V> decorateTask(
     84  *                Callable<V> c, RunnableScheduledFuture<V> task) {
     85  *       return new CustomTask<V>(c, task);
     86  *   }
     87  *   // ... add constructors, etc.
     88  * }}</pre>
     89  *
     90  * @since 1.5
     91  * @author Doug Lea
     92  */
     93 public class ScheduledThreadPoolExecutor
     94         extends ThreadPoolExecutor
     95         implements ScheduledExecutorService {
     96 
     97     /*
     98      * This class specializes ThreadPoolExecutor implementation by
     99      *
    100      * 1. Using a custom task type, ScheduledFutureTask for
    101      *    tasks, even those that don't require scheduling (i.e.,
    102      *    those submitted using ExecutorService execute, not
    103      *    ScheduledExecutorService methods) which are treated as
    104      *    delayed tasks with a delay of zero.
    105      *
    106      * 2. Using a custom queue (DelayedWorkQueue), a variant of
    107      *    unbounded DelayQueue. The lack of capacity constraint and
    108      *    the fact that corePoolSize and maximumPoolSize are
    109      *    effectively identical simplifies some execution mechanics
    110      *    (see delayedExecute) compared to ThreadPoolExecutor.
    111      *
    112      * 3. Supporting optional run-after-shutdown parameters, which
    113      *    leads to overrides of shutdown methods to remove and cancel
    114      *    tasks that should NOT be run after shutdown, as well as
    115      *    different recheck logic when task (re)submission overlaps
    116      *    with a shutdown.
    117      *
    118      * 4. Task decoration methods to allow interception and
    119      *    instrumentation, which are needed because subclasses cannot
    120      *    otherwise override submit methods to get this effect. These
    121      *    don't have any impact on pool control logic though.
    122      */
    123 
    124     /**
    125      * False if should cancel/suppress periodic tasks on shutdown.
    126      */
    127     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
    128 
    129     /**
    130      * False if should cancel non-periodic tasks on shutdown.
    131      */
    132     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
    133 
    134     /**
    135      * True if ScheduledFutureTask.cancel should remove from queue
    136      */
    137     private volatile boolean removeOnCancel = false;
    138 
    139     /**
    140      * Sequence number to break scheduling ties, and in turn to
    141      * guarantee FIFO order among tied entries.
    142      */
    143     private static final AtomicLong sequencer = new AtomicLong();
    144 
    145     /**
    146      * Returns current nanosecond time.
    147      */
    148     final long now() {
    149         return System.nanoTime();
    150     }
    151 
    152     private class ScheduledFutureTask<V>
    153             extends FutureTask<V> implements RunnableScheduledFuture<V> {
    154 
    155         /** Sequence number to break ties FIFO */
    156         private final long sequenceNumber;
    157 
    158         /** The time the task is enabled to execute in nanoTime units */
    159         private long time;
    160 
    161         /**
    162          * Period in nanoseconds for repeating tasks.  A positive
    163          * value indicates fixed-rate execution.  A negative value
    164          * indicates fixed-delay execution.  A value of 0 indicates a
    165          * non-repeating task.
    166          */
    167         private final long period;
    168 
    169         /** The actual task to be re-enqueued by reExecutePeriodic */
    170         RunnableScheduledFuture<V> outerTask = this;
    171 
    172         /**
    173          * Index into delay queue, to support faster cancellation.
    174          */
    175         int heapIndex;
    176 
    177         /**
    178          * Creates a one-shot action with given nanoTime-based trigger time.
    179          */
    180         ScheduledFutureTask(Runnable r, V result, long ns) {
    181             super(r, result);
    182             this.time = ns;
    183             this.period = 0;
    184             this.sequenceNumber = sequencer.getAndIncrement();
    185         }
    186 
    187         /**
    188          * Creates a periodic action with given nano time and period.
    189          */
    190         ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    191             super(r, result);
    192             this.time = ns;
    193             this.period = period;
    194             this.sequenceNumber = sequencer.getAndIncrement();
    195         }
    196 
    197         /**
    198          * Creates a one-shot action with given nanoTime-based trigger time.
    199          */
    200         ScheduledFutureTask(Callable<V> callable, long ns) {
    201             super(callable);
    202             this.time = ns;
    203             this.period = 0;
    204             this.sequenceNumber = sequencer.getAndIncrement();
    205         }
    206 
    207         public long getDelay(TimeUnit unit) {
    208             return unit.convert(time - now(), NANOSECONDS);
    209         }
    210 
    211         public int compareTo(Delayed other) {
    212             if (other == this) // compare zero if same object
    213                 return 0;
    214             if (other instanceof ScheduledFutureTask) {
    215                 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
    216                 long diff = time - x.time;
    217                 if (diff < 0)
    218                     return -1;
    219                 else if (diff > 0)
    220                     return 1;
    221                 else if (sequenceNumber < x.sequenceNumber)
    222                     return -1;
    223                 else
    224                     return 1;
    225             }
    226             long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    227             return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    228         }
    229 
    230         /**
    231          * Returns {@code true} if this is a periodic (not a one-shot) action.
    232          *
    233          * @return {@code true} if periodic
    234          */
    235         public boolean isPeriodic() {
    236             return period != 0;
    237         }
    238 
    239         /**
    240          * Sets the next time to run for a periodic task.
    241          */
    242         private void setNextRunTime() {
    243             long p = period;
    244             if (p > 0)
    245                 time += p;
    246             else
    247                 time = triggerTime(-p);
    248         }
    249 
    250         public boolean cancel(boolean mayInterruptIfRunning) {
    251             boolean cancelled = super.cancel(mayInterruptIfRunning);
    252             if (cancelled && removeOnCancel && heapIndex >= 0)
    253                 remove(this);
    254             return cancelled;
    255         }
    256 
    257         /**
    258          * Overrides FutureTask version so as to reset/requeue if periodic.
    259          */
    260         public void run() {
    261             boolean periodic = isPeriodic();
    262             if (!canRunInCurrentRunState(periodic))
    263                 cancel(false);
    264             else if (!periodic)
    265                 ScheduledFutureTask.super.run();
    266             else if (ScheduledFutureTask.super.runAndReset()) {
    267                 setNextRunTime();
    268                 reExecutePeriodic(outerTask);
    269             }
    270         }
    271     }
    272 
    273     /**
    274      * Returns true if can run a task given current run state
    275      * and run-after-shutdown parameters.
    276      *
    277      * @param periodic true if this task is periodic, false if delayed
    278      */
    279     boolean canRunInCurrentRunState(boolean periodic) {
    280         return isRunningOrShutdown(periodic ?
    281                                    continueExistingPeriodicTasksAfterShutdown :
    282                                    executeExistingDelayedTasksAfterShutdown);
    283     }
    284 
    285     /**
    286      * Main execution method for delayed or periodic tasks.  If pool
    287      * is shut down, rejects the task. Otherwise adds task to queue
    288      * and starts a thread, if necessary, to run it.  (We cannot
    289      * prestart the thread to run the task because the task (probably)
    290      * shouldn't be run yet.)  If the pool is shut down while the task
    291      * is being added, cancel and remove it if required by state and
    292      * run-after-shutdown parameters.
    293      *
    294      * @param task the task
    295      */
    296     private void delayedExecute(RunnableScheduledFuture<?> task) {
    297         if (isShutdown())
    298             reject(task);
    299         else {
    300             super.getQueue().add(task);
    301             if (isShutdown() &&
    302                 !canRunInCurrentRunState(task.isPeriodic()) &&
    303                 remove(task))
    304                 task.cancel(false);
    305             else
    306                 ensurePrestart();
    307         }
    308     }
    309 
    310     /**
    311      * Requeues a periodic task unless current run state precludes it.
    312      * Same idea as delayedExecute except drops task rather than rejecting.
    313      *
    314      * @param task the task
    315      */
    316     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    317         if (canRunInCurrentRunState(true)) {
    318             super.getQueue().add(task);
    319             if (!canRunInCurrentRunState(true) && remove(task))
    320                 task.cancel(false);
    321             else
    322                 ensurePrestart();
    323         }
    324     }
    325 
    326     /**
    327      * Cancels and clears the queue of all tasks that should not be run
    328      * due to shutdown policy.  Invoked within super.shutdown.
    329      */
    330     @Override void onShutdown() {
    331         BlockingQueue<Runnable> q = super.getQueue();
    332         boolean keepDelayed =
    333             getExecuteExistingDelayedTasksAfterShutdownPolicy();
    334         boolean keepPeriodic =
    335             getContinueExistingPeriodicTasksAfterShutdownPolicy();
    336         if (!keepDelayed && !keepPeriodic) {
    337             for (Object e : q.toArray())
    338                 if (e instanceof RunnableScheduledFuture<?>)
    339                     ((RunnableScheduledFuture<?>) e).cancel(false);
    340             q.clear();
    341         }
    342         else {
    343             // Traverse snapshot to avoid iterator exceptions
    344             for (Object e : q.toArray()) {
    345                 if (e instanceof RunnableScheduledFuture) {
    346                     RunnableScheduledFuture<?> t =
    347                         (RunnableScheduledFuture<?>)e;
    348                     if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
    349                         t.isCancelled()) { // also remove if already cancelled
    350                         if (q.remove(t))
    351                             t.cancel(false);
    352                     }
    353                 }
    354             }
    355         }
    356         tryTerminate();
    357     }
    358 
    359     /**
    360      * Modifies or replaces the task used to execute a runnable.
    361      * This method can be used to override the concrete
    362      * class used for managing internal tasks.
    363      * The default implementation simply returns the given task.
    364      *
    365      * @param runnable the submitted Runnable
    366      * @param task the task created to execute the runnable
    367      * @return a task that can execute the runnable
    368      * @since 1.6
    369      */
    370     protected <V> RunnableScheduledFuture<V> decorateTask(
    371         Runnable runnable, RunnableScheduledFuture<V> task) {
    372         return task;
    373     }
    374 
    375     /**
    376      * Modifies or replaces the task used to execute a callable.
    377      * This method can be used to override the concrete
    378      * class used for managing internal tasks.
    379      * The default implementation simply returns the given task.
    380      *
    381      * @param callable the submitted Callable
    382      * @param task the task created to execute the callable
    383      * @return a task that can execute the callable
    384      * @since 1.6
    385      */
    386     protected <V> RunnableScheduledFuture<V> decorateTask(
    387         Callable<V> callable, RunnableScheduledFuture<V> task) {
    388         return task;
    389     }
    390 
    391     /**
    392      * Creates a new {@code ScheduledThreadPoolExecutor} with the
    393      * given core pool size.
    394      *
    395      * @param corePoolSize the number of threads to keep in the pool, even
    396      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
    397      * @throws IllegalArgumentException if {@code corePoolSize < 0}
    398      */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        // Maximum pool size Integer.MAX_VALUE, zero keep-alive, and a fresh
        // DelayedWorkQueue as the work queue.
        super(corePoolSize,
              Integer.MAX_VALUE,
              0,
              TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }
    403 
    404     /**
    405      * Creates a new {@code ScheduledThreadPoolExecutor} with the
    406      * given initial parameters.
    407      *
    408      * @param corePoolSize the number of threads to keep in the pool, even
    409      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
    410      * @param threadFactory the factory to use when the executor
    411      *        creates a new thread
    412      * @throws IllegalArgumentException if {@code corePoolSize < 0}
    413      * @throws NullPointerException if {@code threadFactory} is null
    414      */
    public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
        // Maximum pool size Integer.MAX_VALUE, zero keep-alive, and a fresh
        // DelayedWorkQueue as the work queue.
        super(corePoolSize,
              Integer.MAX_VALUE,
              0,
              TimeUnit.NANOSECONDS,
              new DelayedWorkQueue(),
              threadFactory);
    }
    420 
    421     /**
    422      * Creates a new ScheduledThreadPoolExecutor with the given
    423      * initial parameters.
    424      *
    425      * @param corePoolSize the number of threads to keep in the pool, even
    426      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
    427      * @param handler the handler to use when execution is blocked
    428      *        because the thread bounds and queue capacities are reached
    429      * @throws IllegalArgumentException if {@code corePoolSize < 0}
    430      * @throws NullPointerException if {@code handler} is null
    431      */
    public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
        // Maximum pool size Integer.MAX_VALUE, zero keep-alive, and a fresh
        // DelayedWorkQueue as the work queue.
        super(corePoolSize,
              Integer.MAX_VALUE,
              0,
              TimeUnit.NANOSECONDS,
              new DelayedWorkQueue(),
              handler);
    }
    437 
    438     /**
    439      * Creates a new ScheduledThreadPoolExecutor with the given
    440      * initial parameters.
    441      *
    442      * @param corePoolSize the number of threads to keep in the pool, even
    443      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
    444      * @param threadFactory the factory to use when the executor
    445      *        creates a new thread
    446      * @param handler the handler to use when execution is blocked
    447      *        because the thread bounds and queue capacities are reached
    448      * @throws IllegalArgumentException if {@code corePoolSize < 0}
    449      * @throws NullPointerException if {@code threadFactory} or
    450      *         {@code handler} is null
    451      */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        // Maximum pool size Integer.MAX_VALUE, zero keep-alive, and a fresh
        // DelayedWorkQueue as the work queue.
        super(corePoolSize,
              Integer.MAX_VALUE,
              0,
              TimeUnit.NANOSECONDS,
              new DelayedWorkQueue(),
              threadFactory,
              handler);
    }
    458 
    459     /**
    460      * Returns the trigger time of a delayed action.
    461      */
    462     private long triggerTime(long delay, TimeUnit unit) {
    463         return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    464     }
    465 
    466     /**
    467      * Returns the trigger time of a delayed action.
    468      */
    469     long triggerTime(long delay) {
    470         return now() +
    471             ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    472     }
    473 
    474     /**
    475      * Constrains the values of all delays in the queue to be within
    476      * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
    477      * This may occur if a task is eligible to be dequeued, but has
    478      * not yet been, while some other task is added with a delay of
    479      * Long.MAX_VALUE.
    480      */
    481     private long overflowFree(long delay) {
    482         Delayed head = (Delayed) super.getQueue().peek();
    483         if (head != null) {
    484             long headDelay = head.getDelay(NANOSECONDS);
    485             if (headDelay < 0 && (delay - headDelay < 0))
    486                 delay = Long.MAX_VALUE + headDelay;
    487         }
    488         return delay;
    489     }
    490 
    491     /**
    492      * @throws RejectedExecutionException {@inheritDoc}
    493      * @throws NullPointerException       {@inheritDoc}
    494      */
    495     public ScheduledFuture<?> schedule(Runnable command,
    496                                        long delay,
    497                                        TimeUnit unit) {
    498         if (command == null || unit == null)
    499             throw new NullPointerException();
    500         RunnableScheduledFuture<?> t = decorateTask(command,
    501             new ScheduledFutureTask<Void>(command, null,
    502                                           triggerTime(delay, unit)));
    503         delayedExecute(t);
    504         return t;
    505     }
    506 
    507     /**
    508      * @throws RejectedExecutionException {@inheritDoc}
    509      * @throws NullPointerException       {@inheritDoc}
    510      */
    511     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
    512                                            long delay,
    513                                            TimeUnit unit) {
    514         if (callable == null || unit == null)
    515             throw new NullPointerException();
    516         RunnableScheduledFuture<V> t = decorateTask(callable,
    517             new ScheduledFutureTask<V>(callable,
    518                                        triggerTime(delay, unit)));
    519         delayedExecute(t);
    520         return t;
    521     }
    522 
    523     /**
    524      * @throws RejectedExecutionException {@inheritDoc}
    525      * @throws NullPointerException       {@inheritDoc}
    526      * @throws IllegalArgumentException   {@inheritDoc}
    527      */
    528     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
    529                                                   long initialDelay,
    530                                                   long period,
    531                                                   TimeUnit unit) {
    532         if (command == null || unit == null)
    533             throw new NullPointerException();
    534         if (period <= 0)
    535             throw new IllegalArgumentException();
    536         ScheduledFutureTask<Void> sft =
    537             new ScheduledFutureTask<Void>(command,
    538                                           null,
    539                                           triggerTime(initialDelay, unit),
    540                                           unit.toNanos(period));
    541         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    542         sft.outerTask = t;
    543         delayedExecute(t);
    544         return t;
    545     }
    546 
    547     /**
    548      * @throws RejectedExecutionException {@inheritDoc}
    549      * @throws NullPointerException       {@inheritDoc}
    550      * @throws IllegalArgumentException   {@inheritDoc}
    551      */
    552     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
    553                                                      long initialDelay,
    554                                                      long delay,
    555                                                      TimeUnit unit) {
    556         if (command == null || unit == null)
    557             throw new NullPointerException();
    558         if (delay <= 0)
    559             throw new IllegalArgumentException();
    560         ScheduledFutureTask<Void> sft =
    561             new ScheduledFutureTask<Void>(command,
    562                                           null,
    563                                           triggerTime(initialDelay, unit),
    564                                           unit.toNanos(-delay));
    565         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    566         sft.outerTask = t;
    567         delayedExecute(t);
    568         return t;
    569     }
    570 
    571     /**
    572      * Executes {@code command} with zero required delay.
    573      * This has effect equivalent to
    574      * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
    575      * Note that inspections of the queue and of the list returned by
    576      * {@code shutdownNow} will access the zero-delayed
    577      * {@link ScheduledFuture}, not the {@code command} itself.
    578      *
    579      * <p>A consequence of the use of {@code ScheduledFuture} objects is
    580      * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
    581      * called with a null second {@code Throwable} argument, even if the
    582      * {@code command} terminated abruptly.  Instead, the {@code Throwable}
    583      * thrown by such a task can be obtained via {@link Future#get}.
    584      *
    585      * @throws RejectedExecutionException at discretion of
    586      *         {@code RejectedExecutionHandler}, if the task
    587      *         cannot be accepted for execution because the
    588      *         executor has been shut down
    589      * @throws NullPointerException {@inheritDoc}
    590      */
    591     public void execute(Runnable command) {
    592         schedule(command, 0, NANOSECONDS);
    593     }
    594 
    595     // Override AbstractExecutorService methods
    596 
    597     /**
    598      * @throws RejectedExecutionException {@inheritDoc}
    599      * @throws NullPointerException       {@inheritDoc}
    600      */
    601     public Future<?> submit(Runnable task) {
    602         return schedule(task, 0, NANOSECONDS);
    603     }
    604 
    605     /**
    606      * @throws RejectedExecutionException {@inheritDoc}
    607      * @throws NullPointerException       {@inheritDoc}
    608      */
    609     public <T> Future<T> submit(Runnable task, T result) {
    610         return schedule(Executors.callable(task, result), 0, NANOSECONDS);
    611     }
    612 
    613     /**
    614      * @throws RejectedExecutionException {@inheritDoc}
    615      * @throws NullPointerException       {@inheritDoc}
    616      */
    617     public <T> Future<T> submit(Callable<T> task) {
    618         return schedule(task, 0, NANOSECONDS);
    619     }
    620 
    621     /**
    622      * Sets the policy on whether to continue executing existing
    623      * periodic tasks even when this executor has been {@code shutdown}.
    624      * In this case, these tasks will only terminate upon
    625      * {@code shutdownNow} or after setting the policy to
    626      * {@code false} when already shutdown.
    627      * This value is by default {@code false}.
    628      *
    629      * @param value if {@code true}, continue after shutdown, else don't
    630      * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
    631      */
    632     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
    633         continueExistingPeriodicTasksAfterShutdown = value;
    634         if (!value && isShutdown())
    635             onShutdown();
    636     }
    637 
    638     /**
    639      * Gets the policy on whether to continue executing existing
    640      * periodic tasks even when this executor has been {@code shutdown}.
    641      * In this case, these tasks will only terminate upon
    642      * {@code shutdownNow} or after setting the policy to
    643      * {@code false} when already shutdown.
    644      * This value is by default {@code false}.
    645      *
    646      * @return {@code true} if will continue after shutdown
    647      * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
    648      */
    649     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
    650         return continueExistingPeriodicTasksAfterShutdown;
    651     }
    652 
    653     /**
    654      * Sets the policy on whether to execute existing delayed
    655      * tasks even when this executor has been {@code shutdown}.
    656      * In this case, these tasks will only terminate upon
    657      * {@code shutdownNow}, or after setting the policy to
    658      * {@code false} when already shutdown.
    659      * This value is by default {@code true}.
    660      *
    661      * @param value if {@code true}, execute after shutdown, else don't
    662      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
    663      */
    664     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
    665         executeExistingDelayedTasksAfterShutdown = value;
    666         if (!value && isShutdown())
    667             onShutdown();
    668     }
    669 
    670     /**
    671      * Gets the policy on whether to execute existing delayed
    672      * tasks even when this executor has been {@code shutdown}.
    673      * In this case, these tasks will only terminate upon
    674      * {@code shutdownNow}, or after setting the policy to
    675      * {@code false} when already shutdown.
    676      * This value is by default {@code true}.
    677      *
    678      * @return {@code true} if will execute after shutdown
    679      * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
    680      */
    681     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
    682         return executeExistingDelayedTasksAfterShutdown;
    683     }
    684 
    685     /**
    686      * Sets the policy on whether cancelled tasks should be immediately
    687      * removed from the work queue at time of cancellation.  This value is
    688      * by default {@code false}.
    689      *
    690      * @param value if {@code true}, remove on cancellation, else don't
    691      * @see #getRemoveOnCancelPolicy
    692      * @since 1.7
    693      */
    694     public void setRemoveOnCancelPolicy(boolean value) {
    695         removeOnCancel = value;
    696     }
    697 
    698     /**
    699      * Gets the policy on whether cancelled tasks should be immediately
    700      * removed from the work queue at time of cancellation.  This value is
    701      * by default {@code false}.
    702      *
    703      * @return {@code true} if cancelled tasks are immediately removed
    704      *         from the queue
    705      * @see #setRemoveOnCancelPolicy
    706      * @since 1.7
    707      */
    708     public boolean getRemoveOnCancelPolicy() {
    709         return removeOnCancel;
    710     }
    711 
    712     /**
    713      * Initiates an orderly shutdown in which previously submitted
    714      * tasks are executed, but no new tasks will be accepted.
    715      * Invocation has no additional effect if already shut down.
    716      *
    717      * <p>This method does not wait for previously submitted tasks to
    718      * complete execution.  Use {@link #awaitTermination awaitTermination}
    719      * to do that.
    720      *
    721      * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
    722      * has been set {@code false}, existing delayed tasks whose delays
    723      * have not yet elapsed are cancelled.  And unless the {@code
    724      * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
    725      * {@code true}, future executions of existing periodic tasks will
    726      * be cancelled.
    727      */
    728     public void shutdown() {
    729         super.shutdown();
    730     }
    731 
    732     /**
    733      * Attempts to stop all actively executing tasks, halts the
    734      * processing of waiting tasks, and returns a list of the tasks
    735      * that were awaiting execution.
    736      *
    737      * <p>This method does not wait for actively executing tasks to
    738      * terminate.  Use {@link #awaitTermination awaitTermination} to
    739      * do that.
    740      *
    741      * <p>There are no guarantees beyond best-effort attempts to stop
    742      * processing actively executing tasks.  This implementation
    743      * cancels tasks via {@link Thread#interrupt}, so any task that
    744      * fails to respond to interrupts may never terminate.
    745      *
    746      * @return list of tasks that never commenced execution.
    747      *         Each element of this list is a {@link ScheduledFuture},
    748      *         including those tasks submitted using {@code execute},
    749      *         which are for scheduling purposes used as the basis of a
    750      *         zero-delay {@code ScheduledFuture}.
    751      */
    752     public List<Runnable> shutdownNow() {
    753         return super.shutdownNow();
    754     }
    755 
    756     /**
    757      * Returns the task queue used by this executor.  Each element of
    758      * this queue is a {@link ScheduledFuture}, including those
    759      * tasks submitted using {@code execute} which are for scheduling
    760      * purposes used as the basis of a zero-delay
    761      * {@code ScheduledFuture}.  Iteration over this queue is
    762      * <em>not</em> guaranteed to traverse tasks in the order in
    763      * which they will execute.
    764      *
    765      * @return the task queue
    766      */
    767     public BlockingQueue<Runnable> getQueue() {
    768         return super.getQueue();
    769     }
    770 
    771     /**
    772      * Specialized delay queue. To mesh with TPE declarations, this
    773      * class must be declared as a BlockingQueue<Runnable> even though
    774      * it can only hold RunnableScheduledFutures.
    775      */
    776     static class DelayedWorkQueue extends AbstractQueue<Runnable>
    777         implements BlockingQueue<Runnable> {
    778 
    779         /*
    780          * A DelayedWorkQueue is based on a heap-based data structure
    781          * like those in DelayQueue and PriorityQueue, except that
    782          * every ScheduledFutureTask also records its index into the
    783          * heap array. This eliminates the need to find a task upon
    784          * cancellation, greatly speeding up removal (down from O(n)
    785          * to O(log n)), and reducing garbage retention that would
    786          * otherwise occur by waiting for the element to rise to top
    787          * before clearing. But because the queue may also hold
    788          * RunnableScheduledFutures that are not ScheduledFutureTasks,
    789          * we are not guaranteed to have such indices available, in
    790          * which case we fall back to linear search. (We expect that
    791          * most tasks will not be decorated, and that the faster cases
    792          * will be much more common.)
    793          *
    794          * All heap operations must record index changes -- mainly
    795          * within siftUp and siftDown. Upon removal, a task's
    796          * heapIndex is set to -1. Note that ScheduledFutureTasks can
    797          * appear at most once in the queue (this need not be true for
    798          * other kinds of tasks or work queues), so are uniquely
    799          * identified by heapIndex.
    800          */
    801 
    802         private static final int INITIAL_CAPACITY = 16;
    803         private RunnableScheduledFuture<?>[] queue =
    804             new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    805         private final ReentrantLock lock = new ReentrantLock();
    806         private int size = 0;
    807 
    808         /**
    809          * Thread designated to wait for the task at the head of the
    810          * queue.  This variant of the Leader-Follower pattern
    811          * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
    812          * minimize unnecessary timed waiting.  When a thread becomes
    813          * the leader, it waits only for the next delay to elapse, but
    814          * other threads await indefinitely.  The leader thread must
    815          * signal some other thread before returning from take() or
    816          * poll(...), unless some other thread becomes leader in the
    817          * interim.  Whenever the head of the queue is replaced with a
    818          * task with an earlier expiration time, the leader field is
    819          * invalidated by being reset to null, and some waiting
    820          * thread, but not necessarily the current leader, is
    821          * signalled.  So waiting threads must be prepared to acquire
    822          * and lose leadership while waiting.
    823          */
    824         private Thread leader = null;
    825 
    826         /**
    827          * Condition signalled when a newer task becomes available at the
    828          * head of the queue or a new thread may need to become leader.
    829          */
    830         private final Condition available = lock.newCondition();
    831 
    832         /**
    833          * Sets f's heapIndex if it is a ScheduledFutureTask.
    834          */
    835         private void setIndex(RunnableScheduledFuture<?> f, int idx) {
    836             if (f instanceof ScheduledFutureTask)
    837                 ((ScheduledFutureTask)f).heapIndex = idx;
    838         }
    839 
    840         /**
    841          * Sifts element added at bottom up to its heap-ordered spot.
    842          * Call only when holding lock.
    843          */
    844         private void siftUp(int k, RunnableScheduledFuture<?> key) {
    845             while (k > 0) {
    846                 int parent = (k - 1) >>> 1;
    847                 RunnableScheduledFuture<?> e = queue[parent];
    848                 if (key.compareTo(e) >= 0)
    849                     break;
    850                 queue[k] = e;
    851                 setIndex(e, k);
    852                 k = parent;
    853             }
    854             queue[k] = key;
    855             setIndex(key, k);
    856         }
    857 
    858         /**
    859          * Sifts element added at top down to its heap-ordered spot.
    860          * Call only when holding lock.
    861          */
    862         private void siftDown(int k, RunnableScheduledFuture<?> key) {
    863             int half = size >>> 1;
    864             while (k < half) {
    865                 int child = (k << 1) + 1;
    866                 RunnableScheduledFuture<?> c = queue[child];
    867                 int right = child + 1;
    868                 if (right < size && c.compareTo(queue[right]) > 0)
    869                     c = queue[child = right];
    870                 if (key.compareTo(c) <= 0)
    871                     break;
    872                 queue[k] = c;
    873                 setIndex(c, k);
    874                 k = child;
    875             }
    876             queue[k] = key;
    877             setIndex(key, k);
    878         }
    879 
    880         /**
    881          * Resizes the heap array.  Call only when holding lock.
    882          */
    883         private void grow() {
    884             int oldCapacity = queue.length;
    885             int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
    886             if (newCapacity < 0) // overflow
    887                 newCapacity = Integer.MAX_VALUE;
    888             queue = Arrays.copyOf(queue, newCapacity);
    889         }
    890 
    891         /**
    892          * Finds index of given object, or -1 if absent.
    893          */
    894         private int indexOf(Object x) {
    895             if (x != null) {
    896                 if (x instanceof ScheduledFutureTask) {
    897                     int i = ((ScheduledFutureTask) x).heapIndex;
    898                     // Sanity check; x could conceivably be a
    899                     // ScheduledFutureTask from some other pool.
    900                     if (i >= 0 && i < size && queue[i] == x)
    901                         return i;
    902                 } else {
    903                     for (int i = 0; i < size; i++)
    904                         if (x.equals(queue[i]))
    905                             return i;
    906                 }
    907             }
    908             return -1;
    909         }
    910 
    911         public boolean contains(Object x) {
    912             final ReentrantLock lock = this.lock;
    913             lock.lock();
    914             try {
    915                 return indexOf(x) != -1;
    916             } finally {
    917                 lock.unlock();
    918             }
    919         }
    920 
    921         public boolean remove(Object x) {
    922             final ReentrantLock lock = this.lock;
    923             lock.lock();
    924             try {
    925                 int i = indexOf(x);
    926                 if (i < 0)
    927                     return false;
    928 
    929                 setIndex(queue[i], -1);
    930                 int s = --size;
    931                 RunnableScheduledFuture<?> replacement = queue[s];
    932                 queue[s] = null;
    933                 if (s != i) {
    934                     siftDown(i, replacement);
    935                     if (queue[i] == replacement)
    936                         siftUp(i, replacement);
    937                 }
    938                 return true;
    939             } finally {
    940                 lock.unlock();
    941             }
    942         }
    943 
    944         public int size() {
    945             final ReentrantLock lock = this.lock;
    946             lock.lock();
    947             try {
    948                 return size;
    949             } finally {
    950                 lock.unlock();
    951             }
    952         }
    953 
    954         public boolean isEmpty() {
    955             return size() == 0;
    956         }
    957 
    958         public int remainingCapacity() {
    959             return Integer.MAX_VALUE;
    960         }
    961 
    962         public RunnableScheduledFuture<?> peek() {
    963             final ReentrantLock lock = this.lock;
    964             lock.lock();
    965             try {
    966                 return queue[0];
    967             } finally {
    968                 lock.unlock();
    969             }
    970         }
    971 
    972         public boolean offer(Runnable x) {
    973             if (x == null)
    974                 throw new NullPointerException();
    975             RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    976             final ReentrantLock lock = this.lock;
    977             lock.lock();
    978             try {
    979                 int i = size;
    980                 if (i >= queue.length)
    981                     grow();
    982                 size = i + 1;
    983                 if (i == 0) {
    984                     queue[0] = e;
    985                     setIndex(e, 0);
    986                 } else {
    987                     siftUp(i, e);
    988                 }
    989                 if (queue[0] == e) {
    990                     leader = null;
    991                     available.signal();
    992                 }
    993             } finally {
    994                 lock.unlock();
    995             }
    996             return true;
    997         }
    998 
    999         public void put(Runnable e) {
   1000             offer(e);
   1001         }
   1002 
   1003         public boolean add(Runnable e) {
   1004             return offer(e);
   1005         }
   1006 
   1007         public boolean offer(Runnable e, long timeout, TimeUnit unit) {
   1008             return offer(e);
   1009         }
   1010 
   1011         /**
   1012          * Performs common bookkeeping for poll and take: Replaces
   1013          * first element with last and sifts it down.  Call only when
   1014          * holding lock.
   1015          * @param f the task to remove and return
   1016          */
   1017         private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
   1018             int s = --size;
   1019             RunnableScheduledFuture<?> x = queue[s];
   1020             queue[s] = null;
   1021             if (s != 0)
   1022                 siftDown(0, x);
   1023             setIndex(f, -1);
   1024             return f;
   1025         }
   1026 
   1027         public RunnableScheduledFuture<?> poll() {
   1028             final ReentrantLock lock = this.lock;
   1029             lock.lock();
   1030             try {
   1031                 RunnableScheduledFuture<?> first = queue[0];
   1032                 if (first == null || first.getDelay(NANOSECONDS) > 0)
   1033                     return null;
   1034                 else
   1035                     return finishPoll(first);
   1036             } finally {
   1037                 lock.unlock();
   1038             }
   1039         }
   1040 
   1041         public RunnableScheduledFuture<?> take() throws InterruptedException {
   1042             final ReentrantLock lock = this.lock;
   1043             lock.lockInterruptibly();
   1044             try {
   1045                 for (;;) {
   1046                     RunnableScheduledFuture<?> first = queue[0];
   1047                     if (first == null)
   1048                         available.await();
   1049                     else {
   1050                         long delay = first.getDelay(NANOSECONDS);
   1051                         if (delay <= 0)
   1052                             return finishPoll(first);
   1053                         first = null; // don't retain ref while waiting
   1054                         if (leader != null)
   1055                             available.await();
   1056                         else {
   1057                             Thread thisThread = Thread.currentThread();
   1058                             leader = thisThread;
   1059                             try {
   1060                                 available.awaitNanos(delay);
   1061                             } finally {
   1062                                 if (leader == thisThread)
   1063                                     leader = null;
   1064                             }
   1065                         }
   1066                     }
   1067                 }
   1068             } finally {
   1069                 if (leader == null && queue[0] != null)
   1070                     available.signal();
   1071                 lock.unlock();
   1072             }
   1073         }
   1074 
   1075         public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
   1076             throws InterruptedException {
   1077             long nanos = unit.toNanos(timeout);
   1078             final ReentrantLock lock = this.lock;
   1079             lock.lockInterruptibly();
   1080             try {
   1081                 for (;;) {
   1082                     RunnableScheduledFuture<?> first = queue[0];
   1083                     if (first == null) {
   1084                         if (nanos <= 0)
   1085                             return null;
   1086                         else
   1087                             nanos = available.awaitNanos(nanos);
   1088                     } else {
   1089                         long delay = first.getDelay(NANOSECONDS);
   1090                         if (delay <= 0)
   1091                             return finishPoll(first);
   1092                         if (nanos <= 0)
   1093                             return null;
   1094                         first = null; // don't retain ref while waiting
   1095                         if (nanos < delay || leader != null)
   1096                             nanos = available.awaitNanos(nanos);
   1097                         else {
   1098                             Thread thisThread = Thread.currentThread();
   1099                             leader = thisThread;
   1100                             try {
   1101                                 long timeLeft = available.awaitNanos(delay);
   1102                                 nanos -= delay - timeLeft;
   1103                             } finally {
   1104                                 if (leader == thisThread)
   1105                                     leader = null;
   1106                             }
   1107                         }
   1108                     }
   1109                 }
   1110             } finally {
   1111                 if (leader == null && queue[0] != null)
   1112                     available.signal();
   1113                 lock.unlock();
   1114             }
   1115         }
   1116 
   1117         public void clear() {
   1118             final ReentrantLock lock = this.lock;
   1119             lock.lock();
   1120             try {
   1121                 for (int i = 0; i < size; i++) {
   1122                     RunnableScheduledFuture<?> t = queue[i];
   1123                     if (t != null) {
   1124                         queue[i] = null;
   1125                         setIndex(t, -1);
   1126                     }
   1127                 }
   1128                 size = 0;
   1129             } finally {
   1130                 lock.unlock();
   1131             }
   1132         }
   1133 
   1134         /**
   1135          * Returns first element only if it is expired.
   1136          * Used only by drainTo.  Call only when holding lock.
   1137          */
   1138         private RunnableScheduledFuture<?> peekExpired() {
   1139             // assert lock.isHeldByCurrentThread();
   1140             RunnableScheduledFuture<?> first = queue[0];
   1141             return (first == null || first.getDelay(NANOSECONDS) > 0) ?
   1142                 null : first;
   1143         }
   1144 
   1145         public int drainTo(Collection<? super Runnable> c) {
   1146             if (c == null)
   1147                 throw new NullPointerException();
   1148             if (c == this)
   1149                 throw new IllegalArgumentException();
   1150             final ReentrantLock lock = this.lock;
   1151             lock.lock();
   1152             try {
   1153                 RunnableScheduledFuture<?> first;
   1154                 int n = 0;
   1155                 while ((first = peekExpired()) != null) {
   1156                     c.add(first);   // In this order, in case add() throws.
   1157                     finishPoll(first);
   1158                     ++n;
   1159                 }
   1160                 return n;
   1161             } finally {
   1162                 lock.unlock();
   1163             }
   1164         }
   1165 
   1166         public int drainTo(Collection<? super Runnable> c, int maxElements) {
   1167             if (c == null)
   1168                 throw new NullPointerException();
   1169             if (c == this)
   1170                 throw new IllegalArgumentException();
   1171             if (maxElements <= 0)
   1172                 return 0;
   1173             final ReentrantLock lock = this.lock;
   1174             lock.lock();
   1175             try {
   1176                 RunnableScheduledFuture<?> first;
   1177                 int n = 0;
   1178                 while (n < maxElements && (first = peekExpired()) != null) {
   1179                     c.add(first);   // In this order, in case add() throws.
   1180                     finishPoll(first);
   1181                     ++n;
   1182                 }
   1183                 return n;
   1184             } finally {
   1185                 lock.unlock();
   1186             }
   1187         }
   1188 
   1189         public Object[] toArray() {
   1190             final ReentrantLock lock = this.lock;
   1191             lock.lock();
   1192             try {
   1193                 return Arrays.copyOf(queue, size, Object[].class);
   1194             } finally {
   1195                 lock.unlock();
   1196             }
   1197         }
   1198 
   1199         @SuppressWarnings("unchecked")
   1200         public <T> T[] toArray(T[] a) {
   1201             final ReentrantLock lock = this.lock;
   1202             lock.lock();
   1203             try {
   1204                 if (a.length < size)
   1205                     return (T[]) Arrays.copyOf(queue, size, a.getClass());
   1206                 System.arraycopy(queue, 0, a, 0, size);
   1207                 if (a.length > size)
   1208                     a[size] = null;
   1209                 return a;
   1210             } finally {
   1211                 lock.unlock();
   1212             }
   1213         }
   1214 
   1215         public Iterator<Runnable> iterator() {
   1216             return new Itr(Arrays.copyOf(queue, size));
   1217         }
   1218 
   1219         /**
   1220          * Snapshot iterator that works off copy of underlying q array.
   1221          */
   1222         private class Itr implements Iterator<Runnable> {
   1223             final RunnableScheduledFuture[] array;
   1224             int cursor = 0;     // index of next element to return
   1225             int lastRet = -1;   // index of last element, or -1 if no such
   1226 
   1227             Itr(RunnableScheduledFuture[] array) {
   1228                 this.array = array;
   1229             }
   1230 
   1231             public boolean hasNext() {
   1232                 return cursor < array.length;
   1233             }
   1234 
   1235             public Runnable next() {
   1236                 if (cursor >= array.length)
   1237                     throw new NoSuchElementException();
   1238                 lastRet = cursor;
   1239                 return array[cursor++];
   1240             }
   1241 
   1242             public void remove() {
   1243                 if (lastRet < 0)
   1244                     throw new IllegalStateException();
   1245                 DelayedWorkQueue.this.remove(array[lastRet]);
   1246                 lastRet = -1;
   1247             }
   1248         }
   1249     }
   1250 }
   1251