Home | History | Annotate | Download | only in runner
      1 /*
      2  * Copyright (C) 2011 Google Inc.
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
      5  * in compliance with the License. You may obtain a copy of the License at
      6  *
      7  * http://www.apache.org/licenses/LICENSE-2.0
      8  *
      9  * Unless required by applicable law or agreed to in writing, software distributed under the License
     10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
     11  * or implied. See the License for the specific language governing permissions and limitations under
     12  * the License.
     13  */
     14 
     15 package com.google.caliper.runner;
     16 
     17 import static java.util.concurrent.TimeUnit.MILLISECONDS;
     18 import static java.util.concurrent.TimeUnit.NANOSECONDS;
     19 
     20 import com.google.caliper.bridge.LogMessage;
     21 import com.google.caliper.bridge.ShouldContinueMessage;
     22 import com.google.caliper.bridge.StopMeasurementLogMessage;
     23 import com.google.caliper.model.Trial;
     24 import com.google.caliper.options.CaliperOptions;
     25 import com.google.caliper.runner.Instrument.MeasurementCollectingVisitor;
     26 import com.google.caliper.runner.StreamService.StreamItem;
     27 import com.google.caliper.util.ShortDuration;
     28 import com.google.common.base.Stopwatch;
     29 import com.google.common.base.Throwables;
     30 import com.google.common.util.concurrent.Service.State;
     31 
     32 import org.joda.time.Duration;
     33 
     34 import java.io.IOException;
     35 import java.util.concurrent.Callable;
     36 import java.util.logging.Level;
     37 import java.util.logging.Logger;
     38 
     39 import javax.inject.Inject;
     40 
     41 /**
     42  * The main data gather control loop for a Trial.
     43  *
     44  * <p>This class starts the worker process, reads all the data from it and constructs the
     45  * {@link Trial} while enforcing the trial timeout.
     46  */
     47 @TrialScoped class TrialRunLoop implements Callable<TrialResult> {
     48   private static final Logger logger = Logger.getLogger(TrialRunLoop.class.getName());
     49 
     50   /** The time that the worker has to clean up after an experiment. */
     51   private static final Duration WORKER_CLEANUP_DURATION = Duration.standardSeconds(2);
     52 
     53   private final CaliperOptions options;
     54   private final StreamService streamService;
     55   private final TrialResultFactory trialFactory;
     56 
     57   // TODO(lukes): The VmDataCollectingVisitor should be able to tell us when it has collected all
     58   // its data.
     59   private final VmDataCollectingVisitor dataCollectingVisitor;
     60   private final Stopwatch trialStopwatch = Stopwatch.createUnstarted();
     61   private final MeasurementCollectingVisitor measurementCollectingVisitor;
     62   private final TrialOutputLogger trialOutput;
     63 
     64   @Inject TrialRunLoop(
     65       MeasurementCollectingVisitor measurementCollectingVisitor,
     66       CaliperOptions options,
     67       TrialResultFactory trialFactory,
     68       TrialOutputLogger trialOutput,
     69       StreamService streamService,
     70       VmDataCollectingVisitor dataCollectingVisitor) {
     71     this.options = options;
     72     this.trialFactory = trialFactory;
     73     this.streamService = streamService;
     74     this.measurementCollectingVisitor = measurementCollectingVisitor;
     75     this.trialOutput = trialOutput;
     76     this.dataCollectingVisitor = dataCollectingVisitor;
     77   }
     78 
     79   @Override public TrialResult call() throws TrialFailureException, IOException {
     80     if (streamService.state() != State.NEW) {
     81       throw new IllegalStateException("You can only invoke the run loop once");
     82     }
     83     trialOutput.open();
     84     trialOutput.printHeader();
     85     streamService.startAsync().awaitRunning();
     86     try {
     87       long timeLimitNanos = getTrialTimeLimitTrialNanos();
     88       boolean doneCollecting = false;
     89       boolean done = false;
     90       while (!done) {
     91         StreamItem item;
     92         try {
     93           item = streamService.readItem(
     94               timeLimitNanos - trialStopwatch.elapsed(NANOSECONDS),
     95               NANOSECONDS);
     96         } catch (InterruptedException e) {
     97           trialOutput.ensureFileIsSaved();
     98           // Someone has asked us to stop (via Futures.cancel?).
     99           if (doneCollecting) {
    100             logger.log(Level.WARNING, "Trial cancelled before completing normally (but after "
    101                 + "collecting sufficient data). Inspect {0} to see any worker output",
    102                 trialOutput.trialOutputFile());
    103             done = true;
    104             break;
    105           }
    106           // We were asked to stop but we didn't actually finish (the normal case).  Fail the trial.
    107           throw new TrialFailureException(
    108               String.format("Trial cancelled.  Inspect %s to see any worker output.",
    109                 trialOutput.trialOutputFile()));
    110         }
    111         switch (item.kind()) {
    112           case DATA:
    113             LogMessage logMessage = item.content();
    114             logMessage.accept(measurementCollectingVisitor);
    115             logMessage.accept(dataCollectingVisitor);
    116             if (!doneCollecting && measurementCollectingVisitor.isDoneCollecting()) {
    117               doneCollecting = true;
    118               // We have received all the measurements we need and are about to tell the worker to
    119               // shut down.  At this point the worker should shutdown soon, but we don't want to
    120               // wait too long, so decrease the time limit so that we wait no more than
    121               // WORKER_CLEANUP_DURATION.
    122               long cleanupTimeNanos = MILLISECONDS.toNanos(WORKER_CLEANUP_DURATION.getMillis());
    123               // TODO(lukes): Does the min operation make sense here? should we just use the
    124               // cleanupTimeNanos?
    125               timeLimitNanos = trialStopwatch.elapsed(NANOSECONDS) + cleanupTimeNanos;
    126             }
    127             // If it is a stop measurement message we need to tell the worker to either stop or keep
    128             // going with a WorkerContinueMessage.  This needs to be done after the
    129             // measurementCollecting visitor sees the message so that isDoneCollection will be up to
    130             // date.
    131             if (logMessage instanceof StopMeasurementLogMessage) {
    132               // TODO(lukes): this is a blocking write, perhaps we should perform it in a non
    133               // blocking manner to keep this thread only blocking in one place.  This would
    134               // complicate error handling, but may increase performance since it would free this
    135               // thread up to handle other messages
    136               streamService.sendMessage(
    137                   new ShouldContinueMessage(
    138                       !doneCollecting,
    139                       measurementCollectingVisitor.isWarmupComplete()));
    140               if (doneCollecting) {
    141                 streamService.closeWriter();
    142               }
    143             }
    144             break;
    145           case EOF:
    146             // We consider EOF to be synonymous with worker shutdown
    147             if (!doneCollecting) {
    148               trialOutput.ensureFileIsSaved();
    149               throw new TrialFailureException(String.format("The worker exited without producing "
    150                   + "data. It has likely crashed. Inspect %s to see any worker output.",
    151                   trialOutput.trialOutputFile()));
    152             }
    153             done = true;
    154             break;
    155           case TIMEOUT:
    156             trialOutput.ensureFileIsSaved();
    157             if (doneCollecting) {
    158               // Should this be an error?
    159               logger.log(Level.WARNING, "Worker failed to exit cleanly within the alloted time. "
    160                   + "Inspect {0} to see any worker output", trialOutput.trialOutputFile());
    161               done = true;
    162             } else {
    163               throw new TrialFailureException(String.format(
    164                   "Trial exceeded the total allowable runtime (%s). "
    165                       + "The limit may be adjusted using the --time-limit flag.  Inspect %s to "
    166                       + "see any worker output",
    167                       options.timeLimit(), trialOutput.trialOutputFile()));
    168             }
    169             break;
    170           default:
    171             throw new AssertionError("Impossible item: " + item);
    172         }
    173       }
    174       return trialFactory.newTrialResult(dataCollectingVisitor, measurementCollectingVisitor);
    175     } catch (Throwable e) {
    176       Throwables.propagateIfInstanceOf(e, TrialFailureException.class);
    177       // This is some failure that is not a TrialFailureException, let the exception propagate but
    178       // log the filename for the user.
    179       trialOutput.ensureFileIsSaved();
    180       logger.severe(
    181           String.format(
    182               "Unexpected error while executing trial. Inspect %s to see any worker output.",
    183               trialOutput.trialOutputFile()));
    184       throw Throwables.propagate(e);
    185     } finally {
    186       trialStopwatch.reset();
    187       streamService.stopAsync();
    188       trialOutput.close();
    189     }
    190   }
    191 
    192   private long getTrialTimeLimitTrialNanos() {
    193     ShortDuration timeLimit = options.timeLimit();
    194     if (ShortDuration.zero().equals(timeLimit)) {
    195       return Long.MAX_VALUE;
    196     }
    197     return timeLimit.to(NANOSECONDS);
    198   }
    199 }
    200