/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License
 */

package com.android.dialer.common.concurrent;

import static com.google.common.util.concurrent.Futures.immediateCancelledFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.common.util.concurrent.AsyncCallable;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Serializes execution of a set of operations. This class guarantees that a submitted callable will
 * not be called before previously submitted callables have completed.
 *
 * <p>For callables submitted through {@link #submitAsync}, "completed" means that the future
 * returned by the callable has completed, not merely that {@code call()} has returned.
 */
public final class DialerFutureSerializer {
  /** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */
  private final AtomicReference<ListenableFuture<?>> ref =
      new AtomicReference<>(immediateFuture(null));

  /**
   * Enqueues a task to run when the previous task (if any) completes.
   *
   * <p>This is a convenience wrapper around {@link #submitAsync}: the value produced by {@code
   * callable} is wrapped in an immediate future.
   *
   * @param callable the work to run once every previously submitted task has completed
   * @param executor the executor on which {@code callable} is invoked
   * @return a future carrying the result (or failure) of {@code callable}
   * @throws NullPointerException if {@code executor} is null
   */
  public <T> ListenableFuture<T> submit(final Callable<T> callable, Executor executor) {
    return submitAsync(() -> immediateFuture(callable.call()), executor);
  }

  /**
   * Enqueues a task to run when the previous task (if any) completes.
   *
   * <p>Cancellation does not propagate from the output future to the future returned from {@code
   * callable}, but if the output future is cancelled before {@link AsyncCallable#call()} is
   * invoked, {@link AsyncCallable#call()} will not be invoked.
   *
   * <p>If the output future is cancelled after {@link AsyncCallable#call()} has been invoked, the
   * next queued task still waits for the future returned by {@code callable} to complete.
   *
   * @param callable the work to run once every previously submitted task has completed
   * @param executor the executor on which {@code callable} is invoked
   * @return a future mirroring the future returned by {@code callable}
   * @throws NullPointerException if {@code executor} is null
   */
  public <T> ListenableFuture<T> submitAsync(final AsyncCallable<T> callable, Executor executor) {
    // Fail before touching the queue. Registering a listener with a null executor throws, and if
    // that happened after ref had been swapped, newFuture would never complete and every later
    // submission would wait forever.
    if (executor == null) {
      throw new NullPointerException("executor");
    }

    // Exactly one party wins this flag: either the task (meaning the callable is going to run) or
    // the cancellation path (meaning the callable will never run).
    final AtomicBoolean claimed = new AtomicBoolean(false);
    final AsyncCallable<T> task =
        () -> {
          if (!claimed.compareAndSet(false, true)) {
            // Cancellation won the race, so the callable must not be invoked.
            return immediateCancelledFuture();
          }
          return callable.call();
        };
    /*
     * Four futures are at play here:
     * taskFuture is the future that tracks the callable and the future it returns.
     * outputFuture is the view of taskFuture handed to the caller; cancelling it does not cancel
     *   taskFuture.
     * newFuture is the future we use to track the serialization of our task.
     * oldFuture is the previous task's newFuture.
     *
     * newFuture is guaranteed to complete only after every task previously submitted to this
     * instance has completed: it completes after oldFuture is done and taskFuture has either
     * completed or been cancelled before the callable was invoked.
     */
    final SettableFuture<Object> newFuture = SettableFuture.create();

    final ListenableFuture<?> oldFuture = ref.getAndSet(newFuture);

    // Invoke our task once the previous future completes.
    final ListenableFuture<T> taskFuture =
        Futures.submitAsync(task, runnable -> oldFuture.addListener(runnable, executor));
    final ListenableFuture<T> outputFuture = Futures.nonCancellationPropagating(taskFuture);

    // newFuture's lifetime is determined by taskFuture, which cannot complete before oldFuture.
    // The one exception is cancellation of outputFuture before the callable starts, in which case
    // newFuture falls back to oldFuture's lifetime so that the next task does not begin until
    // oldFuture completes.
    final Runnable releaseNext =
        () -> {
          if (taskFuture.isDone()
              // If this compare-and-set succeeds, the callable will never be invoked, so it is
              // safe to let the next task proceed as soon as oldFuture completes. If it fails, the
              // callable has already started and we must wait for taskFuture instead.
              || (outputFuture.isCancelled() && claimed.compareAndSet(false, true))) {
            // Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of
            // a future that eventually came from immediateFuture(null), this doesn't leak
            // throwables or completion values.
            newFuture.setFuture(oldFuture);
          }
        };
    // Listening on both futures guarantees that newFuture is always set: taskFuture covers the
    // case where the callable runs, and outputFuture covers cancellation before the callable
    // runs. A second setFuture call is a harmless no-op.
    outputFuture.addListener(releaseNext, directExecutor());
    taskFuture.addListener(releaseNext, directExecutor());

    return outputFuture;
  }
}