Home | History | Annotate | Download | only in lucifer
      1 # Copyright 2017 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
"""Job leasing utilities.

Jobs are leased to processes to own and run.  A process that owns a job
obtains a lease for it.  Ongoing ownership of the lease is established
by holding an exclusive fcntl lock on the lease file.

If a lease file is older than a few seconds and is not locked, then its
owning process should be considered crashed.
"""
     14 
     15 from __future__ import absolute_import
     16 from __future__ import division
     17 from __future__ import print_function
     18 
     19 import contextlib
     20 import errno
     21 import fcntl
     22 import logging
     23 import os
     24 import socket
     25 import time
     26 
     27 from scandir import scandir
     28 
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
     30 
     31 
     32 @contextlib.contextmanager
     33 def obtain_lease(path):
     34     """Return a context manager owning a lease file.
     35 
     36     The process that obtains the lease will maintain an exclusive,
     37     unlimited fcntl lock on the lock file.
     38     """
     39     with open(path, 'w') as f:
     40         fcntl.lockf(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
     41         try:
     42             yield path
     43         finally:
     44             os.unlink(path)
     45 
     46 
     47 def leases_iter(jobdir):
     48     """Yield Lease instances from jobdir.
     49 
     50     @param jobdir: job lease file directory
     51     @returns: iterator of Leases
     52     """
     53     for entry in scandir(jobdir):
     54         if _is_lease_entry(entry):
     55             yield Lease(entry)
     56 
     57 
     58 class Lease(object):
     59     "Represents a job lease."
     60 
     61     # Seconds after a lease file's mtime where its owning process is not
     62     # considered dead.
     63     _FRESH_LIMIT = 5
     64 
     65     def __init__(self, entry):
     66         """Initialize instance.
     67 
     68         @param entry: scandir.DirEntry instance
     69         """
     70         self._entry = entry
     71 
     72     @property
     73     def id(self):
     74         """Return id of leased job."""
     75         return int(self._entry.name)
     76 
     77     def expired(self):
     78         """Return True if the lease is expired.
     79 
     80         A lease is considered expired if there is no fcntl lock on it
     81         and the grace period for the owning process to obtain the lock
     82         has passed.  The lease is not considered expired if the owning
     83         process removed the lock file normally, as an expired lease
     84         indicates that some error has occurred and clean up operations
     85         are needed.
     86         """
     87         try:
     88             stat_result = self._entry.stat()
     89         except OSError as e:  # pragma: no cover
     90             if e.errno == errno.ENOENT:
     91                 return False
     92             raise
     93         mtime = stat_result.st_mtime_ns / (10 ** 9)
     94         if time.time() - mtime < self._FRESH_LIMIT:
     95             return False
     96         return not _fcntl_locked(self._entry.path)
     97 
     98     def cleanup(self):
     99         """Remove the lease file.
    100 
    101         This does not need to be called normally, as the owning process
    102         should clean up its files.
    103         """
    104         try:
    105             os.unlink(self._entry.path)
    106         except OSError as e:
    107             logger.warning('Error removing %s: %s', self._entry.path, e)
    108         try:
    109             os.unlink(self._sock_path)
    110         except OSError as e:
    111             # This is fine; it means that job_reporter crashed, but
    112             # lucifer was able to run its cleanup.
    113             logger.debug('Error removing %s: %s', self._sock_path, e)
    114 
    115     def abort(self):
    116         """Abort the job.
    117 
    118         This sends a datagram to the abort socket associated with the
    119         lease.
    120 
    121         If the socket is closed, either the connect() call or the send()
    122         call will raise socket.error with ECONNREFUSED.
    123         """
    124         sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    125         sock.setblocking(0)
    126         logger.debug('Connecting to abort socket %s', self._sock_path)
    127         sock.connect(self._sock_path)
    128         logger.debug('Sending abort to %s', self._sock_path)
    129         # The value sent does not matter.
    130         sent = sock.send('abort')
    131         # TODO(ayatane): I don't know if it is possible for sent to be 0
    132         assert sent > 0
    133 
    134     def maybe_abort(self):
    135         """Abort the job, ignoring errors."""
    136         try:
    137             self.abort()
    138         except socket.error as e:
    139             logger.debug('Error aborting socket: %s', e)
    140 
    141     @property
    142     def _sock_path(self):
    143         """Return the path of the abort socket corresponding to the lease."""
    144         return self._entry.path + ".sock"
    145 
    146 
    147 def _is_lease_entry(entry):
    148     """Return True if the DirEntry is for a lease."""
    149     return entry.name.isdigit()
    150 
    151 
    152 def _fcntl_locked(path):
    153     """Return True if a file is fcntl locked.
    154 
    155     @param path: path to file
    156     """
    157     try:
    158         fd = os.open(path, os.O_WRONLY)
    159     except (IOError, OSError):
    160         return False
    161     try:
    162         fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    163     except IOError:
    164         return True
    165     else:
    166         return False
    167     finally:
    168         os.close(fd)
    169