1 /* 2 * Copyright (C) 2013 Google Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5 * in compliance with the License. You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software distributed under the License 10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11 * or implied. See the License for the specific language governing permissions and limitations under 12 * the License. 13 */ 14 15 package com.google.caliper.runner; 16 17 import static com.google.common.base.Preconditions.checkState; 18 19 import com.google.caliper.bridge.OpenedSocket; 20 import com.google.caliper.bridge.StartupAnnounceMessage; 21 import com.google.common.base.Supplier; 22 import com.google.common.collect.Maps; 23 import com.google.common.collect.Multimaps; 24 import com.google.common.collect.SetMultimap; 25 import com.google.common.collect.Sets; 26 import com.google.common.util.concurrent.AbstractExecutionThreadService; 27 import com.google.common.util.concurrent.ListenableFuture; 28 import com.google.common.util.concurrent.Service; 29 import com.google.common.util.concurrent.SettableFuture; 30 31 import java.io.IOException; 32 import java.net.ServerSocket; 33 import java.net.Socket; 34 import java.net.SocketException; 35 import java.util.Collection; 36 import java.util.Map; 37 import java.util.Set; 38 import java.util.UUID; 39 import java.util.concurrent.locks.Lock; 40 import java.util.concurrent.locks.ReentrantLock; 41 42 import javax.annotation.concurrent.GuardedBy; 43 import javax.inject.Inject; 44 import javax.inject.Singleton; 45 46 /** 47 * A {@link Service} that manages a {@link ServerSocket}. 48 * 49 * <p> This service provides two pieces of functionality: 50 * <ol> 51 * <li>It adapts {@link ServerSocket#accept()} to a {@link ListenableFuture} of an opened socket. 52 * <li>It demultiplexes incoming connections based on a {@link StartupAnnounceMessage} that is 53 * sent over the socket. 54 * </ol> 55 * 56 * <p>The {@linkplain State states} of this service are as follows: 57 * <ul> 58 * <li>{@linkplain State#NEW NEW} : Idle state, the {@link ServerSocket} is not open yet. 59 * <li>{@linkplain State#STARTING STARTING} : {@link ServerSocket} is opened 60 * <li>{@linkplain State#RUNNING RUNNING} : We are continuously accepting and parsing connections 61 * from the socket. 62 * <li>{@linkplain State#STOPPING STOPPING} : The server socket is closing and all pending 63 * connection requests are terminated, connection requests will fail immediately. 64 * <li>{@linkplain State#TERMINATED TERMINATED} : Idle state, the socket is closed. 65 * <li>{@linkplain State#FAILED FAILED} : The service will transition to failed if it encounters 66 * any errors while accepting connections or reading from connections. 67 * </ul> 68 * 69 * <p>Note to future self. There have been a few attempts to make it so that it is no longer 70 * necessary to dedicate a thread to this service (basically turn it into an AbstractIdleService). 71 * The general idea has been to make callers to getConnection invoke accept, here is why it didn't 72 * work. 73 * <ul> 74 * <li>If you make getConnection a blocking method that calls accept until it finds the 75 * connection with its id, then there is no way to deal with connections that never arrive. 76 * For example, if the worker crashes before connecting then the thread calling accept will 77 * block forever waiting for it. The only way to unblock a thread stuck on accept() is to 78 * close the socket (this holds for ServerSocketChannels and normal ServerSockets), but we 79 * cannot do that in this case because the socket is a shared resource. 80 * <li>If you make getConnection a non-blocking, polling based method then you expose yourself 81 * to potential deadlocks (due to missed signals) depending on what thread you poll from. 82 * If the polling thread is any of the threads that are involved with processing messages 83 * from the worker I believe there to be a deadlock risk. Basically, if the worker sends 84 * messages over its output streams and then calls Socket.connect, and no printing to stdout 85 * or stderr occurs while connecting. Then if the runner polls, but misses the connection 86 * and then tries to read again, it will deadlock. 87 * </ul> 88 */ 89 @Singleton 90 final class ServerSocketService extends AbstractExecutionThreadService { 91 private enum Source { REQUEST, ACCEPT} 92 93 private final Lock lock = new ReentrantLock(); 94 95 /** 96 * Contains futures that have either only been accepted or requested. Once both occur they are 97 * removed from this map. 98 */ 99 @GuardedBy("lock") 100 private final Map<UUID, SettableFuture<OpenedSocket>> halfFinishedConnections = Maps.newHashMap(); 101 102 /** 103 * Contains the history of connections so we can ensure that each id is only accepted once and 104 * requested once. 105 */ 106 @GuardedBy("lock") 107 private final SetMultimap<Source, UUID> connectionState = Multimaps.newSetMultimap( 108 Maps.<Source, Collection<UUID>>newEnumMap(Source.class), 109 new Supplier<Set<UUID>>(){ 110 @Override public Set<UUID> get() { 111 return Sets.newHashSet(); 112 } 113 }); 114 115 private ServerSocket serverSocket; 116 117 @Inject ServerSocketService() {} 118 119 int getPort() { 120 awaitRunning(); 121 checkState(serverSocket != null, "Socket has not been opened yet"); 122 return serverSocket.getLocalPort(); 123 } 124 125 /** 126 * Returns a {@link ListenableFuture} for an open connection corresponding to the given id. 127 * 128 * <p>N.B. calling this method 'consumes' the connection and as such calling it twice with the 129 * same id will not work, the second future returned will never complete. Similarly calling it 130 * with an id that does not correspond to a worker trying to connect will also fail. 131 */ 132 public ListenableFuture<OpenedSocket> getConnection(UUID id) { 133 checkState(isRunning(), "You can only get connections from a running service: %s", this); 134 return getConnectionImpl(id, Source.REQUEST); 135 } 136 137 @Override protected void startUp() throws Exception { 138 serverSocket = new ServerSocket(0 /* bind to any available port */); 139 } 140 141 @Override protected void run() throws Exception { 142 while (isRunning()) { 143 Socket socket; 144 try { 145 socket = serverSocket.accept(); 146 } catch (SocketException e) { 147 // we were closed 148 return; 149 } 150 OpenedSocket openedSocket = OpenedSocket.fromSocket(socket); 151 152 UUID id = ((StartupAnnounceMessage) openedSocket.reader().read()).trialId(); 153 // N.B. you should not call set with the lock held, to prevent same thread executors from 154 // running with the lock. 155 getConnectionImpl(id, Source.ACCEPT).set(openedSocket); 156 } 157 } 158 159 /** 160 * Returns a {@link SettableFuture} from the map of connections. 161 * 162 * <p>This method has the following properties: 163 * <ul> 164 * <li>If the id is present in {@link #connectionState}, this will throw an 165 * {@link IllegalStateException}. 166 * <li>The id and source are recorded in {@link #connectionState} 167 * <li>If the future is already in {@link #halfFinishedConnections}, it is removed and 168 * returned. 169 * <li>If the future is not in {@link #halfFinishedConnections}, a new {@link SettableFuture} 170 * is added and then returned. 171 * 172 * <p>These features together ensure that each connection can only be accepted once, only 173 * requested once and once both have happened it will be removed from 174 * {@link #halfFinishedConnections}. 175 */ 176 private SettableFuture<OpenedSocket> getConnectionImpl(UUID id, Source source) { 177 lock.lock(); 178 try { 179 checkState(connectionState.put(source, id), "Connection for %s has already been %s", 180 id, source); 181 SettableFuture<OpenedSocket> future = halfFinishedConnections.get(id); 182 if (future == null) { 183 future = SettableFuture.create(); 184 halfFinishedConnections.put(id, future); 185 } else { 186 halfFinishedConnections.remove(id); 187 } 188 return future; 189 } finally { 190 lock.unlock(); 191 } 192 } 193 194 @Override protected void triggerShutdown() { 195 try { 196 serverSocket.close(); 197 } catch (IOException e) { 198 // best effort... 199 } 200 } 201 202 @Override protected void shutDown() throws Exception { 203 serverSocket.close(); 204 // Now we have either been asked to stop or have failed with some kind of exception, we want to 205 // notify all pending requests, so if there are any references outside of this class they will 206 // notice. 207 lock.lock(); 208 try { 209 for (SettableFuture<OpenedSocket> future : halfFinishedConnections.values()) { 210 future.setException(new Exception("The socket has been closed")); 211 } 212 halfFinishedConnections.clear(); 213 connectionState.clear(); 214 } finally { 215 lock.unlock(); 216 } 217 } 218 } 219