Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2017 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License
     15  */
     16 
     17 package com.android.dialer.common.concurrent;
     18 
     19 import static com.google.common.util.concurrent.Futures.immediateCancelledFuture;
     20 import static com.google.common.util.concurrent.Futures.immediateFuture;
     21 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
     22 
     23 import com.google.common.util.concurrent.AsyncCallable;
     24 import com.google.common.util.concurrent.Futures;
     25 import com.google.common.util.concurrent.ListenableFuture;
     26 import com.google.common.util.concurrent.SettableFuture;
     27 import java.util.concurrent.Callable;
     28 import java.util.concurrent.Executor;
     29 import java.util.concurrent.atomic.AtomicBoolean;
     30 import java.util.concurrent.atomic.AtomicReference;
     31 
     32 /**
     33  * Serializes execution of a set of operations. This class guarantees that a submitted callable will
     34  * not be called before previously submitted callables have completed.
     35  */
     36 public final class DialerFutureSerializer {
     37   /** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */
     38   private final AtomicReference<ListenableFuture<?>> ref =
     39       new AtomicReference<>(immediateFuture(null));
     40 
     41   /** Enqueues a task to run when the previous task (if any) completes. */
     42   public <T> ListenableFuture<T> submit(final Callable<T> callable, Executor executor) {
     43     return submitAsync(() -> immediateFuture(callable.call()), executor);
     44   }
     45 
     46   /**
     47    * Enqueues a task to run when the previous task (if any) completes.
     48    *
     49    * <p>Cancellation does not propagate from the output future to the future returned from {@code
     50    * callable}, but if the output future is cancelled before {@link AsyncCallable#call()} is
     51    * invoked, {@link AsyncCallable#call()} will not be invoked.
     52    */
     53   public <T> ListenableFuture<T> submitAsync(final AsyncCallable<T> callable, Executor executor) {
     54     AtomicBoolean wasCancelled = new AtomicBoolean(false);
     55     final AsyncCallable<T> task =
     56         () -> {
     57           if (wasCancelled.get()) {
     58             return immediateCancelledFuture();
     59           }
     60           return callable.call();
     61         };
     62     /*
     63      * Three futures are at play here:
     64      * taskFuture is the future that comes from the callable.
     65      * newFuture is the future we use to track the serialization of our task.
     66      * oldFuture is the previous task's newFuture.
     67      *
     68      * newFuture is guaranteed to only complete once all tasks previously submitted to this instance
     69      * once the futures returned from those submissions have completed.
     70      */
     71     final SettableFuture<Object> newFuture = SettableFuture.create();
     72 
     73     final ListenableFuture<?> oldFuture = ref.getAndSet(newFuture);
     74 
     75     // Invoke our task once the previous future completes.
     76     final ListenableFuture<T> taskFuture =
     77         Futures.nonCancellationPropagating(
     78             Futures.submitAsync(task, runnable -> oldFuture.addListener(runnable, executor)));
     79     // newFuture's lifetime is determined by taskFuture, unless taskFuture is cancelled, in which
     80     // case it falls back to oldFuture's. This is to ensure that if the future we return is
     81     // cancelled, we don't begin execution of the next task until after oldFuture completes.
     82     taskFuture.addListener(
     83         () -> {
     84           if (taskFuture.isCancelled()) {
     85             // Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of a
     86             // future that eventually came from immediateFuture(null), this doesn't leak throwables
     87             // or completion values.
     88             wasCancelled.set(true);
     89             newFuture.setFuture(oldFuture);
     90           } else {
     91             newFuture.set(null);
     92           }
     93         },
     94         directExecutor());
     95 
     96     return taskFuture;
     97   }
     98 }
     99