1 /* 2 * Copyright (C) 2011 Google Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5 * in compliance with the License. You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software distributed under the License 10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11 * or implied. See the License for the specific language governing permissions and limitations under 12 * the License. 13 */ 14 15 package com.google.caliper.runner; 16 17 import static java.util.concurrent.TimeUnit.MILLISECONDS; 18 import static java.util.concurrent.TimeUnit.NANOSECONDS; 19 20 import com.google.caliper.bridge.LogMessage; 21 import com.google.caliper.bridge.ShouldContinueMessage; 22 import com.google.caliper.bridge.StopMeasurementLogMessage; 23 import com.google.caliper.model.Trial; 24 import com.google.caliper.options.CaliperOptions; 25 import com.google.caliper.runner.Instrument.MeasurementCollectingVisitor; 26 import com.google.caliper.runner.StreamService.StreamItem; 27 import com.google.caliper.util.ShortDuration; 28 import com.google.common.base.Stopwatch; 29 import com.google.common.base.Throwables; 30 import com.google.common.util.concurrent.Service.State; 31 32 import org.joda.time.Duration; 33 34 import java.io.IOException; 35 import java.util.concurrent.Callable; 36 import java.util.logging.Level; 37 import java.util.logging.Logger; 38 39 import javax.inject.Inject; 40 41 /** 42 * The main data gather control loop for a Trial. 43 * 44 * <p>This class starts the worker process, reads all the data from it and constructs the 45 * {@link Trial} while enforcing the trial timeout. 46 */ 47 @TrialScoped class TrialRunLoop implements Callable<TrialResult> { 48 private static final Logger logger = Logger.getLogger(TrialRunLoop.class.getName()); 49 50 /** The time that the worker has to clean up after an experiment. */ 51 private static final Duration WORKER_CLEANUP_DURATION = Duration.standardSeconds(2); 52 53 private final CaliperOptions options; 54 private final StreamService streamService; 55 private final TrialResultFactory trialFactory; 56 57 // TODO(lukes): The VmDataCollectingVisitor should be able to tell us when it has collected all 58 // its data. 59 private final VmDataCollectingVisitor dataCollectingVisitor; 60 private final Stopwatch trialStopwatch = Stopwatch.createUnstarted(); 61 private final MeasurementCollectingVisitor measurementCollectingVisitor; 62 private final TrialOutputLogger trialOutput; 63 64 @Inject TrialRunLoop( 65 MeasurementCollectingVisitor measurementCollectingVisitor, 66 CaliperOptions options, 67 TrialResultFactory trialFactory, 68 TrialOutputLogger trialOutput, 69 StreamService streamService, 70 VmDataCollectingVisitor dataCollectingVisitor) { 71 this.options = options; 72 this.trialFactory = trialFactory; 73 this.streamService = streamService; 74 this.measurementCollectingVisitor = measurementCollectingVisitor; 75 this.trialOutput = trialOutput; 76 this.dataCollectingVisitor = dataCollectingVisitor; 77 } 78 79 @Override public TrialResult call() throws TrialFailureException, IOException { 80 if (streamService.state() != State.NEW) { 81 throw new IllegalStateException("You can only invoke the run loop once"); 82 } 83 trialOutput.open(); 84 trialOutput.printHeader(); 85 streamService.startAsync().awaitRunning(); 86 try { 87 long timeLimitNanos = getTrialTimeLimitTrialNanos(); 88 boolean doneCollecting = false; 89 boolean done = false; 90 while (!done) { 91 StreamItem item; 92 try { 93 item = streamService.readItem( 94 timeLimitNanos - trialStopwatch.elapsed(NANOSECONDS), 95 NANOSECONDS); 96 } catch (InterruptedException e) { 97 trialOutput.ensureFileIsSaved(); 98 // Someone has asked us to stop (via Futures.cancel?). 99 if (doneCollecting) { 100 logger.log(Level.WARNING, "Trial cancelled before completing normally (but after " 101 + "collecting sufficient data). Inspect {0} to see any worker output", 102 trialOutput.trialOutputFile()); 103 done = true; 104 break; 105 } 106 // We were asked to stop but we didn't actually finish (the normal case). Fail the trial. 107 throw new TrialFailureException( 108 String.format("Trial cancelled. Inspect %s to see any worker output.", 109 trialOutput.trialOutputFile())); 110 } 111 switch (item.kind()) { 112 case DATA: 113 LogMessage logMessage = item.content(); 114 logMessage.accept(measurementCollectingVisitor); 115 logMessage.accept(dataCollectingVisitor); 116 if (!doneCollecting && measurementCollectingVisitor.isDoneCollecting()) { 117 doneCollecting = true; 118 // We have received all the measurements we need and are about to tell the worker to 119 // shut down. At this point the worker should shutdown soon, but we don't want to 120 // wait too long, so decrease the time limit so that we wait no more than 121 // WORKER_CLEANUP_DURATION. 122 long cleanupTimeNanos = MILLISECONDS.toNanos(WORKER_CLEANUP_DURATION.getMillis()); 123 // TODO(lukes): Does the min operation make sense here? should we just use the 124 // cleanupTimeNanos? 125 timeLimitNanos = trialStopwatch.elapsed(NANOSECONDS) + cleanupTimeNanos; 126 } 127 // If it is a stop measurement message we need to tell the worker to either stop or keep 128 // going with a WorkerContinueMessage. This needs to be done after the 129 // measurementCollecting visitor sees the message so that isDoneCollection will be up to 130 // date. 131 if (logMessage instanceof StopMeasurementLogMessage) { 132 // TODO(lukes): this is a blocking write, perhaps we should perform it in a non 133 // blocking manner to keep this thread only blocking in one place. This would 134 // complicate error handling, but may increase performance since it would free this 135 // thread up to handle other messages 136 streamService.sendMessage( 137 new ShouldContinueMessage( 138 !doneCollecting, 139 measurementCollectingVisitor.isWarmupComplete())); 140 if (doneCollecting) { 141 streamService.closeWriter(); 142 } 143 } 144 break; 145 case EOF: 146 // We consider EOF to be synonymous with worker shutdown 147 if (!doneCollecting) { 148 trialOutput.ensureFileIsSaved(); 149 throw new TrialFailureException(String.format("The worker exited without producing " 150 + "data. It has likely crashed. Inspect %s to see any worker output.", 151 trialOutput.trialOutputFile())); 152 } 153 done = true; 154 break; 155 case TIMEOUT: 156 trialOutput.ensureFileIsSaved(); 157 if (doneCollecting) { 158 // Should this be an error? 159 logger.log(Level.WARNING, "Worker failed to exit cleanly within the alloted time. " 160 + "Inspect {0} to see any worker output", trialOutput.trialOutputFile()); 161 done = true; 162 } else { 163 throw new TrialFailureException(String.format( 164 "Trial exceeded the total allowable runtime (%s). " 165 + "The limit may be adjusted using the --time-limit flag. Inspect %s to " 166 + "see any worker output", 167 options.timeLimit(), trialOutput.trialOutputFile())); 168 } 169 break; 170 default: 171 throw new AssertionError("Impossible item: " + item); 172 } 173 } 174 return trialFactory.newTrialResult(dataCollectingVisitor, measurementCollectingVisitor); 175 } catch (Throwable e) { 176 Throwables.propagateIfInstanceOf(e, TrialFailureException.class); 177 // This is some failure that is not a TrialFailureException, let the exception propagate but 178 // log the filename for the user. 179 trialOutput.ensureFileIsSaved(); 180 logger.severe( 181 String.format( 182 "Unexpected error while executing trial. Inspect %s to see any worker output.", 183 trialOutput.trialOutputFile())); 184 throw Throwables.propagate(e); 185 } finally { 186 trialStopwatch.reset(); 187 streamService.stopAsync(); 188 trialOutput.close(); 189 } 190 } 191 192 private long getTrialTimeLimitTrialNanos() { 193 ShortDuration timeLimit = options.timeLimit(); 194 if (ShortDuration.zero().equals(timeLimit)) { 195 return Long.MAX_VALUE; 196 } 197 return timeLimit.to(NANOSECONDS); 198 } 199 } 200