Home | History | Annotate | Download | only in common_lib
      1 import sys, socket, errno, logging
      2 from time import time, sleep
      3 from autotest_lib.client.common_lib import error, utils
      4 
# Default TCP port for barrier traffic; used by listen_server and barrier
# whenever the caller does not supply a port of their own.
_DEFAULT_PORT = 11922
      7 
      8 def _get_host_from_id(hostid):
      9     # Remove any trailing local identifier following a #.
     10     # This allows multiple members per host which is particularly
     11     # helpful in testing.
     12     if not hostid.startswith('#'):
     13         return hostid.split('#')[0]
     14     else:
     15         raise error.BarrierError(
     16             "Invalid Host id: Host Address should be specified")
     17 
     18 
class BarrierAbortError(error.BarrierError):
    """Special BarrierError raised when an explicit abort is requested.

    Being a subclass of error.BarrierError, it is also caught by handlers
    written for ordinary barrier failures.
    """
     21 
     22 
     23 class listen_server(object):
     24     """
     25     Manages a listening socket for barrier.
     26 
     27     Can be used to run multiple barrier instances with the same listening
     28     socket (if they were going to listen on the same port).
     29 
     30     Attributes:
     31 
     32     @attr address: Address to bind to (string).
     33     @attr port: Port to bind to.
     34     @attr socket: Listening socket object.
     35     """
     36     def __init__(self, address='', port=_DEFAULT_PORT):
     37         """
     38         Create a listen_server instance for the given address/port.
     39 
     40         @param address: The address to listen on.
     41         @param port: The port to listen on.
     42         """
     43         self.address = address
     44         self.port = port
     45         # Open the port so that the listening server can accept incoming
     46         # connections.
     47         utils.run('iptables -A INPUT -p tcp -m tcp --dport %d -j ACCEPT' %
     48                   port)
     49         self.socket = self._setup()
     50 
     51 
     52     def _setup(self):
     53         """Create, bind and listen on the listening socket."""
     54         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     55         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
     56         sock.bind((self.address, self.port))
     57         sock.listen(10)
     58 
     59         return sock
     60 
     61 
     62     def close(self):
     63         """Close the listening socket."""
     64         self.socket.close()
     65 
     66 
     67 class barrier(object):
     68     """Multi-machine barrier support.
     69 
     70     Provides multi-machine barrier mechanism.
     71     Execution stops until all members arrive at the barrier.
     72 
     73     Implementation Details:
     74     .......................
     75 
     76     When a barrier is forming the master node (first in sort order) in the
     77     set accepts connections from each member of the set.  As they arrive
     78     they indicate the barrier they are joining and their identifier (their
     79     hostname or IP address and optional tag).  They are then asked to wait.
     80     When all members are present the master node then checks that each
     81     member is still responding via a ping/pong exchange.  If this is
     82     successful then everyone has checked in at the barrier.  We then tell
     83     everyone they may continue via a rlse message.
     84 
     85     Where the master is not the first to reach the barrier the client
     86     connects will fail.  Client will retry until they either succeed in
     87     connecting to master or the overall timeout is exceeded.
     88 
     89     As an example here is the exchange for a three node barrier called
     90     'TAG'
     91 
     92       MASTER                        CLIENT1         CLIENT2
     93         <-------------TAG C1-------------
     94         --------------wait-------------->
     95                       [...]
     96         <-------------TAG C2-----------------------------
     97         --------------wait------------------------------>
     98                       [...]
     99         --------------ping-------------->
    100         <-------------pong---------------
    101         --------------ping------------------------------>
    102         <-------------pong-------------------------------
    103                 ----- BARRIER conditions MET -----
    104         --------------rlse-------------->
    105         --------------rlse------------------------------>
    106 
    107     Note that once the last client has responded to pong the barrier is
    108     implicitly deemed satisifed, they have all acknowledged their presence.
    109     If we fail to send any of the rlse messages the barrier is still a
    110     success, the failed host has effectively broken 'right at the beginning'
    111     of the post barrier execution window.
    112 
    113     In addition, there is another rendezvous, that makes each slave a server
    114     and the master a client.  The connection process and usage is still the
    115     same but allows barriers from machines that only have a one-way
    116     connection initiation.  This is called rendezvous_servers.
    117 
    118     For example:
    119         if ME == SERVER:
    120             server start
    121 
    122         b = job.barrier(ME, 'server-up', 120)
    123         b.rendezvous(CLIENT, SERVER)
    124 
    125         if ME == CLIENT:
    126             client run
    127 
    128         b = job.barrier(ME, 'test-complete', 3600)
    129         b.rendezvous(CLIENT, SERVER)
    130 
    131         if ME == SERVER:
    132             server stop
    133 
    134     Any client can also request an abort of the job by setting
    135     abort=True in the rendezvous arguments.
    136     """
    137 
    138     def __init__(self, hostid, tag, timeout=None, port=None,
    139                  listen_server=None):
    140         """
    141         @param hostid: My hostname/IP address + optional tag.
    142         @param tag: Symbolic name of the barrier in progress.
    143         @param timeout: Maximum seconds to wait for a the barrier to meet.
    144         @param port: Port number to listen on.
    145         @param listen_server: External listen_server instance to use instead
    146                 of creating our own.  Create a listen_server instance and
    147                 reuse it across multiple barrier instances so that the
    148                 barrier code doesn't try to quickly re-bind on the same port
    149                 (packets still in transit for the previous barrier they may
    150                 reset new connections).
    151         """
    152         self._hostid = hostid
    153         self._tag = tag
    154         if listen_server:
    155             if port:
    156                 raise error.BarrierError(
    157                         '"port" and "listen_server" are mutually exclusive.')
    158             self._port = listen_server.port
    159         else:
    160             self._port = port or _DEFAULT_PORT
    161         self._server = listen_server  # A listen_server instance or None.
    162         self._members = []  # List of hosts we expect to find at the barrier.
    163         self._timeout_secs = timeout
    164         self._start_time = None  # Timestamp of when we started waiting.
    165         self._masterid = None  # Host/IP + optional tag of selected master.
    166         logging.info("tag=%s port=%d timeout=%r",
    167                      self._tag, self._port, self._timeout_secs)
    168 
    169         # Number of clients seen (should be the length of self._waiting).
    170         self._seen = 0
    171 
    172         # Clients who have checked in and are waiting (if we are a master).
    173         self._waiting = {}  # Maps from hostname -> (client, addr) tuples.
    174 
    175 
    176     def _update_timeout(self, timeout):
    177         if timeout is not None and self._start_time is not None:
    178             self._timeout_secs = (time() - self._start_time) + timeout
    179         else:
    180             self._timeout_secs = timeout
    181 
    182 
    183     def _remaining(self):
    184         if self._timeout_secs is not None and self._start_time is not None:
    185             timeout = self._timeout_secs - (time() - self._start_time)
    186             if timeout <= 0:
    187                 errmsg = "timeout waiting for barrier: %s" % self._tag
    188                 logging.error(error)
    189                 raise error.BarrierError(errmsg)
    190         else:
    191             timeout = self._timeout_secs
    192 
    193         if self._timeout_secs is not None:
    194             logging.info("seconds remaining: %d", timeout)
    195         return timeout
    196 
    197 
    198     def _master_welcome(self, connection):
    199         client, addr = connection
    200         name = None
    201 
    202         client.settimeout(5)
    203         try:
    204             # Get the clients name.
    205             intro = client.recv(1024)
    206             intro = intro.strip("\r\n")
    207 
    208             intro_parts = intro.split(' ', 2)
    209             if len(intro_parts) != 2:
    210                 logging.warning("Ignoring invalid data from %s: %r",
    211                              client.getpeername(), intro)
    212                 client.close()
    213                 return
    214             tag, name = intro_parts
    215 
    216             logging.info("new client tag=%s, name=%s", tag, name)
    217 
    218             # Ok, we know who is trying to attach.  Confirm that
    219             # they are coming to the same meeting.  Also, everyone
    220             # should be using a unique handle (their IP address).
    221             # If we see a duplicate, something _bad_ has happened
    222             # so drop them now.
    223             if self._tag != tag:
    224                 logging.warning("client arriving for the wrong barrier: %s != %s",
    225                              self._tag, tag)
    226                 client.settimeout(5)
    227                 client.send("!tag")
    228                 client.close()
    229                 return
    230             elif name in self._waiting:
    231                 logging.warning("duplicate client")
    232                 client.settimeout(5)
    233                 client.send("!dup")
    234                 client.close()
    235                 return
    236 
    237             # Acknowledge the client
    238             client.send("wait")
    239 
    240         except socket.timeout:
    241             # This is nominally an error, but as we do not know
    242             # who that was we cannot do anything sane other
    243             # than report it and let the normal timeout kill
    244             # us when thats appropriate.
    245             logging.warning("client handshake timeout: (%s:%d)",
    246                          addr[0], addr[1])
    247             client.close()
    248             return
    249 
    250         logging.info("client now waiting: %s (%s:%d)",
    251                      name, addr[0], addr[1])
    252 
    253         # They seem to be valid record them.
    254         self._waiting[name] = connection
    255         self._seen += 1
    256 
    257 
    258     def _slave_hello(self, connection):
    259         (client, addr) = connection
    260         name = None
    261 
    262         client.settimeout(5)
    263         try:
    264             client.send(self._tag + " " + self._hostid)
    265 
    266             reply = client.recv(4)
    267             reply = reply.strip("\r\n")
    268             logging.info("master said: %s", reply)
    269 
    270             # Confirm the master accepted the connection.
    271             if reply != "wait":
    272                 logging.warning("Bad connection request to master")
    273                 client.close()
    274                 return
    275 
    276         except socket.timeout:
    277             # This is nominally an error, but as we do not know
    278             # who that was we cannot do anything sane other
    279             # than report it and let the normal timeout kill
    280             # us when thats appropriate.
    281             logging.error("master handshake timeout: (%s:%d)",
    282                           addr[0], addr[1])
    283             client.close()
    284             return
    285 
    286         logging.info("slave now waiting: (%s:%d)", addr[0], addr[1])
    287 
    288         # They seem to be valid record them.
    289         self._waiting[self._hostid] = connection
    290         self._seen = 1
    291 
    292 
    293     def _master_release(self):
    294         # Check everyone is still there, that they have not
    295         # crashed or disconnected in the meantime.
    296         allpresent = True
    297         abort = self._abort
    298         for name in self._waiting:
    299             (client, addr) = self._waiting[name]
    300 
    301             logging.info("checking client present: %s", name)
    302 
    303             client.settimeout(5)
    304             reply = 'none'
    305             try:
    306                 client.send("ping")
    307                 reply = client.recv(1024)
    308             except socket.timeout:
    309                 logging.warning("ping/pong timeout: %s", name)
    310                 pass
    311 
    312             if reply == 'abrt':
    313                 logging.warning("Client %s requested abort", name)
    314                 abort = True
    315             elif reply != "pong":
    316                 allpresent = False
    317 
    318         if not allpresent:
    319             raise error.BarrierError("master lost client")
    320 
    321         if abort:
    322             logging.info("Aborting the clients")
    323             msg = 'abrt'
    324         else:
    325             logging.info("Releasing clients")
    326             msg = 'rlse'
    327 
    328         # If every ones checks in then commit the release.
    329         for name in self._waiting:
    330             (client, addr) = self._waiting[name]
    331 
    332             client.settimeout(5)
    333             try:
    334                 client.send(msg)
    335             except socket.timeout:
    336                 logging.warning("release timeout: %s", name)
    337                 pass
    338 
    339         if abort:
    340             raise BarrierAbortError("Client requested abort")
    341 
    342 
    343     def _waiting_close(self):
    344         # Either way, close out all the clients.  If we have
    345         # not released them then they know to abort.
    346         for name in self._waiting:
    347             (client, addr) = self._waiting[name]
    348 
    349             logging.info("closing client: %s", name)
    350 
    351             try:
    352                 client.close()
    353             except:
    354                 pass
    355 
    356 
    357     def _run_server(self, is_master):
    358         server = self._server or listen_server(port=self._port)
    359         failed = 0
    360         try:
    361             while True:
    362                 try:
    363                     # Wait for callers welcoming each.
    364                     server.socket.settimeout(self._remaining())
    365                     connection = server.socket.accept()
    366                     if is_master:
    367                         self._master_welcome(connection)
    368                     else:
    369                         self._slave_hello(connection)
    370                 except socket.timeout:
    371                     logging.warning("timeout waiting for remaining clients")
    372                     pass
    373 
    374                 if is_master:
    375                     # Check if everyone is here.
    376                     logging.info("master seen %d of %d",
    377                                  self._seen, len(self._members))
    378                     if self._seen == len(self._members):
    379                         self._master_release()
    380                         break
    381                 else:
    382                     # Check if master connected.
    383                     if self._seen:
    384                         logging.info("slave connected to master")
    385                         self._slave_wait()
    386                         break
    387         finally:
    388             self._waiting_close()
    389             # if we created the listening_server in the beginning of this
    390             # function then close the listening socket here
    391             if not self._server:
    392                 server.close()
    393 
    394 
    395     def _run_client(self, is_master):
    396         while self._remaining() is None or self._remaining() > 0:
    397             try:
    398                 remote = socket.socket(socket.AF_INET,
    399                         socket.SOCK_STREAM)
    400                 remote.settimeout(30)
    401                 if is_master:
    402                     # Connect to all slaves.
    403                     host = _get_host_from_id(self._members[self._seen])
    404                     logging.info("calling slave: %s", host)
    405                     connection = (remote, (host, self._port))
    406                     remote.connect(connection[1])
    407                     self._master_welcome(connection)
    408                 else:
    409                     # Just connect to the master.
    410                     host = _get_host_from_id(self._masterid)
    411                     logging.info("calling master")
    412                     connection = (remote, (host, self._port))
    413                     remote.connect(connection[1])
    414                     self._slave_hello(connection)
    415             except socket.timeout:
    416                 logging.warning("timeout calling host, retry")
    417                 sleep(10)
    418                 pass
    419             except socket.error, err:
    420                 (code, str) = err
    421                 if (code != errno.ECONNREFUSED and
    422                     code != errno.ETIMEDOUT):
    423                     raise
    424                 sleep(10)
    425 
    426             if is_master:
    427                 # Check if everyone is here.
    428                 logging.info("master seen %d of %d",
    429                              self._seen, len(self._members))
    430                 if self._seen == len(self._members):
    431                     self._master_release()
    432                     break
    433             else:
    434                 # Check if master connected.
    435                 if self._seen:
    436                     logging.info("slave connected to master")
    437                     self._slave_wait()
    438                     break
    439 
    440         self._waiting_close()
    441 
    442 
    443     def _slave_wait(self):
    444         remote = self._waiting[self._hostid][0]
    445         mode = "wait"
    446         while True:
    447             # All control messages are the same size to allow
    448             # us to split individual messages easily.
    449             remote.settimeout(self._remaining())
    450             reply = remote.recv(4)
    451             if not reply:
    452                 break
    453 
    454             reply = reply.strip("\r\n")
    455             logging.info("master said: %s", reply)
    456 
    457             mode = reply
    458             if reply == "ping":
    459                 # Ensure we have sufficient time for the
    460                 # ping/pong/rlse cyle to complete normally.
    461                 self._update_timeout(10 + 10 * len(self._members))
    462 
    463                 if self._abort:
    464                     msg = "abrt"
    465                 else:
    466                     msg = "pong"
    467                 logging.info(msg)
    468                 remote.settimeout(self._remaining())
    469                 remote.send(msg)
    470 
    471             elif reply == "rlse" or reply == "abrt":
    472                 # Ensure we have sufficient time for the
    473                 # ping/pong/rlse cyle to complete normally.
    474                 self._update_timeout(10 + 10 * len(self._members))
    475 
    476                 logging.info("was released, waiting for close")
    477 
    478         if mode == "rlse":
    479             pass
    480         elif mode == "wait":
    481             raise error.BarrierError("master abort -- barrier timeout")
    482         elif mode == "ping":
    483             raise error.BarrierError("master abort -- client lost")
    484         elif mode == "!tag":
    485             raise error.BarrierError("master abort -- incorrect tag")
    486         elif mode == "!dup":
    487             raise error.BarrierError("master abort -- duplicate client")
    488         elif mode == "abrt":
    489             raise BarrierAbortError("Client requested abort")
    490         else:
    491             raise error.BarrierError("master handshake failure: " + mode)
    492 
    493 
    494     def rendezvous(self, *hosts, **dargs):
    495         # if called with abort=True, this will raise an exception
    496         # on all the clients.
    497         self._start_time = time()
    498         self._members = list(hosts)
    499         self._members.sort()
    500         self._masterid = self._members.pop(0)
    501         self._abort = dargs.get('abort', False)
    502 
    503         logging.info("masterid: %s", self._masterid)
    504         if self._abort:
    505             logging.debug("%s is aborting", self._hostid)
    506         if not len(self._members):
    507             logging.info("No other members listed.")
    508             return
    509         logging.info("members: %s", ",".join(self._members))
    510 
    511         self._seen = 0
    512         self._waiting = {}
    513 
    514         # Figure out who is the master in this barrier.
    515         if self._hostid == self._masterid:
    516             logging.info("selected as master")
    517             self._run_server(is_master=True)
    518         else:
    519             logging.info("selected as slave")
    520             self._run_client(is_master=False)
    521 
    522 
    523     def rendezvous_servers(self, masterid, *hosts, **dargs):
    524         # if called with abort=True, this will raise an exception
    525         # on all the clients.
    526         self._start_time = time()
    527         self._members = list(hosts)
    528         self._members.sort()
    529         self._masterid = masterid
    530         self._abort = dargs.get('abort', False)
    531 
    532         logging.info("masterid: %s", self._masterid)
    533         if not len(self._members):
    534             logging.info("No other members listed.")
    535             return
    536         logging.info("members: %s", ",".join(self._members))
    537 
    538         self._seen = 0
    539         self._waiting = {}
    540 
    541         # Figure out who is the master in this barrier.
    542         if self._hostid == self._masterid:
    543             logging.info("selected as master")
    544             self._run_client(is_master=True)
    545         else:
    546             logging.info("selected as slave")
    547             self._run_server(is_master=False)
    548