Home | History | Annotate | Download | only in internal
      1 /*
      2  * Copyright 2015 The gRPC Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package io.grpc.internal;
     18 
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Ticker;
import com.google.common.util.concurrent.AbstractFuture;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
     33 
     34 /**
     35  * A manipulated clock that exports a {@link Ticker} and a {@link ScheduledExecutorService}.
     36  *
     37  * <p>To simulate the locking scenario of using real executors, it never runs tasks within {@code
     38  * schedule()} or {@code execute()}. Instead, you should call {@link #runDueTasks} in your test
     39  * method to run all due tasks. {@link #forwardTime} and {@link #forwardNanos} call {@link
     40  * #runDueTasks} automatically.
     41  */
     42 public final class FakeClock {
     43 
     44   private static final TaskFilter ACCEPT_ALL_FILTER = new TaskFilter() {
     45       @Override
     46       public boolean shouldAccept(Runnable command) {
     47         return true;
     48       }
     49     };
     50 
     51   private final ScheduledExecutorService scheduledExecutorService = new ScheduledExecutorImpl();
     52 
     53   private final PriorityBlockingQueue<ScheduledTask> tasks =
     54       new PriorityBlockingQueue<ScheduledTask>();
     55 
     56   private final Ticker ticker =
     57       new Ticker() {
     58         @Override public long read() {
     59           return currentTimeNanos;
     60         }
     61       };
     62 
     63   private final Supplier<Stopwatch> stopwatchSupplier =
     64       new Supplier<Stopwatch>() {
     65         @Override public Stopwatch get() {
     66           return Stopwatch.createUnstarted(ticker);
     67         }
     68       };
     69 
     70   private long currentTimeNanos;
     71 
     72   public class ScheduledTask extends AbstractFuture<Void> implements ScheduledFuture<Void> {
     73     public final Runnable command;
     74     public final long dueTimeNanos;
     75 
     76     ScheduledTask(long dueTimeNanos, Runnable command) {
     77       this.dueTimeNanos = dueTimeNanos;
     78       this.command = command;
     79     }
     80 
     81     @Override public boolean cancel(boolean mayInterruptIfRunning) {
     82       tasks.remove(this);
     83       return super.cancel(mayInterruptIfRunning);
     84     }
     85 
     86     @Override public long getDelay(TimeUnit unit) {
     87       return unit.convert(dueTimeNanos - currentTimeNanos, TimeUnit.NANOSECONDS);
     88     }
     89 
     90     @Override public int compareTo(Delayed other) {
     91       ScheduledTask otherTask = (ScheduledTask) other;
     92       if (dueTimeNanos > otherTask.dueTimeNanos) {
     93         return 1;
     94       } else if (dueTimeNanos < otherTask.dueTimeNanos) {
     95         return -1;
     96       } else {
     97         return 0;
     98       }
     99     }
    100 
    101     void complete() {
    102       set(null);
    103     }
    104 
    105     @Override
    106     public String toString() {
    107       return "[due=" + dueTimeNanos + ", task=" + command + "]";
    108     }
    109   }
    110 
    111   private class ScheduledExecutorImpl implements ScheduledExecutorService {
    112     @Override public <V> ScheduledFuture<V> schedule(
    113         Callable<V> callable, long delay, TimeUnit unit) {
    114       throw new UnsupportedOperationException();
    115     }
    116 
    117     @Override public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
    118       ScheduledTask task = new ScheduledTask(currentTimeNanos + unit.toNanos(delay), cmd);
    119       tasks.add(task);
    120       return task;
    121     }
    122 
    123     @Override public ScheduledFuture<?> scheduleAtFixedRate(
    124         Runnable command, long initialDelay, long period, TimeUnit unit) {
    125       throw new UnsupportedOperationException();
    126     }
    127 
    128     @Override public ScheduledFuture<?> scheduleWithFixedDelay(
    129         Runnable command, long initialDelay, long delay, TimeUnit unit) {
    130       throw new UnsupportedOperationException();
    131     }
    132 
    133     @Override public boolean awaitTermination(long timeout, TimeUnit unit) {
    134       throw new UnsupportedOperationException();
    135     }
    136 
    137     @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
    138       throw new UnsupportedOperationException();
    139     }
    140 
    141     @Override public <T> List<Future<T>> invokeAll(
    142         Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
    143       throw new UnsupportedOperationException();
    144     }
    145 
    146     @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
    147       throw new UnsupportedOperationException();
    148     }
    149 
    150     @Override public <T> T invokeAny(
    151         Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
    152       throw new UnsupportedOperationException();
    153     }
    154 
    155     @Override public boolean isShutdown() {
    156       throw new UnsupportedOperationException();
    157     }
    158 
    159     @Override public boolean isTerminated() {
    160       throw new UnsupportedOperationException();
    161     }
    162 
    163     @Override public void shutdown() {
    164       throw new UnsupportedOperationException();
    165     }
    166 
    167     @Override public List<Runnable> shutdownNow() {
    168       throw new UnsupportedOperationException();
    169     }
    170 
    171     @Override public <T> Future<T> submit(Callable<T> task) {
    172       throw new UnsupportedOperationException();
    173     }
    174 
    175     @Override public Future<?> submit(Runnable task) {
    176       throw new UnsupportedOperationException();
    177     }
    178 
    179     @Override public <T> Future<T> submit(Runnable task, T result) {
    180       throw new UnsupportedOperationException();
    181     }
    182 
    183     @Override public void execute(Runnable command) {
    184       // Since it is being enqueued immediately, no point in tracing the future for cancellation.
    185       Future<?> unused = schedule(command, 0, TimeUnit.NANOSECONDS);
    186     }
    187   }
    188 
    189   /**
    190    * Provides a partially implemented instance of {@link ScheduledExecutorService} that uses the
    191    * fake clock ticker for testing.
    192    */
    193   public ScheduledExecutorService getScheduledExecutorService() {
    194     return scheduledExecutorService;
    195   }
    196 
    197   /**
    198    * Provides a stopwatch instance that uses the fake clock ticker.
    199    */
    200   public Supplier<Stopwatch> getStopwatchSupplier() {
    201     return stopwatchSupplier;
    202   }
    203 
    204   /**
    205    * Ticker of the FakeClock.
    206    */
    207   public Ticker getTicker() {
    208     return ticker;
    209   }
    210 
    211   /**
    212    * Run all due tasks.
    213    *
    214    * @return the number of tasks run by this call
    215    */
    216   public int runDueTasks() {
    217     int count = 0;
    218     while (true) {
    219       ScheduledTask task = tasks.peek();
    220       if (task == null || task.dueTimeNanos > currentTimeNanos) {
    221         break;
    222       }
    223       if (tasks.remove(task)) {
    224         task.command.run();
    225         task.complete();
    226         count++;
    227       }
    228     }
    229     return count;
    230   }
    231 
    232   /**
    233    * Return all due tasks.
    234    */
    235   public Collection<ScheduledTask> getDueTasks() {
    236     ArrayList<ScheduledTask> result = new ArrayList<>();
    237     for (ScheduledTask task : tasks) {
    238       if (task.dueTimeNanos > currentTimeNanos) {
    239         continue;
    240       }
    241       result.add(task);
    242     }
    243     return result;
    244   }
    245 
    246   /**
    247    * Return all unrun tasks.
    248    */
    249   public Collection<ScheduledTask> getPendingTasks() {
    250     return getPendingTasks(ACCEPT_ALL_FILTER);
    251   }
    252 
    253   /**
    254    * Return all unrun tasks accepted by the given filter.
    255    */
    256   public Collection<ScheduledTask> getPendingTasks(TaskFilter filter) {
    257     ArrayList<ScheduledTask> result = new ArrayList<>();
    258     for (ScheduledTask task : tasks) {
    259       if (filter.shouldAccept(task.command)) {
    260         result.add(task);
    261       }
    262     }
    263     return result;
    264   }
    265 
    266   /**
    267    * Forward the time by the given duration and run all due tasks.
    268    *
    269    * @return the number of tasks run by this call
    270    */
    271   public int forwardTime(long value, TimeUnit unit) {
    272     currentTimeNanos += unit.toNanos(value);
    273     return runDueTasks();
    274   }
    275 
    276   /**
    277    * Forward the time by the given nanoseconds and run all due tasks.
    278    *
    279    * @return the number of tasks run by this call
    280    */
    281   public int forwardNanos(long nanos) {
    282     return forwardTime(nanos, TimeUnit.NANOSECONDS);
    283   }
    284 
    285   /**
    286    * Return the number of queued tasks.
    287    */
    288   public int numPendingTasks() {
    289     return tasks.size();
    290   }
    291 
    292   /**
    293    * Return the number of queued tasks accepted by the given filter.
    294    */
    295   public int numPendingTasks(TaskFilter filter) {
    296     int count = 0;
    297     for (ScheduledTask task : tasks) {
    298       if (filter.shouldAccept(task.command)) {
    299         count++;
    300       }
    301     }
    302     return count;
    303   }
    304 
    305   public long currentTimeMillis() {
    306     // Normally millis and nanos are of different epochs. Add an offset to simulate that.
    307     return TimeUnit.NANOSECONDS.toMillis(currentTimeNanos + 123456789L);
    308   }
    309 
    310   /**
    311    * A filter that allows us to have fine grained control over which tasks are accepted for certain
    312    * operation.
    313    */
    314   public interface TaskFilter {
    315     /**
    316      * Inspect the Runnable and returns true if it should be accepted.
    317      */
    318     boolean shouldAccept(Runnable runnable);
    319   }
    320 }
    321