Home | History | Annotate | Download | only in lucifer
      1 # Copyright 2017 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Job leasing utilities
      6 
      7 See infra/lucifer for the implementation of job leasing.
      8 
      9 https://chromium.googlesource.com/chromiumos/infra/lucifer
     10 
     11 Jobs are leased to processes to own and run.  A process owning a job
     12 obtains a job lease.  Ongoing ownership of the lease is established using
     13 an exclusive fcntl lock on the lease file.
     14 
     15 If a lease file is older than a few seconds and is not locked, then its
     16 owning process should be considered crashed.
     17 """
     18 
     19 from __future__ import absolute_import
     20 from __future__ import division
     21 from __future__ import print_function
     22 
     23 import contextlib
     24 import errno
     25 import fcntl
     26 import logging
     27 import os
     28 import socket
     29 import time
     30 
     31 from scandir import scandir
     32 
     33 logger = logging.getLogger(__name__)
     34 
     35 
     36 @contextlib.contextmanager
     37 def obtain_lease(path):
     38     """Return a context manager owning a lease file.
     39 
     40     The process that obtains the lease will maintain an exclusive,
     41     unlimited fcntl lock on the lock file.
     42     """
     43     with open(path, 'w') as f:
     44         fcntl.lockf(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
     45         try:
     46             yield path
     47         finally:
     48             os.unlink(path)
     49 
     50 
     51 def leases_iter(jobdir):
     52     """Yield Lease instances from jobdir.
     53 
     54     @param jobdir: job lease file directory
     55     @returns: iterator of Leases
     56     """
     57     for entry in scandir(jobdir):
     58         if _is_lease_entry(entry):
     59             yield Lease(entry)
     60 
     61 
     62 class Lease(object):
     63     "Represents a job lease."
     64 
     65     # Seconds after a lease file's mtime where its owning process is not
     66     # considered dead.
     67     _FRESH_LIMIT = 5
     68 
     69     def __init__(self, entry):
     70         """Initialize instance.
     71 
     72         @param entry: scandir.DirEntry instance
     73         """
     74         self._entry = entry
     75 
     76     @property
     77     def id(self):
     78         """Return id of leased job."""
     79         return int(self._entry.name)
     80 
     81     def expired(self):
     82         """Return True if the lease is expired.
     83 
     84         A lease is considered expired if there is no fcntl lock on it
     85         and the grace period for the owning process to obtain the lock
     86         has passed.  The lease is not considered expired if the owning
     87         process removed the lock file normally, as an expired lease
     88         indicates that some error has occurred and clean up operations
     89         are needed.
     90         """
     91         try:
     92             stat_result = self._entry.stat()
     93         except OSError as e:  # pragma: no cover
     94             if e.errno == errno.ENOENT:
     95                 return False
     96             raise
     97         mtime = stat_result.st_mtime_ns / (10 ** 9)
     98         if time.time() - mtime < self._FRESH_LIMIT:
     99             return False
    100         return not _fcntl_locked(self._entry.path)
    101 
    102     def cleanup(self):
    103         """Remove the lease file.
    104 
    105         This does not need to be called normally, as the owning process
    106         should clean up its files.
    107         """
    108         try:
    109             os.unlink(self._entry.path)
    110         except OSError as e:
    111             logger.warning('Error removing %s: %s', self._entry.path, e)
    112         try:
    113             os.unlink(self._sock_path)
    114         except OSError as e:
    115             # This is fine; it means that job_reporter crashed, but
    116             # lucifer_run_job was able to run its cleanup.
    117             logger.debug('Error removing %s: %s', self._sock_path, e)
    118 
    119     def abort(self):
    120         """Abort the job.
    121 
    122         This sends a datagram to the abort socket associated with the
    123         lease.
    124 
    125         If the socket is closed, either the connect() call or the send()
    126         call will raise socket.error with ECONNREFUSED.
    127         """
    128         sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    129         logger.debug('Connecting to abort socket %s', self._sock_path)
    130         sock.connect(self._sock_path)
    131         logger.debug('Sending abort to %s', self._sock_path)
    132         # The value sent does not matter.
    133         sent = sock.send('abort')
    134         # TODO(ayatane): I don't know if it is possible for sent to be 0
    135         assert sent > 0
    136 
    137     def maybe_abort(self):
    138         """Abort the job, ignoring errors."""
    139         try:
    140             self.abort()
    141         except socket.error as e:
    142             logger.debug('Error aborting socket: %s', e)
    143 
    144     @property
    145     def _sock_path(self):
    146         """Return the path of the abort socket corresponding to the lease."""
    147         return self._entry.path + ".sock"
    148 
    149 
    150 def _is_lease_entry(entry):
    151     """Return True if the DirEntry is for a lease."""
    152     return entry.name.isdigit()
    153 
    154 
    155 def _fcntl_locked(path):
    156     """Return True if a file is fcntl locked.
    157 
    158     @param path: path to file
    159     """
    160     fd = os.open(path, os.O_WRONLY)
    161     try:
    162         fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    163     except IOError:
    164         return True
    165     else:
    166         return False
    167     finally:
    168         os.close(fd)
    169