Home | History | Annotate | Download | only in runner
      1 /*
      2  * Copyright (C) 2013 Google Inc.
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
      5  * in compliance with the License. You may obtain a copy of the License at
      6  *
      7  * http://www.apache.org/licenses/LICENSE-2.0
      8  *
      9  * Unless required by applicable law or agreed to in writing, software distributed under the License
     10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
     11  * or implied. See the License for the specific language governing permissions and limitations under
     12  * the License.
     13  */
     14 
     15 package com.google.caliper.runner;
     16 
     17 import static com.google.common.base.Preconditions.checkNotNull;
     18 import static com.google.common.base.Preconditions.checkState;
     19 
     20 import com.google.caliper.bridge.LogMessage;
     21 import com.google.caliper.bridge.OpenedSocket;
     22 import com.google.caliper.bridge.StopMeasurementLogMessage;
     23 import com.google.caliper.model.Measurement;
     24 import com.google.caliper.runner.StreamService.StreamItem.Kind;
     25 import com.google.caliper.util.Parser;
     26 import com.google.common.base.MoreObjects;
     27 import com.google.common.base.MoreObjects.ToStringHelper;
     28 import com.google.common.collect.Queues;
     29 import com.google.common.io.Closeables;
     30 import com.google.common.io.LineReader;
     31 import com.google.common.util.concurrent.AbstractService;
     32 import com.google.common.util.concurrent.ListenableFuture;
     33 import com.google.common.util.concurrent.ListeningExecutorService;
     34 import com.google.common.util.concurrent.MoreExecutors;
     35 import com.google.common.util.concurrent.Service; // for javadoc
     36 import com.google.common.util.concurrent.Service.State; // for javadoc
     37 import com.google.common.util.concurrent.ThreadFactoryBuilder;
     38 import com.google.common.util.concurrent.Uninterruptibles;
     39 
     40 import java.io.IOException;
     41 import java.io.InputStreamReader;
     42 import java.io.Reader;
     43 import java.io.Serializable;
     44 import java.nio.charset.Charset;
     45 import java.text.ParseException;
     46 import java.util.concurrent.BlockingQueue;
     47 import java.util.concurrent.Callable;
     48 import java.util.concurrent.ExecutionException;
     49 import java.util.concurrent.Executors;
     50 import java.util.concurrent.TimeUnit;
     51 import java.util.concurrent.atomic.AtomicInteger;
     52 import java.util.logging.Logger;
     53 
     54 import javax.annotation.Nullable;
     55 import javax.inject.Inject;
     56 
     57 /**
     58  * A {@link Service} that establishes a connection over a socket to a process and then allows
     59  * multiplexed access to the processes' line oriented output over the socket and the standard
     60  * process streams (stdout and stderr) as well as allowing data to be written over the socket.
     61  *
     62  * <p>The {@linkplain State states} of this service are as follows:
     63  * <ul>
     64  *   <li>{@linkplain State#NEW NEW} : Idle state, no reading or writing is allowed.
     65  *   <li>{@linkplain State#STARTING STARTING} : Streams are being opened
     66  *   <li>{@linkplain State#RUNNING RUNNING} : At least one stream is still open or the writer has
     67  *       not been closed yet.
     68  *   <li>{@linkplain State#STOPPING STOPPING} : All streams have closed but some threads may still
     69  *       be running.
     70  *   <li>{@linkplain State#TERMINATED TERMINATED} : Idle state, all streams are closed
     71  *   <li>{@linkplain State#FAILED FAILED} : The service will transition to failed if it encounters
     72  *       any errors while reading from or writing to the streams, service failure will also cause
     73  *       the worker process to be forcibly shutdown and {@link #readItem(long, TimeUnit)},
     74  *       {@link #closeWriter()} and {@link #sendMessage(Serializable)} will start throwing
     75  *       IllegalStateExceptions.
     76  * </ul>
     77  */
     78 @TrialScoped final class StreamService extends AbstractService {
     79   /** How long to wait for a process that should be exiting to actually exit. */
     80   private static final int SHUTDOWN_WAIT_MILLIS = 10;
     81 
     82   private static final Logger logger = Logger.getLogger(StreamService.class.getName());
     83   private static final StreamItem TIMEOUT_ITEM = new StreamItem(Kind.TIMEOUT, null);
     84 
     85   /** The final item that will be sent down the stream. */
     86   static final StreamItem EOF_ITEM = new StreamItem(Kind.EOF, null);
     87 
     88   private final ListeningExecutorService streamExecutor = MoreExecutors.listeningDecorator(
     89       Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build()));
     90   private final BlockingQueue<StreamItem> outputQueue = Queues.newLinkedBlockingQueue();
     91   private final WorkerProcess worker;
     92   private volatile Process process;
     93   private final Parser<LogMessage> logMessageParser;
     94   private final TrialOutputLogger trialOutput;
     95 
     96   /**
     97    * This represents the number of open streams from the users perspective.  i.e. can you still
     98    * write to the socket and read items.
     99    *
    100    * <p>This is decremented when either the socket is closed for writing or the EOF_ITEM has been
    101    * read by the user.
    102    */
    103   private final AtomicInteger openStreams = new AtomicInteger();
    104 
    105   /**
    106    * Used to track how many read streams are open so we can correctly set the EOF_ITEM onto the
    107    * queue.
    108    */
    109   private final AtomicInteger runningReadStreams = new AtomicInteger();
    110   private OpenedSocket.Writer socketWriter;
    111 
    112   @Inject StreamService(WorkerProcess worker,
    113       Parser<LogMessage> logMessageParser,
    114       TrialOutputLogger trialOutput) {
    115     this.worker = worker;
    116     this.logMessageParser = logMessageParser;
    117     this.trialOutput = trialOutput;
    118   }
    119 
    120   @Override protected void doStart() {
    121     try {
    122       // TODO(lukes): write the commandline to the trial output file?
    123       process = worker.startWorker();
    124     } catch (IOException e) {
    125       notifyFailed(e);
    126       return;
    127     }
    128     // Failsafe kill the process and the executor service.
    129     // If the process has already exited cleanly, this will be a no-op.
    130     addListener(new Listener() {
    131       @Override public void starting() {}
    132       @Override public void running() {}
    133       @Override public void stopping(State from) {}
    134       @Override public void terminated(State from) {
    135         cleanup();
    136       }
    137       @Override public void failed(State from, Throwable failure) {
    138         cleanup();
    139       }
    140 
    141       void cleanup() {
    142         streamExecutor.shutdown();
    143         process.destroy();
    144         try {
    145           streamExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
    146         } catch (InterruptedException e) {
    147           Thread.currentThread().interrupt();
    148         }
    149         streamExecutor.shutdownNow();
    150       }
    151     }, MoreExecutors.directExecutor());
    152     // You may be thinking as you read this "Yo dawg, what if IOExceptions rain from the sky?"
    153     // If a stream we are reading from throws an IOException then we fail the entire Service. This
    154     // will cause the worker to be killed (if its not dead already) and the various StreamReaders to
    155     // be interrupted (eventually).
    156 
    157     // use the default charset because worker streams will use the default for output
    158     Charset processCharset = Charset.defaultCharset();
    159     runningReadStreams.addAndGet(2);
    160     openStreams.addAndGet(1);
    161     streamExecutor.submit(
    162         threadRenaming("worker-stderr",
    163             new StreamReader("stderr",
    164                 new InputStreamReader(process.getErrorStream(), processCharset))));
    165     streamExecutor.submit(
    166         threadRenaming("worker-stdout",
    167             new StreamReader("stdout",
    168                 new InputStreamReader(process.getInputStream(), processCharset))));
    169     worker.socketFuture().addListener(
    170         new Runnable() {
    171           @Override public void run() {
    172             try {
    173               OpenedSocket openedSocket =
    174                   Uninterruptibles.getUninterruptibly(worker.socketFuture());
    175               logger.fine("successfully opened the pipe from the worker");
    176               socketWriter = openedSocket.writer();
    177               runningReadStreams.addAndGet(1);
    178               openStreams.addAndGet(1);
    179               streamExecutor.submit(threadRenaming("worker-socket",
    180                   new SocketStreamReader(openedSocket.reader())));
    181             } catch (ExecutionException e) {
    182               notifyFailed(e.getCause());
    183             }
    184           }
    185         },
    186         MoreExecutors.directExecutor());
    187     notifyStarted();
    188   }
    189 
    190   /**
    191    * Reads a {@link StreamItem} from one of the streams waiting for one to become available if
    192    * necessary.
    193    */
    194   StreamItem readItem(long timeout, TimeUnit unit) throws InterruptedException {
    195     checkState(isRunning(), "Cannot read items from a %s StreamService", state());
    196     StreamItem line = outputQueue.poll(timeout, unit);
    197     if (line == EOF_ITEM) {
    198       closeStream();
    199     }
    200     return (line == null) ? TIMEOUT_ITEM : line;
    201   }
    202 
    203   /**
    204    * Write a line of data to the worker process over the socket.
    205    *
    206    * <p>N.B. Writing data via {@link #sendMessage(Serializable)} is only valid once the underlying
    207    * socket has been opened.  This should be fine assuming that socket writes are only in response
    208    * to socket reads (which is currently the case), so there is no way that a write could happen
    209    * prior to the socket being opened.
    210   */
    211   void sendMessage(Serializable message) throws IOException {
    212     checkState(isRunning(), "Cannot read items from a %s StreamService", state());
    213     checkState(socketWriter != null, "Attempted to write to the socket before it was opened.");
    214     try {
    215       socketWriter.write(message);
    216       // We need to flush since this is a back and forth lockstep protocol, buffering can cause
    217       // deadlock!
    218       socketWriter.flush();
    219     } catch (IOException e) {
    220       Closeables.close(socketWriter, true);
    221       notifyFailed(e);
    222       throw e;
    223     }
    224   }
    225 
    226   /** Closes the socket writer. */
    227   void closeWriter() throws IOException {
    228     checkState(isRunning(), "Cannot read items from a %s StreamService", state());
    229     checkState(socketWriter != null, "Attempted to close the socket before it was opened.");
    230     try {
    231       socketWriter.close();
    232     } catch (IOException e) {
    233       notifyFailed(e);
    234       throw e;
    235     }
    236     closeStream();
    237   }
    238 
    239   @Override protected void doStop() {
    240     if (openStreams.get() > 0) {
    241       // This means stop was called on us externally and we are still reading/writing, just log a
    242       // warning and do nothing
    243       logger.warning("Attempting to stop the stream service with streams still open");
    244     }
    245     final ListenableFuture<Integer> processFuture = streamExecutor.submit(new Callable<Integer>() {
    246       @Override public Integer call() throws Exception {
    247         return process.waitFor();
    248       }
    249     });
    250     // Experimentally, even with well behaved processes there is some time between when all streams
    251     // are closed as part of process shutdown and when the process has exited. So to not fail
    252     // flakily when shutting down normally we need to do a timed wait
    253     streamExecutor.submit(new Callable<Void>() {
    254       @Override public Void call() throws Exception {
    255         boolean threw = true;
    256         try {
    257           if (processFuture.get(SHUTDOWN_WAIT_MILLIS, TimeUnit.MILLISECONDS) == 0) {
    258             notifyStopped();
    259           } else {
    260             notifyFailed(
    261                 new Exception("Process failed to stop cleanly. Exit code: " + process.waitFor()));
    262           }
    263           threw = false;
    264         } finally {
    265           processFuture.cancel(true);  // we don't need it anymore
    266           if (threw) {
    267             process.destroy();
    268             notifyFailed(
    269                 new Exception("Process failed to stop cleanly and was forcibly killed. Exit code: "
    270                     + process.waitFor()));
    271           }
    272         }
    273         return null;
    274       }
    275     });
    276   }
    277 
    278   private void closeStream() {
    279     if (openStreams.decrementAndGet() == 0) {
    280       stopAsync();
    281     }
    282   }
    283 
    284   private void closeReadStream() {
    285     if (runningReadStreams.decrementAndGet() == 0) {
    286       outputQueue.add(EOF_ITEM);
    287     }
    288   }
    289 
    290   /** An item read from one of the streams. */
    291   static class StreamItem {
    292     enum Kind {
    293       /** This indicates that it is the last item. */
    294       EOF,
    295       /** This indicates that reading the item timed out. */
    296       TIMEOUT,
    297       /** This indicates that this item has content. */
    298       DATA;
    299     }
    300 
    301     @Nullable private final LogMessage logMessage;
    302     private final Kind kind;
    303 
    304     private StreamItem(LogMessage line) {
    305       this(Kind.DATA, checkNotNull(line));
    306     }
    307 
    308     private StreamItem(Kind state, @Nullable LogMessage logMessage) {
    309       this.logMessage = logMessage;
    310       this.kind = state;
    311     }
    312 
    313     /** Returns the content.  This is only valid if {@link #kind()} return {@link Kind#DATA}. */
    314     LogMessage content() {
    315       checkState(kind == Kind.DATA, "Only data lines have content: %s", this);
    316       return logMessage;
    317     }
    318 
    319     Kind kind() {
    320       return kind;
    321     }
    322 
    323     @Override public String toString() {
    324       ToStringHelper helper = MoreObjects.toStringHelper(StreamItem.class);
    325       if (kind == Kind.DATA) {
    326         helper.addValue(logMessage);
    327       } else {
    328         helper.addValue(kind);
    329       }
    330       return helper.toString();
    331     }
    332   }
    333 
    334   /** Returns a callable that renames the the thread that the given callable runs in. */
    335   private static <T> Callable<T> threadRenaming(final String name, final Callable<T> callable) {
    336     checkNotNull(name);
    337     checkNotNull(callable);
    338     return new Callable<T>() {
    339       @Override public T call() throws Exception {
    340         Thread currentThread = Thread.currentThread();
    341         String oldName = currentThread.getName();
    342         currentThread.setName(name);
    343         try {
    344           return callable.call();
    345         } finally {
    346           currentThread.setName(oldName);
    347         }
    348       }
    349     };
    350   }
    351 
    352   /**
    353    * A background task that reads lines of text from a {@link Reader} and puts them onto a
    354    * {@link BlockingQueue}.
    355    */
    356   private final class StreamReader implements Callable<Void> {
    357     final Reader reader;
    358     final String streamName;
    359 
    360     StreamReader(String streamName, Reader reader) {
    361       this.streamName = streamName;
    362       this.reader = reader;
    363     }
    364 
    365     @Override public Void call() throws IOException, InterruptedException, ParseException {
    366       LineReader lineReader = new LineReader(reader);
    367       boolean threw = true;
    368       try {
    369         String line;
    370         while ((line = lineReader.readLine()) != null) {
    371           trialOutput.log(streamName, line);
    372           LogMessage logMessage = logMessageParser.parse(line);
    373           if (logMessage != null) {
    374             outputQueue.put(new StreamItem(logMessage));
    375           }
    376         }
    377         threw = false;
    378       } catch (Exception e) {
    379         notifyFailed(e);
    380       } finally {
    381         closeReadStream();
    382         Closeables.close(reader, threw);
    383       }
    384       return null;
    385     }
    386   }
    387 
    388   /**
    389    * A background task that reads lines of text from a {@link OpenedSocket.Reader} and puts them
    390    * onto a {@link BlockingQueue}.
    391    */
    392   private final class SocketStreamReader implements Callable<Void> {
    393     final OpenedSocket.Reader reader;
    394 
    395     SocketStreamReader(OpenedSocket.Reader reader) {
    396       this.reader = reader;
    397     }
    398 
    399     @Override public Void call() throws IOException, InterruptedException, ParseException {
    400       boolean threw = true;
    401       try {
    402         Object obj;
    403         while ((obj = reader.read()) != null) {
    404           if (obj instanceof String) {
    405             log(obj.toString());
    406             continue;
    407           }
    408           LogMessage message = (LogMessage) obj;
    409           if (message instanceof StopMeasurementLogMessage) {
    410             // TODO(lukes): how useful are these messages?  They seem like leftover debugging info
    411             for (Measurement measurement : ((StopMeasurementLogMessage) message).measurements()) {
    412               log(String.format("I got a result! %s: %f%s%n",
    413                   measurement.description(),
    414                   measurement.value().magnitude() / measurement.weight(),
    415                   measurement.value().unit()));
    416             }
    417           }
    418           outputQueue.put(new StreamItem(message));
    419         }
    420         threw = false;
    421       } catch (Exception e) {
    422         notifyFailed(e);
    423       } finally {
    424         closeReadStream();
    425         Closeables.close(reader, threw);
    426       }
    427       return null;
    428     }
    429 
    430     private void log(String text) {
    431       trialOutput.log("socket", text);
    432     }
    433   }
    434 }
    435