Home | History | Annotate | Download | only in runner
      1 /*
      2  * Copyright (C) 2013 Google Inc.
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
      5  * in compliance with the License. You may obtain a copy of the License at
      6  *
      7  * http://www.apache.org/licenses/LICENSE-2.0
      8  *
      9  * Unless required by applicable law or agreed to in writing, software distributed under the License
     10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
     11  * or implied. See the License for the specific language governing permissions and limitations under
     12  * the License.
     13  */
     14 
     15 package com.google.caliper.runner;
     16 
     17 import static com.google.common.base.Preconditions.checkArgument;
     18 import static com.google.common.base.Preconditions.checkState;
     19 import static org.junit.Assert.assertEquals;
     20 import static org.junit.Assert.assertNotSame;
     21 import static org.junit.Assert.assertTrue;
     22 import static org.junit.Assert.fail;
     23 
     24 import com.google.caliper.bridge.LogMessage;
     25 import com.google.caliper.bridge.OpenedSocket;
     26 import com.google.caliper.runner.FakeWorkers.DummyLogMessage;
     27 import com.google.caliper.runner.StreamService.StreamItem;
     28 import com.google.caliper.runner.StreamService.StreamItem.Kind;
     29 import com.google.caliper.util.Parser;
     30 import com.google.common.collect.Sets;
     31 import com.google.common.util.concurrent.ListenableFuture;
     32 import com.google.common.util.concurrent.ListenableFutureTask;
     33 import com.google.common.util.concurrent.MoreExecutors;
     34 import com.google.common.util.concurrent.Service.Listener;
     35 import com.google.common.util.concurrent.Service.State;
     36 
     37 import org.junit.After;
     38 import org.junit.Before;
     39 import org.junit.Test;
     40 import org.junit.runner.RunWith;
     41 import org.junit.runners.JUnit4;
     42 
     43 import java.io.File;
     44 import java.io.FileNotFoundException;
     45 import java.io.IOException;
     46 import java.io.PrintWriter;
     47 import java.io.StringWriter;
     48 import java.net.ServerSocket;
     49 import java.net.SocketException;
     50 import java.text.ParseException;
     51 import java.util.Set;
     52 import java.util.UUID;
     53 import java.util.concurrent.Callable;
     54 import java.util.concurrent.CountDownLatch;
     55 import java.util.concurrent.TimeUnit;
     56 
     57 /**
     58  * Tests for {@link StreamService}.
     59  */
     60 @RunWith(JUnit4.class)
     61 
     62 public class StreamServiceTest {
     63 
     64   private ServerSocket serverSocket;
     65   private final StringWriter writer = new StringWriter();
     66   private final PrintWriter stdout = new PrintWriter(writer, true);
     67   private final Parser<LogMessage> parser = new Parser<LogMessage>() {
     68     @Override public LogMessage parse(final CharSequence text) throws ParseException {
     69       return new DummyLogMessage(text.toString());
     70     }
     71   };
     72 
     73   private StreamService service;
     74   private final CountDownLatch terminalLatch = new CountDownLatch(1);
     75   private static final int TRIAL_NUMBER = 3;
     76 
     77   @Before public void setUp() throws IOException {
     78     serverSocket = new ServerSocket(0);
     79   }
     80 
     81   @After public void closeSocket() throws IOException {
     82     serverSocket.close();
     83   }
     84 
     85   @After public void stopService() {
     86     if (service != null && service.state() != State.FAILED && service.state() != State.TERMINATED) {
     87       service.stopAsync().awaitTerminated();
     88     }
     89   }
     90 
     91   @Test public void testReadOutput() throws Exception {
     92     makeService(FakeWorkers.PrintClient.class, "foo", "bar");
     93     service.startAsync().awaitRunning();
     94     StreamItem item1 = readItem();
     95     assertEquals(Kind.DATA, item1.kind());
     96     Set<String> lines = Sets.newHashSet();
     97     lines.add(item1.content().toString());
     98     StreamItem item2 = readItem();
     99     assertEquals(Kind.DATA, item2.kind());
    100     lines.add(item2.content().toString());
    101     assertEquals(Sets.newHashSet("foo", "bar"), lines);
    102     assertEquals(State.RUNNING, service.state());
    103     StreamItem item3 = readItem();
    104     assertEquals(Kind.EOF, item3.kind());
    105     awaitStopped(100, TimeUnit.MILLISECONDS);
    106     assertTerminated();
    107   }
    108 
    109   @Test public void failingProcess() throws Exception {
    110     makeService(FakeWorkers.Exit.class, "1");
    111     service.startAsync().awaitRunning();
    112     assertEquals(Kind.EOF, readItem().kind());
    113     awaitStopped(100, TimeUnit.MILLISECONDS);
    114     assertEquals(State.FAILED, service.state());
    115   }
    116 
    117   @Test public void processDoesntExit() throws Exception {
    118     // close all fds and then sleep
    119     makeService(FakeWorkers.CloseAndSleep.class);
    120     service.startAsync().awaitRunning();
    121     assertEquals(Kind.EOF, readItem().kind());
    122     awaitStopped(200, TimeUnit.MILLISECONDS);  // we
    123     assertEquals(State.FAILED, service.state());
    124   }
    125 
    126   @Test public void testSocketInputOutput() throws Exception {
    127     int localport = serverSocket.getLocalPort();
    128     // read from the socket and echo it back
    129     makeService(FakeWorkers.SocketEchoClient.class, Integer.toString(localport));
    130 
    131     service.startAsync().awaitRunning();
    132     assertEquals(new DummyLogMessage("start"), readItem().content());
    133     service.sendMessage(new DummyLogMessage("hello socket world"));
    134     assertEquals(new DummyLogMessage("hello socket world"), readItem().content());
    135     service.closeWriter();
    136     assertEquals(State.RUNNING, service.state());
    137     StreamItem nextItem = readItem();
    138     assertEquals("Expected EOF " + nextItem, Kind.EOF, nextItem.kind());
    139     awaitStopped(100, TimeUnit.MILLISECONDS);
    140     assertTerminated();
    141   }
    142 
    143   @Test public void testSocketClosesBeforeProcess() throws Exception {
    144     int localport = serverSocket.getLocalPort();
    145     // read from the socket and echo it back
    146     makeService(FakeWorkers.SocketEchoClient.class, Integer.toString(localport), "foo");
    147     service.startAsync().awaitRunning();
    148     assertEquals(new DummyLogMessage("start"), readItem().content());
    149     service.sendMessage(new DummyLogMessage("hello socket world"));
    150     assertEquals(new DummyLogMessage("hello socket world"), readItem().content());
    151     service.closeWriter();
    152 
    153     assertEquals("foo", readItem().content().toString());
    154 
    155     assertEquals(State.RUNNING, service.state());
    156     assertEquals(Kind.EOF, readItem().kind());
    157     awaitStopped(100, TimeUnit.MILLISECONDS);
    158     assertTerminated();
    159   }
    160 
    161   @Test public void failsToAcceptConnection() throws Exception {
    162     serverSocket.close();  // This will force serverSocket.accept to throw a SocketException
    163     makeService(FakeWorkers.Sleeper.class, Long.toString(TimeUnit.MINUTES.toMillis(10)));
    164     try {
    165       service.startAsync().awaitRunning();
    166       fail();
    167     } catch (IllegalStateException expected) {}
    168     assertEquals(SocketException.class, service.failureCause().getClass());
    169   }
    170 
    171   /** Reads an item, asserting that there was no timeout. */
    172   private StreamItem readItem() throws InterruptedException {
    173     StreamItem item = service.readItem(10, TimeUnit.SECONDS);
    174     assertNotSame("Timed out while reading item from worker", Kind.TIMEOUT, item.kind());
    175     return item;
    176   }
    177 
    178   /**
    179    * Wait for the service to reach a terminal state without calling stop.
    180    */
    181   private void awaitStopped(long time, TimeUnit unit) throws InterruptedException {
    182     assertTrue(terminalLatch.await(time, unit));
    183   }
    184 
    185   private void assertTerminated() {
    186     State state = service.state();
    187     if (state != State.TERMINATED) {
    188       if (state == State.FAILED) {
    189         throw new AssertionError(service.failureCause());
    190       }
    191       fail("Expected service to be terminated but was: " + state);
    192     }
    193   }
    194 
    195   @SuppressWarnings("resource")
    196   private void makeService(Class<?> main, String ...args) {
    197     checkState(service == null, "You can only make one StreamService per test");
    198     UUID trialId = UUID.randomUUID();
    199     TrialOutputLogger trialOutput = new TrialOutputLogger(new TrialOutputFactory() {
    200       @Override public FileAndWriter getTrialOutputFile(int trialNumber)
    201           throws FileNotFoundException {
    202         checkArgument(trialNumber == TRIAL_NUMBER);
    203         return new FileAndWriter(new File("/tmp/not-a-file"), stdout);
    204       }
    205 
    206       @Override public void persistFile(File f) {
    207         throw new UnsupportedOperationException();
    208       }
    209 
    210     }, TRIAL_NUMBER, trialId, null /* experiment */);
    211     try {
    212       // normally the TrialRunLoop opens/closes the logger
    213       trialOutput.open();
    214     } catch (IOException e) {
    215       throw new RuntimeException(e);
    216     }
    217     service = new StreamService(
    218         new WorkerProcess(FakeWorkers.createProcessBuilder(main, args),
    219             trialId,
    220             getSocketFuture(),
    221             new RuntimeShutdownHookRegistrar()),
    222         parser,
    223         trialOutput);
    224     service.addListener(new Listener() {
    225       @Override public void starting() {}
    226       @Override public void running() {}
    227       @Override public void stopping(State from) {}
    228       @Override public void terminated(State from) {
    229         terminalLatch.countDown();
    230       }
    231       @Override public void failed(State from, Throwable failure) {
    232         terminalLatch.countDown();
    233       }
    234     }, MoreExecutors.directExecutor());
    235   }
    236 
    237   private ListenableFuture<OpenedSocket> getSocketFuture() {
    238     ListenableFutureTask<OpenedSocket> openSocketTask = ListenableFutureTask.create(
    239         new Callable<OpenedSocket>() {
    240           @Override
    241           public OpenedSocket call() throws Exception {
    242             return OpenedSocket.fromSocket(serverSocket.accept());
    243           }
    244         });
    245     // N.B. this thread will block on serverSocket.accept until a connection is accepted or the
    246     // socket is closed, so no matter what this thread will die with the test.
    247     Thread opener = new Thread(openSocketTask, "SocketOpener");
    248     opener.setDaemon(true);
    249     opener.start();
    250     return openSocketTask;
    251   }
    252 }
    253