Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2017 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License
     15  */
     16 
     17 package com.android.dialer.common.concurrent;
     18 
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Atomics;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
     29 
     30 /** Static utility methods related to futures. */
     31 public class DialerFutures {
     32 
     33   /**
     34    * Returns a future that will complete with the same value as the first matching the supplied
     35    * predicate, cancelling all inputs upon completion. If none match, {@code defaultValue} is
     36    * returned.
     37    *
     38    * <p>If an input fails before a match is found, the returned future also fails.
     39    *
     40    * <p>Cancellation of the output future will cause cancellation of all input futures.
     41    *
     42    * @throws IllegalArgumentException if {@code futures} is empty.
     43    */
     44   public static <T> ListenableFuture<T> firstMatching(
     45       Iterable<? extends ListenableFuture<? extends T>> futures,
     46       Predicate<T> predicate,
     47       T defaultValue) {
     48     return firstMatchingImpl(futures, predicate, defaultValue);
     49   }
     50 
     51   private static <T> ListenableFuture<T> firstMatchingImpl(
     52       Iterable<? extends ListenableFuture<? extends T>> futures,
     53       Predicate<T> predicate,
     54       T defaultValue) {
     55     AggregateFuture<T> output = new AnyOfFuture<>(futures);
     56     final AtomicReference<AggregateFuture<T>> ref = Atomics.newReference(output);
     57     final AtomicInteger pending = new AtomicInteger(output.futures.size());
     58     for (final ListenableFuture<? extends T> future : output.futures) {
     59       future.addListener(
     60           new Runnable() {
     61             @Override
     62             public void run() {
     63               // Call get() and then set() instead of getAndSet() because a volatile read/write is
     64               // cheaper than a CAS and atomicity is guaranteed by setFuture.
     65               AggregateFuture<T> output = ref.get();
     66               if (output != null) {
     67                 T value = null;
     68                 try {
     69                   value = Futures.getDone(future);
     70                 } catch (ExecutionException e) {
     71                   ref.set(null); // unpin
     72                   output.setException(e);
     73                   return;
     74                 }
     75                 if (!predicate.apply(value)) {
     76                   if (pending.decrementAndGet() == 0) {
     77                     // we are the last future (and every other future hasn't matched or failed).
     78                     output.set(defaultValue);
     79                     // no point in clearing the ref, every other listener has already run
     80                   }
     81                 } else {
     82                   ref.set(null); // unpin
     83                   output.set(value);
     84                 }
     85               }
     86             }
     87           },
     88           MoreExecutors.directExecutor());
     89     }
     90     return output;
     91   }
     92 
     93   private static class AggregateFuture<T> extends AbstractFuture<T> {
     94     ImmutableList<ListenableFuture<? extends T>> futures;
     95 
     96     AggregateFuture(Iterable<? extends ListenableFuture<? extends T>> futures) {
     97       ImmutableList<ListenableFuture<? extends T>> futuresCopy = ImmutableList.copyOf(futures);
     98       if (futuresCopy.isEmpty()) {
     99         throw new IllegalArgumentException("Expected at least one future, got 0.");
    100       }
    101       this.futures = futuresCopy;
    102     }
    103 
    104     // increase visibility
    105     @Override
    106     protected boolean set(T t) {
    107       return super.set(t);
    108     }
    109 
    110     @Override
    111     protected boolean setException(Throwable throwable) {
    112       return super.setException(throwable);
    113     }
    114 
    115     @Override
    116     protected boolean setFuture(ListenableFuture<? extends T> t) {
    117       return super.setFuture(t);
    118     }
    119   }
    120 
    121   // Propagates cancellation to all inputs cancels all inputs upon completion
    122   private static final class AnyOfFuture<T> extends AggregateFuture<T> {
    123     AnyOfFuture(Iterable<? extends ListenableFuture<? extends T>> futures) {
    124       super(futures);
    125     }
    126 
    127     @SuppressWarnings("ShortCircuitBoolean")
    128     @Override
    129     protected void afterDone() {
    130       ImmutableList<ListenableFuture<? extends T>> localFutures = futures;
    131       futures = null; // unpin
    132       // even though afterDone is only called once, it is possible that the 'futures' field is null
    133       // because it isn't final and thus the write might not be visible if the future instance was
    134       // unsafely published.  See the comment at the top of Futures.java on memory visibility.
    135       if (localFutures != null) {
    136         boolean interrupt = !isCancelled() | wasInterrupted();
    137         for (ListenableFuture<? extends T> future : localFutures) {
    138           future.cancel(interrupt);
    139         }
    140       }
    141     }
    142   }
    143 }
    144