import sys
import socket
import errno
import logging
from time import time, sleep

from autotest_lib.client.common_lib import error, utils

# default barrier port
_DEFAULT_PORT = 11922


def _get_host_from_id(hostid):
    """
    Return the host address part of a barrier member id.

    A member id is a host address optionally followed by '#' and a local
    identifier.  Stripping the identifier allows multiple members per host,
    which is particularly helpful in testing.

    @param hostid: Member id string ("host" or "host#tag").
    @returns The text before the first '#'.
    @raises error.BarrierError: If the id starts with '#', i.e. carries no
            host address.
    """
    if hostid.startswith('#'):
        raise error.BarrierError(
                "Invalid Host id: Host Address should be specified")
    return hostid.split('#')[0]


class BarrierAbortError(error.BarrierError):
    """Special BarrierError raised when an explicit abort is requested."""


class listen_server(object):
    """
    Manages a listening socket for barrier.

    Can be used to run multiple barrier instances with the same listening
    socket (if they were going to listen on the same port).

    Attributes:

    @attr address: Address to bind to (string).
    @attr port: Port to bind to.
    @attr socket: Listening socket object.
    """
    def __init__(self, address='', port=_DEFAULT_PORT):
        """
        Create a listen_server instance for the given address/port.

        @param address: The address to listen on.
        @param port: The port to listen on.
        """
        self.address = address
        self.port = port
        # Open the port so that the listening server can accept incoming
        # connections.
        utils.run('iptables -A INPUT -p tcp -m tcp --dport %d -j ACCEPT' %
                  port)
        self.socket = self._setup()


    def _setup(self):
        """
        Create, bind and listen on the listening socket.

        @returns The listening socket.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.address, self.port))
            sock.listen(10)
        except socket.error:
            # Do not leak the descriptor when the port cannot be bound.
            sock.close()
            raise

        return sock


    def close(self):
        """Close the listening socket."""
        self.socket.close()


class barrier(object):
    """Multi-machine barrier support.

    Provides multi-machine barrier mechanism.
    Execution stops until all members arrive at the barrier.

    Implementation Details:
    .......................

    When a barrier is forming the master node (first in sort order) in the
    set accepts connections from each member of the set.  As they arrive
    they indicate the barrier they are joining and their identifier (their
    hostname or IP address and optional tag).  They are then asked to wait.
    When all members are present the master node then checks that each
    member is still responding via a ping/pong exchange.  If this is
    successful then everyone has checked in at the barrier.  We then tell
    everyone they may continue via a rlse message.

    Where the master is not the first to reach the barrier the client
    connects will fail.  Client will retry until they either succeed in
    connecting to master or the overall timeout is exceeded.

    As an example here is the exchange for a three node barrier called
    'TAG'

      MASTER                        CLIENT1         CLIENT2
        <-------------TAG C1-------------
        --------------wait-------------->
                      [...]
        <-------------TAG C2-----------------------------
        --------------wait------------------------------>
                      [...]
        --------------ping-------------->
        <-------------pong---------------
        --------------ping------------------------------>
        <-------------pong-------------------------------
                ----- BARRIER conditions MET -----
        --------------rlse-------------->
        --------------rlse------------------------------>

    Note that once the last client has responded to pong the barrier is
    implicitly deemed satisfied, they have all acknowledged their presence.
    If we fail to send any of the rlse messages the barrier is still a
    success, the failed host has effectively broken 'right at the beginning'
    of the post barrier execution window.

    In addition, there is another rendezvous, that makes each slave a server
    and the master a client.  The connection process and usage is still the
    same but allows barriers from machines that only have a one-way
    connection initiation.  This is called rendezvous_servers.

    For example:
        if ME == SERVER:
            server start

        b = job.barrier(ME, 'server-up', 120)
        b.rendezvous(CLIENT, SERVER)

        if ME == CLIENT:
            client run

        b = job.barrier(ME, 'test-complete', 3600)
        b.rendezvous(CLIENT, SERVER)

        if ME == SERVER:
            server stop

    Any client can also request an abort of the job by setting
    abort=True in the rendezvous arguments.
    """

    def __init__(self, hostid, tag, timeout=None, port=None,
                 listen_server=None):
        """
        @param hostid: My hostname/IP address + optional tag.
        @param tag: Symbolic name of the barrier in progress.
        @param timeout: Maximum seconds to wait for the barrier to meet.
        @param port: Port number to listen on.
        @param listen_server: External listen_server instance to use instead
                of creating our own.  Create a listen_server instance and
                reuse it across multiple barrier instances so that the
                barrier code doesn't try to quickly re-bind on the same port
                (packets still in transit for the previous barrier may
                reset new connections).
        @raises error.BarrierError: If both port and listen_server are given.
        """
        self._hostid = hostid
        self._tag = tag
        if listen_server:
            if port:
                raise error.BarrierError(
                        '"port" and "listen_server" are mutually exclusive.')
            self._port = listen_server.port
        else:
            self._port = port or _DEFAULT_PORT
        self._server = listen_server  # A listen_server instance or None.
        self._members = []  # List of hosts we expect to find at the barrier.
        self._timeout_secs = timeout
        self._start_time = None  # Timestamp of when we started waiting.
        self._masterid = None  # Host/IP + optional tag of selected master.
        # Whether this member requests an abort; set by rendezvous*().
        self._abort = False
        logging.info("tag=%s port=%d timeout=%r",
                     self._tag, self._port, self._timeout_secs)

        # Number of clients seen (should be the length of self._waiting).
        self._seen = 0

        # Clients who have checked in and are waiting (if we are a master).
        self._waiting = {}  # Maps from hostname -> (client, addr) tuples.


    def _update_timeout(self, timeout):
        """
        Reset the overall timeout so that `timeout` seconds remain from now.

        @param timeout: Seconds to allow from this moment, or None for no
                timeout.
        """
        if timeout is not None and self._start_time is not None:
            # _timeout_secs is measured from _start_time, so add the time
            # that has already elapsed.
            self._timeout_secs = (time() - self._start_time) + timeout
        else:
            self._timeout_secs = timeout


    def _remaining(self):
        """
        Return the number of seconds left before the barrier times out.

        @returns Seconds remaining, or None when no timeout is configured.
        @raises error.BarrierError: If the timeout has already expired.
        """
        if self._timeout_secs is not None and self._start_time is not None:
            timeout = self._timeout_secs - (time() - self._start_time)
            if timeout <= 0:
                errmsg = "timeout waiting for barrier: %s" % self._tag
                logging.error(errmsg)
                raise error.BarrierError(errmsg)
        else:
            timeout = self._timeout_secs

        if self._timeout_secs is not None:
            logging.info("seconds remaining: %d", timeout)
        return timeout


    def _master_welcome(self, connection):
        """
        Master side of the handshake with one newly connected member.

        Reads the "<tag> <name>" introduction, rejects members arriving for
        another barrier ("!tag") or using an already seen name ("!dup"),
        and otherwise replies "wait" and records the connection.

        @param connection: (socket, address) tuple for the member.
        """
        client, addr = connection
        name = None

        client.settimeout(5)
        try:
            # Get the clients name.
            intro = client.recv(1024)
            intro = intro.strip("\r\n")

            intro_parts = intro.split(' ', 2)
            if len(intro_parts) != 2:
                logging.warning("Ignoring invalid data from %s: %r",
                                client.getpeername(), intro)
                client.close()
                return
            tag, name = intro_parts

            logging.info("new client tag=%s, name=%s", tag, name)

            # Ok, we know who is trying to attach.  Confirm that
            # they are coming to the same meeting.  Also, everyone
            # should be using a unique handle (their IP address).
            # If we see a duplicate, something _bad_ has happened
            # so drop them now.
            if self._tag != tag:
                logging.warning("client arriving for the wrong barrier: "
                                "%s != %s", self._tag, tag)
                client.send("!tag")
                client.close()
                return
            elif name in self._waiting:
                logging.warning("duplicate client")
                client.send("!dup")
                client.close()
                return

            # Acknowledge the client
            client.send("wait")

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when thats appropriate.
            logging.warning("client handshake timeout: (%s:%d)",
                            addr[0], addr[1])
            client.close()
            return

        logging.info("client now waiting: %s (%s:%d)",
                     name, addr[0], addr[1])

        # They seem to be valid record them.
        self._waiting[name] = connection
        self._seen += 1


    def _slave_hello(self, connection):
        """
        Slave side of the handshake with the master.

        Sends "<tag> <hostid>" and expects "wait" in reply; on success the
        connection is recorded under our own host id.

        @param connection: (socket, address) tuple for the master.
        """
        (client, addr) = connection

        client.settimeout(5)
        try:
            client.send(self._tag + " " + self._hostid)

            reply = client.recv(4)
            reply = reply.strip("\r\n")
            logging.info("master said: %s", reply)

            # Confirm the master accepted the connection.
            if reply != "wait":
                logging.warning("Bad connection request to master")
                client.close()
                return

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when thats appropriate.
            logging.error("master handshake timeout: (%s:%d)",
                          addr[0], addr[1])
            client.close()
            return

        logging.info("slave now waiting: (%s:%d)", addr[0], addr[1])

        # They seem to be valid record them.
        self._waiting[self._hostid] = connection
        self._seen = 1


    def _master_release(self):
        """
        Ping every waiting member, then release (or abort) all of them.

        @raises error.BarrierError: If any member fails to answer the ping.
        @raises BarrierAbortError: If this host or any member requested an
                abort (raised after the members have been told to abort).
        """
        # Check everyone is still there, that they have not
        # crashed or disconnected in the meantime.
        allpresent = True
        abort = self._abort
        for name, (client, _addr) in self._waiting.items():
            logging.info("checking client present: %s", name)

            client.settimeout(5)
            reply = 'none'
            try:
                client.send("ping")
                reply = client.recv(1024)
            except socket.timeout:
                logging.warning("ping/pong timeout: %s", name)

            if reply == 'abrt':
                logging.warning("Client %s requested abort", name)
                abort = True
            elif reply != "pong":
                allpresent = False

        if not allpresent:
            raise error.BarrierError("master lost client")

        if abort:
            logging.info("Aborting the clients")
            msg = 'abrt'
        else:
            logging.info("Releasing clients")
            msg = 'rlse'

        # If every ones checks in then commit the release.
        for name, (client, _addr) in self._waiting.items():
            client.settimeout(5)
            try:
                client.send(msg)
            except socket.timeout:
                # The barrier has already been met; a member we cannot
                # release is treated as failing just after the barrier.
                logging.warning("release timeout: %s", name)

        if abort:
            raise BarrierAbortError("Client requested abort")


    def _waiting_close(self):
        """Close every recorded connection, ignoring close failures."""
        # Either way, close out all the clients.  If we have
        # not released them then they know to abort.
        for name, (client, _addr) in self._waiting.items():
            logging.info("closing client: %s", name)

            try:
                client.close()
            except Exception:
                # Best effort: one bad socket must not prevent closing
                # the remaining ones.
                logging.debug("error closing client: %s", name,
                              exc_info=True)


    def _finish_if_ready(self, is_master):
        """
        Complete the barrier if the connection phase is done.

        As master, releases the members once all of them have been seen.
        As slave, waits for the master's verdict once connected to it.

        @param is_master: True if this host is the master of the barrier.
        @returns True if the barrier completed, False if more connections
                are still needed.
        """
        if is_master:
            # Check if everyone is here.
            logging.info("master seen %d of %d",
                         self._seen, len(self._members))
            if self._seen == len(self._members):
                self._master_release()
                return True
        else:
            # Check if master connected.
            if self._seen:
                logging.info("slave connected to master")
                self._slave_wait()
                return True
        return False


    def _run_server(self, is_master):
        """
        Form the barrier by accepting incoming connections.

        @param is_master: True to welcome members as the master, False to
                greet the connecting master as a slave.
        """
        server = self._server or listen_server(port=self._port)
        try:
            while True:
                try:
                    # Wait for callers welcoming each.
                    server.socket.settimeout(self._remaining())
                    connection = server.socket.accept()
                    if is_master:
                        self._master_welcome(connection)
                    else:
                        self._slave_hello(connection)
                except socket.timeout:
                    logging.warning("timeout waiting for remaining clients")

                if self._finish_if_ready(is_master):
                    break
        finally:
            self._waiting_close()
            # if we created the listening_server in the beginning of this
            # function then close the listening socket here
            if not self._server:
                server.close()


    def _run_client(self, is_master):
        """
        Form the barrier by making outgoing connections.

        Connection attempts that time out or are refused are retried after
        a 10 second pause until the barrier timeout expires.

        @param is_master: True to call each slave in turn as the master,
                False to call the master as a slave.
        """
        try:
            while True:
                # _remaining() raises once the barrier timeout has expired.
                remaining = self._remaining()
                if remaining is not None and remaining <= 0:
                    break

                # Validate the target before allocating a socket for it.
                if is_master:
                    # Connect to all slaves.
                    host = _get_host_from_id(self._members[self._seen])
                    logging.info("calling slave: %s", host)
                else:
                    # Just connect to the master.
                    host = _get_host_from_id(self._masterid)
                    logging.info("calling master")

                remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    remote.settimeout(30)
                    connection = (remote, (host, self._port))
                    remote.connect(connection[1])
                    if is_master:
                        self._master_welcome(connection)
                    else:
                        self._slave_hello(connection)
                except socket.timeout:
                    # The handshake helpers handle their own timeouts, so
                    # this socket was never recorded; release it.
                    remote.close()
                    logging.warning("timeout calling host, retry")
                    sleep(10)
                except socket.error as err:
                    remote.close()
                    if err.errno != errno.ECONNREFUSED:
                        raise
                    # Peer is not listening yet; retry later.
                    sleep(10)

                if self._finish_if_ready(is_master):
                    break
        finally:
            self._waiting_close()


    def _slave_wait(self):
        """
        Slave side wait for the master's ping and release/abort messages.

        Returns normally only if the last message received before the
        master closed the connection was "rlse".

        @raises BarrierAbortError: If the master relayed an abort.
        @raises error.BarrierError: If the connection closed in any other
                state.
        """
        remote = self._waiting[self._hostid][0]
        mode = "wait"
        while True:
            # All control messages are the same size to allow
            # us to split individual messages easily.
            remote.settimeout(self._remaining())
            reply = remote.recv(4)
            if not reply:
                break

            reply = reply.strip("\r\n")
            logging.info("master said: %s", reply)

            mode = reply
            if reply == "ping":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                if self._abort:
                    msg = "abrt"
                else:
                    msg = "pong"
                logging.info(msg)
                remote.settimeout(self._remaining())
                remote.send(msg)

            elif reply == "rlse" or reply == "abrt":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                logging.info("was released, waiting for close")

        if mode == "rlse":
            pass
        elif mode == "wait":
            raise error.BarrierError("master abort -- barrier timeout")
        elif mode == "ping":
            raise error.BarrierError("master abort -- client lost")
        elif mode == "!tag":
            raise error.BarrierError("master abort -- incorrect tag")
        elif mode == "!dup":
            raise error.BarrierError("master abort -- duplicate client")
        elif mode == "abrt":
            raise BarrierAbortError("Client requested abort")
        else:
            raise error.BarrierError("master handshake failure: " + mode)


    def rendezvous(self, *hosts, **dargs):
        """
        Wait at the barrier; the first host in sort order is the master.

        The master listens and every other member connects to it.

        @param hosts: Member ids of everyone expected at the barrier.
        @param dargs: abort=True requests an abort, which raises an
                exception on all the clients.
        @raises error.BarrierError: If no hosts are given.
        """
        if not hosts:
            raise error.BarrierError(
                    "rendezvous requires at least one host")

        self._start_time = time()
        self._members = list(hosts)
        self._members.sort()
        self._masterid = self._members.pop(0)
        self._abort = dargs.get('abort', False)

        logging.info("masterid: %s", self._masterid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not len(self._members):
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the master in this barrier.
        if self._hostid == self._masterid:
            logging.info("selected as master")
            self._run_server(is_master=True)
        else:
            logging.info("selected as slave")
            self._run_client(is_master=False)


    def rendezvous_servers(self, masterid, *hosts, **dargs):
        """
        Wait at the barrier with the slaves listening and the master calling.

        @param masterid: Member id of the master.
        @param hosts: Member ids of the slaves.
        @param dargs: abort=True requests an abort, which raises an
                exception on all the clients.
        """
        self._start_time = time()
        self._members = list(hosts)
        self._members.sort()
        self._masterid = masterid
        self._abort = dargs.get('abort', False)

        logging.info("masterid: %s", self._masterid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not len(self._members):
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the master in this barrier.
        if self._hostid == self._masterid:
            logging.info("selected as master")
            self._run_client(is_master=True)
        else:
            logging.info("selected as slave")
            self._run_server(is_master=False)