Home | History | Annotate | Download | only in runner
      1 /*
      2  * Copyright (C) 2013 Google Inc.
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
      5  * in compliance with the License. You may obtain a copy of the License at
      6  *
      7  * http://www.apache.org/licenses/LICENSE-2.0
      8  *
      9  * Unless required by applicable law or agreed to in writing, software distributed under the License
     10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
     11  * or implied. See the License for the specific language governing permissions and limitations under
     12  * the License.
     13  */
     14 
     15 package com.google.caliper.runner;
     16 
     17 import static com.google.common.base.Preconditions.checkState;
     18 
     19 import com.google.caliper.bridge.OpenedSocket;
     20 import com.google.caliper.bridge.StartupAnnounceMessage;
     21 import com.google.common.base.Supplier;
     22 import com.google.common.collect.Maps;
     23 import com.google.common.collect.Multimaps;
     24 import com.google.common.collect.SetMultimap;
     25 import com.google.common.collect.Sets;
     26 import com.google.common.util.concurrent.AbstractExecutionThreadService;
     27 import com.google.common.util.concurrent.ListenableFuture;
     28 import com.google.common.util.concurrent.Service;
     29 import com.google.common.util.concurrent.SettableFuture;
     30 
     31 import java.io.IOException;
     32 import java.net.ServerSocket;
     33 import java.net.Socket;
     34 import java.net.SocketException;
     35 import java.util.Collection;
     36 import java.util.Map;
     37 import java.util.Set;
     38 import java.util.UUID;
     39 import java.util.concurrent.locks.Lock;
     40 import java.util.concurrent.locks.ReentrantLock;
     41 
     42 import javax.annotation.concurrent.GuardedBy;
     43 import javax.inject.Inject;
     44 import javax.inject.Singleton;
     45 
     46 /**
     47  * A {@link Service} that manages a {@link ServerSocket}.
     48  *
     49  * <p> This service provides two pieces of functionality:
     50  * <ol>
     51  *   <li>It adapts {@link ServerSocket#accept()} to a {@link ListenableFuture} of an opened socket.
     52  *   <li>It demultiplexes incoming connections based on a {@link StartupAnnounceMessage} that is
     53  *       sent over the socket.
     54  * </ol>
     55  *
     56  * <p>The {@linkplain State states} of this service are as follows:
     57  * <ul>
     58  *   <li>{@linkplain State#NEW NEW} : Idle state, the {@link ServerSocket} is not open yet.
     59  *   <li>{@linkplain State#STARTING STARTING} : {@link ServerSocket} is opened
     60  *   <li>{@linkplain State#RUNNING RUNNING} : We are continuously accepting and parsing connections
     61  *       from the socket.
     62  *   <li>{@linkplain State#STOPPING STOPPING} : The server socket is closing and all pending
     63  *       connection requests are terminated, connection requests will fail immediately.
     64  *   <li>{@linkplain State#TERMINATED TERMINATED} : Idle state, the socket is closed.
     65  *   <li>{@linkplain State#FAILED FAILED} : The service will transition to failed if it encounters
     66  *       any errors while accepting connections or reading from connections.
     67  * </ul>
     68  *
     69  * <p>Note to future self.  There have been a few attempts to make it so that it is no longer
     70  * necessary to dedicate a thread to this service (basically turn it into an AbstractIdleService).
     71  * The general idea has been to make callers to getConnection invoke accept, here is why it didn't
     72  * work.
     73  * <ul>
     74  *     <li>If you make getConnection a blocking method that calls accept until it finds the
     75  *         connection with its id, then there is no way to deal with connections that never arrive.
     76  *         For example, if the worker crashes before connecting then the thread calling accept will
     77  *         block forever waiting for it.  The only way to unblock a thread stuck on accept() is to
     78  *         close the socket (this holds for ServerSocketChannels and normal ServerSockets), but we
     79  *         cannot do that in this case because the socket is a shared resource.
     80  *     <li>If you make getConnection a non-blocking, polling based method then you expose yourself
     81  *         to potential deadlocks (due to missed signals) depending on what thread you poll from.
     82  *         If the polling thread is any of the threads that are involved with processing messages
     83  *         from the worker I believe there to be a deadlock risk.  Basically, if the worker sends
     84  *         messages over its output streams and then calls Socket.connect, and no printing to stdout
     85  *         or stderr occurs while connecting. Then if the runner polls, but misses the connection
     86  *         and then tries to read again, it will deadlock.
     87  * </ul>
     88  */
     89 @Singleton
     90 final class ServerSocketService extends AbstractExecutionThreadService {
     91   private enum Source { REQUEST, ACCEPT}
     92 
     93   private final Lock lock = new ReentrantLock();
     94 
     95   /**
     96    * Contains futures that have either only been accepted or requested.  Once both occur they are
     97    * removed from this map.
     98    */
     99   @GuardedBy("lock")
    100   private final Map<UUID, SettableFuture<OpenedSocket>> halfFinishedConnections = Maps.newHashMap();
    101 
    102   /**
    103    * Contains the history of connections so we can ensure that each id is only accepted once and
    104    * requested once.
    105    */
    106   @GuardedBy("lock")
    107   private final SetMultimap<Source, UUID> connectionState = Multimaps.newSetMultimap(
    108       Maps.<Source, Collection<UUID>>newEnumMap(Source.class),
    109       new Supplier<Set<UUID>>(){
    110         @Override public Set<UUID> get() {
    111           return Sets.newHashSet();
    112         }
    113       });
    114 
    115   private ServerSocket serverSocket;
    116 
    117   @Inject ServerSocketService() {}
    118 
    119   int getPort() {
    120     awaitRunning();
    121     checkState(serverSocket != null, "Socket has not been opened yet");
    122     return serverSocket.getLocalPort();
    123   }
    124 
    125   /**
    126    * Returns a {@link ListenableFuture} for an open connection corresponding to the given id.
    127    *
    128    * <p>N.B. calling this method 'consumes' the connection and as such calling it twice with the
    129    * same id will not work, the second future returned will never complete.  Similarly calling it
    130    * with an id that does not correspond to a worker trying to connect will also fail.
    131    */
    132   public ListenableFuture<OpenedSocket> getConnection(UUID id) {
    133     checkState(isRunning(), "You can only get connections from a running service: %s", this);
    134     return getConnectionImpl(id, Source.REQUEST);
    135   }
    136 
    137   @Override protected void startUp() throws Exception {
    138     serverSocket = new ServerSocket(0 /* bind to any available port */);
    139   }
    140 
    141   @Override protected void run() throws Exception {
    142     while (isRunning()) {
    143       Socket socket;
    144       try {
    145         socket = serverSocket.accept();
    146       } catch (SocketException e) {
    147         // we were closed
    148         return;
    149       }
    150       OpenedSocket openedSocket = OpenedSocket.fromSocket(socket);
    151 
    152       UUID id = ((StartupAnnounceMessage) openedSocket.reader().read()).trialId();
    153       // N.B. you should not call set with the lock held, to prevent same thread executors from
    154       // running with the lock.
    155       getConnectionImpl(id, Source.ACCEPT).set(openedSocket);
    156     }
    157   }
    158 
    159   /**
    160    * Returns a {@link SettableFuture} from the map of connections.
    161    *
    162    * <p>This method has the following properties:
    163    * <ul>
    164    *    <li>If the id is present in {@link #connectionState}, this will throw an
    165    *        {@link IllegalStateException}.
    166    *    <li>The id and source are recorded in {@link #connectionState}
    167    *    <li>If the future is already in {@link #halfFinishedConnections}, it is removed and
    168    *        returned.
    169    *    <li>If the future is not in {@link #halfFinishedConnections}, a new {@link SettableFuture}
    170    *        is added and then returned.
    171    *
    172    * <p>These features together ensure that each connection can only be accepted once, only
    173    * requested once and once both have happened it will be removed from
    174    * {@link #halfFinishedConnections}.
    175    */
    176   private SettableFuture<OpenedSocket> getConnectionImpl(UUID id, Source source) {
    177     lock.lock();
    178     try {
    179       checkState(connectionState.put(source, id), "Connection for %s has already been %s",
    180           id, source);
    181       SettableFuture<OpenedSocket> future = halfFinishedConnections.get(id);
    182       if (future == null) {
    183         future = SettableFuture.create();
    184         halfFinishedConnections.put(id, future);
    185       } else {
    186         halfFinishedConnections.remove(id);
    187       }
    188       return future;
    189     } finally {
    190       lock.unlock();
    191     }
    192   }
    193 
    194   @Override protected void triggerShutdown() {
    195     try {
    196       serverSocket.close();
    197     } catch (IOException e) {
    198       // best effort...
    199     }
    200   }
    201 
    202   @Override protected void shutDown() throws Exception {
    203     serverSocket.close();
    204     // Now we have either been asked to stop or have failed with some kind of exception, we want to
    205     // notify all pending requests, so if there are any references outside of this class they will
    206     // notice.
    207     lock.lock();
    208     try {
    209       for (SettableFuture<OpenedSocket> future : halfFinishedConnections.values()) {
    210         future.setException(new Exception("The socket has been closed"));
    211       }
    212       halfFinishedConnections.clear();
    213       connectionState.clear();
    214     } finally {
    215       lock.unlock();
    216     }
    217   }
    218 }
    219