Home | History | Annotate | Download | only in common_lib
      1 import sys, socket, errno, logging
      2 from time import time, sleep
      3 from autotest_lib.client.common_lib import error, utils
      4 
      5 # default barrier port
      6 _DEFAULT_PORT = 11922
      7 
      8 def get_host_from_id(hostid):
      9     # Remove any trailing local identifier following a #.
     10     # This allows multiple members per host which is particularly
     11     # helpful in testing.
     12     if not hostid.startswith('#'):
     13         return hostid.split('#')[0]
     14     else:
     15         raise error.BarrierError(
     16             "Invalid Host id: Host Address should be specified")
     17 
     18 
     19 class BarrierAbortError(error.BarrierError):
     20     """Special BarrierError raised when an explicit abort is requested."""
     21 
     22 
     23 class listen_server(object):
     24     """
     25     Manages a listening socket for barrier.
     26 
     27     Can be used to run multiple barrier instances with the same listening
     28     socket (if they were going to listen on the same port).
     29 
     30     Attributes:
     31 
     32     @attr address: Address to bind to (string).
     33     @attr port: Port to bind to.
     34     @attr socket: Listening socket object.
     35     """
     36     def __init__(self, address='', port=_DEFAULT_PORT):
     37         """
     38         Create a listen_server instance for the given address/port.
     39 
     40         @param address: The address to listen on.
     41         @param port: The port to listen on.
     42         """
     43         self.address = address
     44         self.port = port
     45         # Open the port so that the listening server can accept incoming
     46         # connections.
     47         utils.run('iptables -A INPUT -p tcp -m tcp --dport %d -j ACCEPT' %
     48                   port)
     49         self.socket = self._setup()
     50 
     51 
     52     def _setup(self):
     53         """Create, bind and listen on the listening socket."""
     54         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     55         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
     56         sock.bind((self.address, self.port))
     57         sock.listen(10)
     58 
     59         return sock
     60 
     61 
     62     def close(self):
     63         """Close the listening socket."""
     64         self.socket.close()
     65 
     66 
     67 class barrier(object):
     68     """Multi-machine barrier support.
     69 
     70     Provides multi-machine barrier mechanism.
     71     Execution stops until all members arrive at the barrier.
     72 
     73     Implementation Details:
     74     .......................
     75 
     76     When a barrier is forming the master node (first in sort order) in the
     77     set accepts connections from each member of the set.  As they arrive
     78     they indicate the barrier they are joining and their identifier (their
     79     hostname or IP address and optional tag).  They are then asked to wait.
     80     When all members are present the master node then checks that each
     81     member is still responding via a ping/pong exchange.  If this is
     82     successful then everyone has checked in at the barrier.  We then tell
     83     everyone they may continue via a rlse message.
     84 
     85     Where the master is not the first to reach the barrier the client
     86     connects will fail.  Client will retry until they either succeed in
     87     connecting to master or the overall timeout is exceeded.
     88 
     89     As an example here is the exchange for a three node barrier called
     90     'TAG'
     91 
     92       MASTER                        CLIENT1         CLIENT2
     93         <-------------TAG C1-------------
     94         --------------wait-------------->
     95                       [...]
     96         <-------------TAG C2-----------------------------
     97         --------------wait------------------------------>
     98                       [...]
     99         --------------ping-------------->
    100         <-------------pong---------------
    101         --------------ping------------------------------>
    102         <-------------pong-------------------------------
    103                 ----- BARRIER conditions MET -----
    104         --------------rlse-------------->
    105         --------------rlse------------------------------>
    106 
    107     Note that once the last client has responded to pong the barrier is
    108     implicitly deemed satisifed, they have all acknowledged their presence.
    109     If we fail to send any of the rlse messages the barrier is still a
    110     success, the failed host has effectively broken 'right at the beginning'
    111     of the post barrier execution window.
    112 
    113     In addition, there is another rendezvous, that makes each slave a server
    114     and the master a client.  The connection process and usage is still the
    115     same but allows barriers from machines that only have a one-way
    116     connection initiation.  This is called rendezvous_servers.
    117 
    118     For example:
    119         if ME == SERVER:
    120             server start
    121 
    122         b = job.barrier(ME, 'server-up', 120)
    123         b.rendezvous(CLIENT, SERVER)
    124 
    125         if ME == CLIENT:
    126             client run
    127 
    128         b = job.barrier(ME, 'test-complete', 3600)
    129         b.rendezvous(CLIENT, SERVER)
    130 
    131         if ME == SERVER:
    132             server stop
    133 
    134     Any client can also request an abort of the job by setting
    135     abort=True in the rendezvous arguments.
    136     """
    137 
    138     def __init__(self, hostid, tag, timeout=None, port=None,
    139                  listen_server=None):
    140         """
    141         @param hostid: My hostname/IP address + optional tag.
    142         @param tag: Symbolic name of the barrier in progress.
    143         @param timeout: Maximum seconds to wait for a the barrier to meet.
    144         @param port: Port number to listen on.
    145         @param listen_server: External listen_server instance to use instead
    146                 of creating our own.  Create a listen_server instance and
    147                 reuse it across multiple barrier instances so that the
    148                 barrier code doesn't try to quickly re-bind on the same port
    149                 (packets still in transit for the previous barrier they may
    150                 reset new connections).
    151         """
    152         self._hostid = hostid
    153         self._tag = tag
    154         if listen_server:
    155             if port:
    156                 raise error.BarrierError(
    157                         '"port" and "listen_server" are mutually exclusive.')
    158             self._port = listen_server.port
    159         else:
    160             self._port = port or _DEFAULT_PORT
    161         self._server = listen_server  # A listen_server instance or None.
    162         self._members = []  # List of hosts we expect to find at the barrier.
    163         self._timeout_secs = timeout
    164         self._start_time = None  # Timestamp of when we started waiting.
    165         self._masterid = None  # Host/IP + optional tag of selected master.
    166         logging.info("tag=%s port=%d timeout=%r",
    167                      self._tag, self._port, self._timeout_secs)
    168 
    169         # Number of clients seen (should be the length of self._waiting).
    170         self._seen = 0
    171 
    172         # Clients who have checked in and are waiting (if we are a master).
    173         self._waiting = {}  # Maps from hostname -> (client, addr) tuples.
    174 
    175 
    176     def _update_timeout(self, timeout):
    177         if timeout is not None and self._start_time is not None:
    178             self._timeout_secs = (time() - self._start_time) + timeout
    179         else:
    180             self._timeout_secs = timeout
    181 
    182 
    183     def _remaining(self):
    184         if self._timeout_secs is not None and self._start_time is not None:
    185             timeout = self._timeout_secs - (time() - self._start_time)
    186             if timeout <= 0:
    187                 errmsg = "timeout waiting for barrier: %s" % self._tag
    188                 logging.error(error)
    189                 raise error.BarrierError(errmsg)
    190         else:
    191             timeout = self._timeout_secs
    192 
    193         if self._timeout_secs is not None:
    194             logging.info("seconds remaining: %d", timeout)
    195         return timeout
    196 
    197 
    198     def _master_welcome(self, connection):
    199         client, addr = connection
    200         name = None
    201 
    202         client.settimeout(5)
    203         try:
    204             # Get the clients name.
    205             intro = client.recv(1024)
    206             intro = intro.strip("\r\n")
    207 
    208             intro_parts = intro.split(' ', 2)
    209             if len(intro_parts) != 2:
    210                 logging.warning("Ignoring invalid data from %s: %r",
    211                              client.getpeername(), intro)
    212                 client.close()
    213                 return
    214             tag, name = intro_parts
    215 
    216             logging.info("new client tag=%s, name=%s", tag, name)
    217 
    218             # Ok, we know who is trying to attach.  Confirm that
    219             # they are coming to the same meeting.  Also, everyone
    220             # should be using a unique handle (their IP address).
    221             # If we see a duplicate, something _bad_ has happened
    222             # so drop them now.
    223             if self._tag != tag:
    224                 logging.warning("client arriving for the wrong barrier: %s != %s",
    225                              self._tag, tag)
    226                 client.settimeout(5)
    227                 client.send("!tag")
    228                 client.close()
    229                 return
    230             elif name in self._waiting:
    231                 logging.warning("duplicate client")
    232                 client.settimeout(5)
    233                 client.send("!dup")
    234                 client.close()
    235                 return
    236 
    237             # Acknowledge the client
    238             client.send("wait")
    239 
    240         except socket.timeout:
    241             # This is nominally an error, but as we do not know
    242             # who that was we cannot do anything sane other
    243             # than report it and let the normal timeout kill
    244             # us when thats appropriate.
    245             logging.warning("client handshake timeout: (%s:%d)",
    246                          addr[0], addr[1])
    247             client.close()
    248             return
    249 
    250         logging.info("client now waiting: %s (%s:%d)",
    251                      name, addr[0], addr[1])
    252 
    253         # They seem to be valid record them.
    254         self._waiting[name] = connection
    255         self._seen += 1
    256 
    257 
    258     def _slave_hello(self, connection):
    259         (client, addr) = connection
    260         name = None
    261 
    262         client.settimeout(5)
    263         try:
    264             client.send(self._tag + " " + self._hostid)
    265 
    266             reply = client.recv(4)
    267             reply = reply.strip("\r\n")
    268             logging.info("master said: %s", reply)
    269 
    270             # Confirm the master accepted the connection.
    271             if reply != "wait":
    272                 logging.warning("Bad connection request to master")
    273                 client.close()
    274                 return
    275 
    276         except socket.timeout:
    277             # This is nominally an error, but as we do not know
    278             # who that was we cannot do anything sane other
    279             # than report it and let the normal timeout kill
    280             # us when thats appropriate.
    281             logging.error("master handshake timeout: (%s:%d)",
    282                           addr[0], addr[1])
    283             client.close()
    284             return
    285 
    286         logging.info("slave now waiting: (%s:%d)", addr[0], addr[1])
    287 
    288         # They seem to be valid record them.
    289         self._waiting[self._hostid] = connection
    290         self._seen = 1
    291 
    292 
    293     def _master_release(self):
    294         # Check everyone is still there, that they have not
    295         # crashed or disconnected in the meantime.
    296         allpresent = True
    297         abort = self._abort
    298         for name in self._waiting:
    299             (client, addr) = self._waiting[name]
    300 
    301             logging.info("checking client present: %s", name)
    302 
    303             client.settimeout(5)
    304             reply = 'none'
    305             try:
    306                 client.send("ping")
    307                 reply = client.recv(1024)
    308             except socket.timeout:
    309                 logging.warning("ping/pong timeout: %s", name)
    310                 pass
    311 
    312             if reply == 'abrt':
    313                 logging.warning("Client %s requested abort", name)
    314                 abort = True
    315             elif reply != "pong":
    316                 allpresent = False
    317 
    318         if not allpresent:
    319             raise error.BarrierError("master lost client")
    320 
    321         if abort:
    322             logging.info("Aborting the clients")
    323             msg = 'abrt'
    324         else:
    325             logging.info("Releasing clients")
    326             msg = 'rlse'
    327 
    328         # If every ones checks in then commit the release.
    329         for name in self._waiting:
    330             (client, addr) = self._waiting[name]
    331 
    332             client.settimeout(5)
    333             try:
    334                 client.send(msg)
    335             except socket.timeout:
    336                 logging.warning("release timeout: %s", name)
    337                 pass
    338 
    339         if abort:
    340             raise BarrierAbortError("Client requested abort")
    341 
    342 
    343     def _waiting_close(self):
    344         # Either way, close out all the clients.  If we have
    345         # not released them then they know to abort.
    346         for name in self._waiting:
    347             (client, addr) = self._waiting[name]
    348 
    349             logging.info("closing client: %s", name)
    350 
    351             try:
    352                 client.close()
    353             except:
    354                 pass
    355 
    356 
    357     def _run_server(self, is_master):
    358         server = self._server or listen_server(port=self._port)
    359         failed = 0
    360         try:
    361             while True:
    362                 try:
    363                     # Wait for callers welcoming each.
    364                     server.socket.settimeout(self._remaining())
    365                     connection = server.socket.accept()
    366                     if is_master:
    367                         self._master_welcome(connection)
    368                     else:
    369                         self._slave_hello(connection)
    370                 except socket.timeout:
    371                     logging.warning("timeout waiting for remaining clients")
    372                     pass
    373 
    374                 if is_master:
    375                     # Check if everyone is here.
    376                     logging.info("master seen %d of %d",
    377                                  self._seen, len(self._members))
    378                     if self._seen == len(self._members):
    379                         self._master_release()
    380                         break
    381                 else:
    382                     # Check if master connected.
    383                     if self._seen:
    384                         logging.info("slave connected to master")
    385                         self._slave_wait()
    386                         break
    387         finally:
    388             self._waiting_close()
    389             # if we created the listening_server in the beginning of this
    390             # function then close the listening socket here
    391             if not self._server:
    392                 server.close()
    393 
    394 
    395     def _run_client(self, is_master):
    396         while self._remaining() is None or self._remaining() > 0:
    397             try:
    398                 remote = socket.socket(socket.AF_INET,
    399                         socket.SOCK_STREAM)
    400                 remote.settimeout(30)
    401                 if is_master:
    402                     # Connect to all slaves.
    403                     host = get_host_from_id(self._members[self._seen])
    404                     logging.info("calling slave: %s", host)
    405                     connection = (remote, (host, self._port))
    406                     remote.connect(connection[1])
    407                     self._master_welcome(connection)
    408                 else:
    409                     # Just connect to the master.
    410                     host = get_host_from_id(self._masterid)
    411                     logging.info("calling master")
    412                     connection = (remote, (host, self._port))
    413                     remote.connect(connection[1])
    414                     self._slave_hello(connection)
    415             except socket.timeout:
    416                 logging.warning("timeout calling host, retry")
    417                 sleep(10)
    418                 pass
    419             except socket.error, err:
    420                 (code, str) = err
    421                 if (code != errno.ECONNREFUSED):
    422                     raise
    423                 sleep(10)
    424 
    425             if is_master:
    426                 # Check if everyone is here.
    427                 logging.info("master seen %d of %d",
    428                              self._seen, len(self._members))
    429                 if self._seen == len(self._members):
    430                     self._master_release()
    431                     break
    432             else:
    433                 # Check if master connected.
    434                 if self._seen:
    435                     logging.info("slave connected to master")
    436                     self._slave_wait()
    437                     break
    438 
    439         self._waiting_close()
    440 
    441 
    442     def _slave_wait(self):
    443         remote = self._waiting[self._hostid][0]
    444         mode = "wait"
    445         while True:
    446             # All control messages are the same size to allow
    447             # us to split individual messages easily.
    448             remote.settimeout(self._remaining())
    449             reply = remote.recv(4)
    450             if not reply:
    451                 break
    452 
    453             reply = reply.strip("\r\n")
    454             logging.info("master said: %s", reply)
    455 
    456             mode = reply
    457             if reply == "ping":
    458                 # Ensure we have sufficient time for the
    459                 # ping/pong/rlse cyle to complete normally.
    460                 self._update_timeout(10 + 10 * len(self._members))
    461 
    462                 if self._abort:
    463                     msg = "abrt"
    464                 else:
    465                     msg = "pong"
    466                 logging.info(msg)
    467                 remote.settimeout(self._remaining())
    468                 remote.send(msg)
    469 
    470             elif reply == "rlse" or reply == "abrt":
    471                 # Ensure we have sufficient time for the
    472                 # ping/pong/rlse cyle to complete normally.
    473                 self._update_timeout(10 + 10 * len(self._members))
    474 
    475                 logging.info("was released, waiting for close")
    476 
    477         if mode == "rlse":
    478             pass
    479         elif mode == "wait":
    480             raise error.BarrierError("master abort -- barrier timeout")
    481         elif mode == "ping":
    482             raise error.BarrierError("master abort -- client lost")
    483         elif mode == "!tag":
    484             raise error.BarrierError("master abort -- incorrect tag")
    485         elif mode == "!dup":
    486             raise error.BarrierError("master abort -- duplicate client")
    487         elif mode == "abrt":
    488             raise BarrierAbortError("Client requested abort")
    489         else:
    490             raise error.BarrierError("master handshake failure: " + mode)
    491 
    492 
    493     def rendezvous(self, *hosts, **dargs):
    494         # if called with abort=True, this will raise an exception
    495         # on all the clients.
    496         self._start_time = time()
    497         self._members = list(hosts)
    498         self._members.sort()
    499         self._masterid = self._members.pop(0)
    500         self._abort = dargs.get('abort', False)
    501 
    502         logging.info("masterid: %s", self._masterid)
    503         if self._abort:
    504             logging.debug("%s is aborting", self._hostid)
    505         if not len(self._members):
    506             logging.info("No other members listed.")
    507             return
    508         logging.info("members: %s", ",".join(self._members))
    509 
    510         self._seen = 0
    511         self._waiting = {}
    512 
    513         # Figure out who is the master in this barrier.
    514         if self._hostid == self._masterid:
    515             logging.info("selected as master")
    516             self._run_server(is_master=True)
    517         else:
    518             logging.info("selected as slave")
    519             self._run_client(is_master=False)
    520 
    521 
    522     def rendezvous_servers(self, masterid, *hosts, **dargs):
    523         # if called with abort=True, this will raise an exception
    524         # on all the clients.
    525         self._start_time = time()
    526         self._members = list(hosts)
    527         self._members.sort()
    528         self._masterid = masterid
    529         self._abort = dargs.get('abort', False)
    530 
    531         logging.info("masterid: %s", self._masterid)
    532         if not len(self._members):
    533             logging.info("No other members listed.")
    534             return
    535         logging.info("members: %s", ",".join(self._members))
    536 
    537         self._seen = 0
    538         self._waiting = {}
    539 
    540         # Figure out who is the master in this barrier.
    541         if self._hostid == self._masterid:
    542             logging.info("selected as master")
    543             self._run_client(is_master=True)
    544         else:
    545             logging.info("selected as slave")
    546             self._run_server(is_master=False)
    547