Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2011 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import com.google.common.annotations.Beta;
     20 import com.google.common.base.Preconditions;
     21 import com.google.common.base.Throwables;
     22 
     23 import java.util.concurrent.Callable;
     24 import java.util.concurrent.Executors;
     25 import java.util.concurrent.Future;
     26 import java.util.concurrent.ScheduledExecutorService;
     27 import java.util.concurrent.TimeUnit;
     28 import java.util.concurrent.locks.ReentrantLock;
     29 import java.util.logging.Level;
     30 import java.util.logging.Logger;
     31 
     32 import javax.annotation.concurrent.GuardedBy;
     33 
     34 /**
     35  * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in
     36  * the "running" state need to perform a periodic task.  Subclasses can implement {@link #startUp},
     37  * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically.
     38  *
     39  * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run
     40  * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the
     41  * {@link #runOneIteration} that will be executed periodically as specified by its
     42  * {@link Scheduler}. When this service is asked to stop via {@link #stop} or {@link #stopAndWait},
     43  * it will cancel the periodic task (but not interrupt it) and wait for it to stop before running
     44  * the {@link #shutDown} method.
     45  *
     46  * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link
     47  * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link
     48  * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start
     49  * late.  Also, all life cycle methods are executed with a lock held, so subclasses can safely
     50  * modify shared state without additional synchronization necessary for visibility to later
     51  * executions of the life cycle methods.
     52  *
     53  * <h3>Usage Example</h3>
     54  *
     55  * Here is a sketch of a service which crawls a website and uses the scheduling capabilities to
     56  * rate limit itself. <pre> {@code
     57  * class CrawlingService extends AbstractScheduledService {
     58  *   private Set<Uri> visited;
     59  *   private Queue<Uri> toCrawl;
     60  *   protected void startUp() throws Exception {
     61  *     toCrawl = readStartingUris();
     62  *   }
     63  *
     64  *   protected void runOneIteration() throws Exception {
     65  *     Uri uri = toCrawl.remove();
     66  *     Collection<Uri> newUris = crawl(uri);
     67  *     visited.add(uri);
     68  *     for (Uri newUri : newUris) {
     69  *       if (!visited.contains(newUri)) { toCrawl.add(newUri); }
     70  *     }
     71  *   }
     72  *
     73  *   protected void shutDown() throws Exception {
     74  *     saveUris(toCrawl);
     75  *   }
     76  *
     77  *   protected Scheduler scheduler() {
     78  *     return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS);
     79  *   }
     80  * }}</pre>
     81  *
     82  * This class uses the life cycle methods to read in a list of starting URIs and save the set of
     83  * outstanding URIs when shutting down.  Also, it takes advantage of the scheduling functionality to
     84  * rate limit the number of queries we perform.
     85  *
     86  * @author Luke Sandberg
     87  * @since 11.0
     88  */
     89 @Beta
     90 public abstract class AbstractScheduledService implements Service {
     91   private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName());
     92 
     93   /**
     94    * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its
     95    * task.
     96    *
     97    * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory
     98    * methods, these provide {@link Scheduler} instances for the common use case of running the
     99    * service with a fixed schedule.  If more flexibility is needed then consider subclassing the
    100    * {@link CustomScheduler} abstract class in preference to creating your own {@link Scheduler}
    101    * implementation.
    102    *
    103    * @author Luke Sandberg
    104    * @since 11.0
    105    */
    106   public abstract static class Scheduler {
    107     /**
    108      * Returns a {@link Scheduler} that schedules the task using the
    109      * {@link ScheduledExecutorService#scheduleWithFixedDelay} method.
    110      *
    111      * @param initialDelay the time to delay first execution
    112      * @param delay the delay between the termination of one execution and the commencement of the
    113      *        next
    114      * @param unit the time unit of the initialDelay and delay parameters
    115      */
    116     public static Scheduler newFixedDelaySchedule(final long initialDelay, final long delay,
    117         final TimeUnit unit) {
    118       return new Scheduler() {
    119         @Override
    120         public Future<?> schedule(AbstractService service, ScheduledExecutorService executor,
    121             Runnable task) {
    122           return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit);
    123         }
    124       };
    125     }
    126 
    127     /**
    128      * Returns a {@link Scheduler} that schedules the task using the
    129      * {@link ScheduledExecutorService#scheduleAtFixedRate} method.
    130      *
    131      * @param initialDelay the time to delay first execution
    132      * @param period the period between successive executions of the task
    133      * @param unit the time unit of the initialDelay and period parameters
    134      */
    135     public static Scheduler newFixedRateSchedule(final long initialDelay, final long period,
    136         final TimeUnit unit) {
    137       return new Scheduler() {
    138         @Override
    139         public Future<?> schedule(AbstractService service, ScheduledExecutorService executor,
    140             Runnable task) {
    141           return executor.scheduleAtFixedRate(task, initialDelay, period, unit);
    142         }
    143       };
    144     }
    145 
    146     /** Schedules the task to run on the provided executor on behalf of the service.  */
    147     abstract Future<?> schedule(AbstractService service, ScheduledExecutorService executor,
    148         Runnable runnable);
    149 
    150     private Scheduler() {}
    151   }
    152 
    153   /* use AbstractService for state management */
    154   private final AbstractService delegate = new AbstractService() {
    155 
    156     // A handle to the running task so that we can stop it when a shutdown has been requested.
    157     // These two fields are volatile because their values will be accessed from multiple threads.
    158     private volatile Future<?> runningTask;
    159     private volatile ScheduledExecutorService executorService;
    160 
    161     // This lock protects the task so we can ensure that none of the template methods (startUp,
    162     // shutDown or runOneIteration) run concurrently with one another.
    163     private final ReentrantLock lock = new ReentrantLock();
    164 
    165     private final Runnable task = new Runnable() {
    166       @Override public void run() {
    167         lock.lock();
    168         try {
    169           AbstractScheduledService.this.runOneIteration();
    170         } catch (Throwable t) {
    171           try {
    172             shutDown();
    173           } catch (Exception ignored) {
    174             logger.log(Level.WARNING,
    175                 "Error while attempting to shut down the service after failure.", ignored);
    176           }
    177           notifyFailed(t);
    178           throw Throwables.propagate(t);
    179         } finally {
    180           lock.unlock();
    181         }
    182       }
    183     };
    184 
    185     @Override protected final void doStart() {
    186       executorService = executor();
    187       executorService.execute(new Runnable() {
    188         @Override public void run() {
    189           lock.lock();
    190           try {
    191             startUp();
    192             runningTask = scheduler().schedule(delegate, executorService, task);
    193             notifyStarted();
    194           } catch (Throwable t) {
    195             notifyFailed(t);
    196             throw Throwables.propagate(t);
    197           } finally {
    198             lock.unlock();
    199           }
    200         }
    201       });
    202     }
    203 
    204     @Override protected final void doStop() {
    205       runningTask.cancel(false);
    206       executorService.execute(new Runnable() {
    207         @Override public void run() {
    208           try {
    209             lock.lock();
    210             try {
    211               if (state() != State.STOPPING) {
    212                 // This means that the state has changed since we were scheduled.  This implies that
    213                 // an execution of runOneIteration has thrown an exception and we have transitioned
    214                 // to a failed state, also this means that shutDown has already been called, so we
    215                 // do not want to call it again.
    216                 return;
    217               }
    218               shutDown();
    219             } finally {
    220               lock.unlock();
    221             }
    222             notifyStopped();
    223           } catch (Throwable t) {
    224             notifyFailed(t);
    225             throw Throwables.propagate(t);
    226           }
    227         }
    228       });
    229     }
    230   };
    231 
  /**
   * Run one iteration of the scheduled task. This is invoked on the {@link #executor} according to
   * the {@link Scheduler} returned by {@link #scheduler}, and never concurrently with
   * {@link #startUp} or {@link #shutDown}.
   *
   * <p>If any invocation of this method throws an exception, {@link #shutDown} is attempted, the
   * service will transition to the {@link Service.State#FAILED} state and this method will no
   * longer be called.
   *
   * @throws Exception if the iteration fails; this fails the whole service
   */
  protected abstract void runOneIteration() throws Exception;
    238 
  /**
   * Start the service. This is invoked on the {@link #executor} before the periodic task is
   * scheduled; if it throws, the task is never scheduled and the service is notified of the
   * failure.
   *
   * @throws Exception if the service could not be started
   */
  protected abstract void startUp() throws Exception;
    241 
  /**
   * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}.
   * It is invoked on the {@link #executor} when the service is stopped, after the periodic task
   * has been cancelled, and also directly after an invocation of {@link #runOneIteration} that
   * threw.
   *
   * @throws Exception if the service could not be shut down cleanly
   */
  protected abstract void shutDown() throws Exception;
    244 
  /**
   * Returns the {@link Scheduler} object used to configure this service.  This method will only be
   * called once, while the service is starting and after {@link #startUp} has returned.
   *
   * @return the scheduling policy for {@link #runOneIteration}
   */
  protected abstract Scheduler scheduler();
    250 
    251   /**
    252    * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp},
    253    * {@link #runOneIteration} and {@link #shutDown} methods.  The executor will not be
    254    * {@link ScheduledExecutorService#shutdown} when this service stops. Subclasses may override this
    255    * method to use a custom {@link ScheduledExecutorService} instance.
    256    *
    257    * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread
    258    * pool.  This method will only be called once.
    259    */
    260   protected ScheduledExecutorService executor() {
    261     return Executors.newSingleThreadScheduledExecutor();
    262   }
    263 
    264   @Override public String toString() {
    265     return getClass().getSimpleName() + " [" + state() + "]";
    266   }
    267 
    268   // We override instead of using ForwardingService so that these can be final.
    269 
    270   @Override public final ListenableFuture<State> start() {
    271     return delegate.start();
    272   }
    273 
    274   @Override public final State startAndWait() {
    275     return delegate.startAndWait();
    276   }
    277 
    278   @Override public final boolean isRunning() {
    279     return delegate.isRunning();
    280   }
    281 
    282   @Override public final State state() {
    283     return delegate.state();
    284   }
    285 
    286   @Override public final ListenableFuture<State> stop() {
    287     return delegate.stop();
    288   }
    289 
    290   @Override public final State stopAndWait() {
    291     return delegate.stopAndWait();
    292   }
    293 
    294   /**
    295    * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to
    296    * use a dynamically changing schedule.  After every execution of the task, assuming it hasn't
    297    * been cancelled, the {@link #getNextSchedule} method will be called.
    298    *
    299    * @author Luke Sandberg
    300    * @since 11.0
    301    */
    302   @Beta
    303   public abstract static class CustomScheduler extends Scheduler {
    304 
    305     /**
    306      * A callable class that can reschedule itself using a {@link CustomScheduler}.
    307      */
    308     private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> {
    309 
    310       /** The underlying task. */
    311       private final Runnable wrappedRunnable;
    312 
    313       /** The executor on which this Callable will be scheduled. */
    314       private final ScheduledExecutorService executor;
    315 
    316       /**
    317        * The service that is managing this callable.  This is used so that failure can be
    318        * reported properly.
    319        */
    320       private final AbstractService service;
    321 
    322       /**
    323        * This lock is used to ensure safe and correct cancellation, it ensures that a new task is
    324        * not scheduled while a cancel is ongoing.  Also it protects the currentFuture variable to
    325        * ensure that it is assigned atomically with being scheduled.
    326        */
    327       private final ReentrantLock lock = new ReentrantLock();
    328 
    329       /** The future that represents the next execution of this task.*/
    330       @GuardedBy("lock")
    331       private Future<Void> currentFuture;
    332 
    333       ReschedulableCallable(AbstractService service, ScheduledExecutorService executor,
    334           Runnable runnable) {
    335         this.wrappedRunnable = runnable;
    336         this.executor = executor;
    337         this.service = service;
    338       }
    339 
    340       @Override
    341       public Void call() throws Exception {
    342         wrappedRunnable.run();
    343         reschedule();
    344         return null;
    345       }
    346 
    347       /**
    348        * Atomically reschedules this task and assigns the new future to {@link #currentFuture}.
    349        */
    350       public void reschedule() {
    351         // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that
    352         // cancel calls cancel on the correct future. 2. we want to make sure that the assignment
    353         // to currentFuture doesn't race with itself so that currentFuture is assigned in the
    354         // correct order.
    355         lock.lock();
    356         try {
    357           if (currentFuture == null || !currentFuture.isCancelled()) {
    358             final Schedule schedule = CustomScheduler.this.getNextSchedule();
    359             currentFuture = executor.schedule(this, schedule.delay, schedule.unit);
    360           }
    361         } catch (Throwable e) {
    362           // If an exception is thrown by the subclass then we need to make sure that the service
    363           // notices and transitions to the FAILED state.  We do it by calling notifyFailed directly
    364           // because the service does not monitor the state of the future so if the exception is not
    365           // caught and forwarded to the service the task would stop executing but the service would
    366           // have no idea.
    367           service.notifyFailed(e);
    368         } finally {
    369           lock.unlock();
    370         }
    371       }
    372 
    373       // N.B. Only protect cancel and isCancelled because those are the only methods that are
    374       // invoked by the AbstractScheduledService.
    375       @Override
    376       public boolean cancel(boolean mayInterruptIfRunning) {
    377         // Ensure that a task cannot be rescheduled while a cancel is ongoing.
    378         lock.lock();
    379         try {
    380           return currentFuture.cancel(mayInterruptIfRunning);
    381         } finally {
    382           lock.unlock();
    383         }
    384       }
    385 
    386       @Override
    387       protected Future<Void> delegate() {
    388         throw new UnsupportedOperationException("Only cancel is supported by this future");
    389       }
    390     }
    391 
    392     @Override
    393     final Future<?> schedule(AbstractService service, ScheduledExecutorService executor,
    394         Runnable runnable) {
    395       ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable);
    396       task.reschedule();
    397       return task;
    398     }
    399 
    400     /**
    401      * A value object that represents an absolute delay until a task should be invoked.
    402      *
    403      * @author Luke Sandberg
    404      * @since 11.0
    405      */
    406     @Beta
    407     protected static final class Schedule {
    408 
    409       private final long delay;
    410       private final TimeUnit unit;
    411 
    412       /**
    413        * @param delay the time from now to delay execution
    414        * @param unit the time unit of the delay parameter
    415        */
    416       public Schedule(long delay, TimeUnit unit) {
    417         this.delay = delay;
    418         this.unit = Preconditions.checkNotNull(unit);
    419       }
    420     }
    421 
    422     /**
    423      * Calculates the time at which to next invoke the task.
    424      *
    425      * <p>This is guaranteed to be called immediately after the task has completed an iteration and
    426      * on the same thread as the previous execution of {@link
    427      * AbstractScheduledService#runOneIteration}.
    428      *
    429      * @return a schedule that defines the delay before the next execution.
    430      */
    431     protected abstract Schedule getNextSchedule() throws Exception;
    432   }
    433 }
    434