Home | History | Annotate | Download | only in cros
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import dbus, gobject, logging, os, stat
      6 from dbus.mainloop.glib import DBusGMainLoop
      7 
      8 import common
      9 from autotest_lib.client.bin import utils
     10 from autotest_lib.client.common_lib import autotemp, error
     11 from mainloop import ExceptionForward
     12 from mainloop import GenericTesterMainLoop
     13 
     14 
     15 """This module contains several helper classes for writing tests to verify the
     16 CrosDisks DBus interface. In particular, the CrosDisksTester class can be used
     17 to derive functional tests that interact with the CrosDisks server over DBus.
     18 """
     19 
     20 
     21 class ExceptionSuppressor(object):
     22     """A context manager class for suppressing certain types of exception.
     23 
     24     An instance of this class is expected to be used with the with statement
     25     and takes a set of exception classes at instantiation, which are types of
     26     exception to be suppressed (and logged) in the code block under the with
     27     statement.
     28 
     29     Example:
     30 
     31         with ExceptionSuppressor(OSError, IOError):
     32             # An exception, which is a sub-class of OSError or IOError, is
     33             # suppressed in the block code under the with statement.
     34     """
     35     def __init__(self, *args):
     36         self.__suppressed_exc_types = (args)
     37 
     38     def __enter__(self):
     39         return self
     40 
     41     def __exit__(self, exc_type, exc_value, traceback):
     42         if exc_type and issubclass(exc_type, self.__suppressed_exc_types):
     43             try:
     44                 logging.exception('Suppressed exception: %s(%s)',
     45                                   exc_type, exc_value)
     46             except Exception:
     47                 pass
     48             return True
     49         return False
     50 
     51 
     52 class DBusClient(object):
     53     """ A base class of a DBus proxy client to test a DBus server.
     54 
     55     This class is expected to be used along with a GLib main loop and provides
     56     some convenient functions for testing the DBus API exposed by a DBus server.
     57     """
     58     def __init__(self, main_loop, bus, bus_name, object_path):
     59         """Initializes the instance.
     60 
     61         Args:
     62             main_loop: The GLib main loop.
     63             bus: The bus where the DBus server is connected to.
     64             bus_name: The bus name owned by the DBus server.
     65             object_path: The object path of the DBus server.
     66         """
     67         self.__signal_content = {}
     68         self.main_loop = main_loop
     69         self.signal_timeout_in_seconds = 10
     70         logging.debug('Getting D-Bus proxy object on bus "%s" and path "%s"',
     71                       bus_name, object_path)
     72         self.proxy_object = bus.get_object(bus_name, object_path)
     73 
     74     def clear_signal_content(self, signal_name):
     75         """Clears the content of the signal.
     76 
     77         Args:
     78             signal_name: The name of the signal.
     79         """
     80         if signal_name in self.__signal_content:
     81             self.__signal_content[signal_name] = None
     82 
     83     def get_signal_content(self, signal_name):
     84         """Gets the content of a signal.
     85 
     86         Args:
     87             signal_name: The name of the signal.
     88 
     89         Returns:
     90             The content of a signal or None if the signal is not being handled.
     91         """
     92         return self.__signal_content.get(signal_name)
     93 
     94     def handle_signal(self, interface, signal_name, argument_names=()):
     95         """Registers a signal handler to handle a given signal.
     96 
     97         Args:
     98             interface: The DBus interface of the signal.
     99             signal_name: The name of the signal.
    100             argument_names: A list of argument names that the signal contains.
    101         """
    102         if signal_name in self.__signal_content:
    103             return
    104 
    105         self.__signal_content[signal_name] = None
    106 
    107         def signal_handler(*args):
    108             self.__signal_content[signal_name] = dict(zip(argument_names, args))
    109 
    110         logging.debug('Handling D-Bus signal "%s(%s)" on interface "%s"',
    111                       signal_name, ', '.join(argument_names), interface)
    112         self.proxy_object.connect_to_signal(signal_name, signal_handler,
    113                                             interface)
    114 
    115     def wait_for_signal(self, signal_name):
    116         """Waits for the reception of a signal.
    117 
    118         Args:
    119             signal_name: The name of the signal to wait for.
    120 
    121         Returns:
    122             The content of the signal.
    123         """
    124         if signal_name not in self.__signal_content:
    125             return None
    126 
    127         def check_signal_content():
    128             context = self.main_loop.get_context()
    129             while context.iteration(False):
    130                 pass
    131             return self.__signal_content[signal_name] is not None
    132 
    133         logging.debug('Waiting for D-Bus signal "%s"', signal_name)
    134         utils.poll_for_condition(condition=check_signal_content,
    135                                  desc='%s signal' % signal_name,
    136                                  timeout=self.signal_timeout_in_seconds)
    137         content = self.__signal_content[signal_name]
    138         logging.debug('Received D-Bus signal "%s(%s)"', signal_name, content)
    139         self.__signal_content[signal_name] = None
    140         return content
    141 
    142     def expect_signal(self, signal_name, expected_content):
    143         """Waits the the reception of a signal and verifies its content.
    144 
    145         Args:
    146             signal_name: The name of the signal to wait for.
    147             expected_content: The expected content of the signal, which can be
    148                               partially specified. Only specified fields are
    149                               compared between the actual and expected content.
    150 
    151         Returns:
    152             The actual content of the signal.
    153 
    154         Raises:
    155             error.TestFail: A test failure when there is a mismatch between the
    156                             actual and expected content of the signal.
    157         """
    158         actual_content = self.wait_for_signal(signal_name)
    159         logging.debug("%s signal: expected=%s actual=%s",
    160                       signal_name, expected_content, actual_content)
    161         for argument, expected_value in expected_content.iteritems():
    162             if argument not in actual_content:
    163                 raise error.TestFail(
    164                     ('%s signal missing "%s": expected=%s, actual=%s') %
    165                     (signal_name, argument, expected_content, actual_content))
    166 
    167             if actual_content[argument] != expected_value:
    168                 raise error.TestFail(
    169                     ('%s signal not matched on "%s": expected=%s, actual=%s') %
    170                     (signal_name, argument, expected_content, actual_content))
    171         return actual_content
    172 
    173 
    174 class CrosDisksClient(DBusClient):
    175     """A DBus proxy client for testing the CrosDisks DBus server.
    176     """
    177 
    178     CROS_DISKS_BUS_NAME = 'org.chromium.CrosDisks'
    179     CROS_DISKS_INTERFACE = 'org.chromium.CrosDisks'
    180     CROS_DISKS_OBJECT_PATH = '/org/chromium/CrosDisks'
    181     DBUS_PROPERTIES_INTERFACE = 'org.freedesktop.DBus.Properties'
    182     FORMAT_COMPLETED_SIGNAL = 'FormatCompleted'
    183     FORMAT_COMPLETED_SIGNAL_ARGUMENTS = (
    184         'status', 'path'
    185     )
    186     MOUNT_COMPLETED_SIGNAL = 'MountCompleted'
    187     MOUNT_COMPLETED_SIGNAL_ARGUMENTS = (
    188         'status', 'source_path', 'source_type', 'mount_path'
    189     )
    190     RENAME_COMPLETED_SIGNAL = 'RenameCompleted'
    191     RENAME_COMPLETED_SIGNAL_ARGUMENTS = (
    192         'status', 'path'
    193     )
    194 
    195     def __init__(self, main_loop, bus):
    196         """Initializes the instance.
    197 
    198         Args:
    199             main_loop: The GLib main loop.
    200             bus: The bus where the DBus server is connected to.
    201         """
    202         super(CrosDisksClient, self).__init__(main_loop, bus,
    203                                               self.CROS_DISKS_BUS_NAME,
    204                                               self.CROS_DISKS_OBJECT_PATH)
    205         self.interface = dbus.Interface(self.proxy_object,
    206                                         self.CROS_DISKS_INTERFACE)
    207         self.properties = dbus.Interface(self.proxy_object,
    208                                          self.DBUS_PROPERTIES_INTERFACE)
    209         self.handle_signal(self.CROS_DISKS_INTERFACE,
    210                            self.FORMAT_COMPLETED_SIGNAL,
    211                            self.FORMAT_COMPLETED_SIGNAL_ARGUMENTS)
    212         self.handle_signal(self.CROS_DISKS_INTERFACE,
    213                            self.MOUNT_COMPLETED_SIGNAL,
    214                            self.MOUNT_COMPLETED_SIGNAL_ARGUMENTS)
    215         self.handle_signal(self.CROS_DISKS_INTERFACE,
    216                            self.RENAME_COMPLETED_SIGNAL,
    217                            self.RENAME_COMPLETED_SIGNAL_ARGUMENTS)
    218 
    219     def enumerate_auto_mountable_devices(self):
    220         """Invokes the CrosDisks EnumerateAutoMountableDevices method.
    221 
    222         Returns:
    223             A list of sysfs paths of devices that are auto-mountable by
    224             CrosDisks.
    225         """
    226         return self.interface.EnumerateAutoMountableDevices()
    227 
    228     def enumerate_devices(self):
    229         """Invokes the CrosDisks EnumerateMountableDevices method.
    230 
    231         Returns:
    232             A list of sysfs paths of devices that are recognized by
    233             CrosDisks.
    234         """
    235         return self.interface.EnumerateDevices()
    236 
    237     def get_device_properties(self, path):
    238         """Invokes the CrosDisks GetDeviceProperties method.
    239 
    240         Args:
    241             path: The device path.
    242 
    243         Returns:
    244             The properties of the device in a dictionary.
    245         """
    246         return self.interface.GetDeviceProperties(path)
    247 
    248     def format(self, path, filesystem_type=None, options=None):
    249         """Invokes the CrosDisks Format method.
    250 
    251         Args:
    252             path: The device path to format.
    253             filesystem_type: The filesystem type used for formatting the device.
    254             options: A list of options used for formatting the device.
    255         """
    256         if filesystem_type is None:
    257             filesystem_type = ''
    258         if options is None:
    259             options = []
    260         self.clear_signal_content(self.FORMAT_COMPLETED_SIGNAL)
    261         self.interface.Format(path, filesystem_type,
    262                               dbus.Array(options, signature='s'))
    263 
    264     def wait_for_format_completion(self):
    265         """Waits for the CrosDisks FormatCompleted signal.
    266 
    267         Returns:
    268             The content of the FormatCompleted signal.
    269         """
    270         return self.wait_for_signal(self.FORMAT_COMPLETED_SIGNAL)
    271 
    272     def expect_format_completion(self, expected_content):
    273         """Waits and verifies for the CrosDisks FormatCompleted signal.
    274 
    275         Args:
    276             expected_content: The expected content of the FormatCompleted
    277                               signal, which can be partially specified.
    278                               Only specified fields are compared between the
    279                               actual and expected content.
    280 
    281         Returns:
    282             The actual content of the FormatCompleted signal.
    283 
    284         Raises:
    285             error.TestFail: A test failure when there is a mismatch between the
    286                             actual and expected content of the FormatCompleted
    287                             signal.
    288         """
    289         return self.expect_signal(self.FORMAT_COMPLETED_SIGNAL,
    290                                   expected_content)
    291 
    292     def rename(self, path, volume_name=None):
    293         """Invokes the CrosDisks Rename method.
    294 
    295         Args:
    296             path: The device path to rename.
    297             volume_name: The new name used for renaming.
    298         """
    299         if volume_name is None:
    300             volume_name = ''
    301         self.clear_signal_content(self.RENAME_COMPLETED_SIGNAL)
    302         self.interface.Rename(path, volume_name)
    303 
    304     def wait_for_rename_completion(self):
    305         """Waits for the CrosDisks RenameCompleted signal.
    306 
    307         Returns:
    308             The content of the RenameCompleted signal.
    309         """
    310         return self.wait_for_signal(self.RENAME_COMPLETED_SIGNAL)
    311 
    312     def expect_rename_completion(self, expected_content):
    313         """Waits and verifies for the CrosDisks RenameCompleted signal.
    314 
    315         Args:
    316             expected_content: The expected content of the RenameCompleted
    317                               signal, which can be partially specified.
    318                               Only specified fields are compared between the
    319                               actual and expected content.
    320 
    321         Returns:
    322             The actual content of the RenameCompleted signal.
    323 
    324         Raises:
    325             error.TestFail: A test failure when there is a mismatch between the
    326                             actual and expected content of the RenameCompleted
    327                             signal.
    328         """
    329         return self.expect_signal(self.RENAME_COMPLETED_SIGNAL,
    330                                   expected_content)
    331 
    332     def mount(self, path, filesystem_type=None, options=None):
    333         """Invokes the CrosDisks Mount method.
    334 
    335         Args:
    336             path: The device path to mount.
    337             filesystem_type: The filesystem type used for mounting the device.
    338             options: A list of options used for mounting the device.
    339         """
    340         if filesystem_type is None:
    341             filesystem_type = ''
    342         if options is None:
    343             options = []
    344         self.clear_signal_content(self.MOUNT_COMPLETED_SIGNAL)
    345         self.interface.Mount(path, filesystem_type,
    346                              dbus.Array(options, signature='s'))
    347 
    348     def unmount(self, path, options=None):
    349         """Invokes the CrosDisks Unmount method.
    350 
    351         Args:
    352             path: The device or mount path to unmount.
    353             options: A list of options used for unmounting the path.
    354         """
    355         if options is None:
    356             options = []
    357         self.interface.Unmount(path, dbus.Array(options, signature='s'))
    358 
    359     def wait_for_mount_completion(self):
    360         """Waits for the CrosDisks MountCompleted signal.
    361 
    362         Returns:
    363             The content of the MountCompleted signal.
    364         """
    365         return self.wait_for_signal(self.MOUNT_COMPLETED_SIGNAL)
    366 
    367     def expect_mount_completion(self, expected_content):
    368         """Waits and verifies for the CrosDisks MountCompleted signal.
    369 
    370         Args:
    371             expected_content: The expected content of the MountCompleted
    372                               signal, which can be partially specified.
    373                               Only specified fields are compared between the
    374                               actual and expected content.
    375 
    376         Returns:
    377             The actual content of the MountCompleted signal.
    378 
    379         Raises:
    380             error.TestFail: A test failure when there is a mismatch between the
    381                             actual and expected content of the MountCompleted
    382                             signal.
    383         """
    384         return self.expect_signal(self.MOUNT_COMPLETED_SIGNAL,
    385                                   expected_content)
    386 
    387 
    388 class CrosDisksTester(GenericTesterMainLoop):
    389     """A base tester class for testing the CrosDisks server.
    390 
    391     A derived class should override the get_tests method to return a list of
    392     test methods. The perform_one_test method invokes each test method in the
    393     list to verify some functionalities of CrosDisks server.
    394     """
    395     def __init__(self, test):
    396         bus_loop = DBusGMainLoop(set_as_default=True)
    397         bus = dbus.SystemBus(mainloop=bus_loop)
    398         self.main_loop = gobject.MainLoop()
    399         super(CrosDisksTester, self).__init__(test, self.main_loop)
    400         self.cros_disks = CrosDisksClient(self.main_loop, bus)
    401 
    402     def get_tests(self):
    403         """Returns a list of test methods to be invoked by perform_one_test.
    404 
    405         A derived class should override this method.
    406 
    407         Returns:
    408             A list of test methods.
    409         """
    410         return []
    411 
    412     @ExceptionForward
    413     def perform_one_test(self):
    414         """Exercises each test method in the list returned by get_tests.
    415         """
    416         tests = self.get_tests()
    417         self.remaining_requirements = set([test.func_name for test in tests])
    418         for test in tests:
    419             test()
    420             self.requirement_completed(test.func_name)
    421 
    422 
    423 class FilesystemTestObject(object):
    424     """A base class to represent a filesystem test object.
    425 
    426     A filesystem test object can be a file, directory or symbolic link.
    427     A derived class should override the _create and _verify method to implement
    428     how the test object should be created and verified, respectively, on a
    429     filesystem.
    430     """
    431     def __init__(self, path, content, mode):
    432         """Initializes the instance.
    433 
    434         Args:
    435             path: The relative path of the test object.
    436             content: The content of the test object.
    437             mode: The file permissions given to the test object.
    438         """
    439         self._path = path
    440         self._content = content
    441         self._mode = mode
    442 
    443     def create(self, base_dir):
    444         """Creates the test object in a base directory.
    445 
    446         Args:
    447             base_dir: The base directory where the test object is created.
    448 
    449         Returns:
    450             True if the test object is created successfully or False otherwise.
    451         """
    452         if not self._create(base_dir):
    453             logging.debug('Failed to create filesystem test object at "%s"',
    454                           os.path.join(base_dir, self._path))
    455             return False
    456         return True
    457 
    458     def verify(self, base_dir):
    459         """Verifies the test object in a base directory.
    460 
    461         Args:
    462             base_dir: The base directory where the test object is expected to be
    463                       found.
    464 
    465         Returns:
    466             True if the test object is found in the base directory and matches
    467             the expected content, or False otherwise.
    468         """
    469         if not self._verify(base_dir):
    470             logging.debug('Failed to verify filesystem test object at "%s"',
    471                           os.path.join(base_dir, self._path))
    472             return False
    473         return True
    474 
    475     def _create(self, base_dir):
    476         return False
    477 
    478     def _verify(self, base_dir):
    479         return False
    480 
    481 
    482 class FilesystemTestDirectory(FilesystemTestObject):
    483     """A filesystem test object that represents a directory."""
    484 
    485     def __init__(self, path, content, mode=stat.S_IRWXU|stat.S_IRGRP| \
    486                  stat.S_IXGRP|stat.S_IROTH|stat.S_IXOTH):
    487         super(FilesystemTestDirectory, self).__init__(path, content, mode)
    488 
    489     def _create(self, base_dir):
    490         path = os.path.join(base_dir, self._path) if self._path else base_dir
    491 
    492         if self._path:
    493             with ExceptionSuppressor(OSError):
    494                 os.makedirs(path)
    495                 os.chmod(path, self._mode)
    496 
    497         if not os.path.isdir(path):
    498             return False
    499 
    500         for content in self._content:
    501             if not content.create(path):
    502                 return False
    503         return True
    504 
    505     def _verify(self, base_dir):
    506         path = os.path.join(base_dir, self._path) if self._path else base_dir
    507         if not os.path.isdir(path):
    508             return False
    509 
    510         for content in self._content:
    511             if not content.verify(path):
    512                 return False
    513         return True
    514 
    515 
    516 class FilesystemTestFile(FilesystemTestObject):
    517     """A filesystem test object that represents a file."""
    518 
    519     def __init__(self, path, content, mode=stat.S_IRUSR|stat.S_IWUSR| \
    520                  stat.S_IRGRP|stat.S_IROTH):
    521         super(FilesystemTestFile, self).__init__(path, content, mode)
    522 
    523     def _create(self, base_dir):
    524         path = os.path.join(base_dir, self._path)
    525         with ExceptionSuppressor(IOError):
    526             with open(path, 'wb+') as f:
    527                 f.write(self._content)
    528             with ExceptionSuppressor(OSError):
    529                 os.chmod(path, self._mode)
    530             return True
    531         return False
    532 
    533     def _verify(self, base_dir):
    534         path = os.path.join(base_dir, self._path)
    535         with ExceptionSuppressor(IOError):
    536             with open(path, 'rb') as f:
    537                 return f.read() == self._content
    538         return False
    539 
    540 
    541 class DefaultFilesystemTestContent(FilesystemTestDirectory):
    542     def __init__(self):
    543         super(DefaultFilesystemTestContent, self).__init__('', [
    544             FilesystemTestFile('file1', '0123456789'),
    545             FilesystemTestDirectory('dir1', [
    546                 FilesystemTestFile('file1', ''),
    547                 FilesystemTestFile('file2', 'abcdefg'),
    548                 FilesystemTestDirectory('dir2', [
    549                     FilesystemTestFile('file3', 'abcdefg'),
    550                 ]),
    551             ]),
    552         ], stat.S_IRWXU|stat.S_IRGRP|stat.S_IXGRP|stat.S_IROTH|stat.S_IXOTH)
    553 
    554 
    555 class VirtualFilesystemImage(object):
    556     def __init__(self, block_size, block_count, filesystem_type,
    557                  *args, **kwargs):
    558         """Initializes the instance.
    559 
    560         Args:
    561             block_size: The number of bytes of each block in the image.
    562             block_count: The number of blocks in the image.
    563             filesystem_type: The filesystem type to be given to the mkfs
    564                              program for formatting the image.
    565 
    566         Keyword Args:
    567             mount_filesystem_type: The filesystem type to be given to the
    568                                    mount program for mounting the image.
    569             mkfs_options: A list of options to be given to the mkfs program.
    570         """
    571         self._block_size = block_size
    572         self._block_count = block_count
    573         self._filesystem_type = filesystem_type
    574         self._mount_filesystem_type = kwargs.get('mount_filesystem_type')
    575         if self._mount_filesystem_type is None:
    576             self._mount_filesystem_type = filesystem_type
    577         self._mkfs_options = kwargs.get('mkfs_options')
    578         if self._mkfs_options is None:
    579             self._mkfs_options = []
    580         self._image_file = None
    581         self._loop_device = None
    582         self._loop_device_stat = None
    583         self._mount_dir = None
    584 
    def __del__(self):
        """Cleans up on destruction, logging and suppressing any Exception."""
        suppressor = ExceptionSuppressor(Exception)
        with suppressor:
            self.clean()
    588 
    589     def __enter__(self):
    590         self.create()
    591         return self
    592 
    593     def __exit__(self, exc_type, exc_value, traceback):
    594         self.clean()
    595         return False
    596 
    597     def _remove_temp_path(self, temp_path):
    598         """Removes a temporary file or directory created using autotemp."""
    599         if temp_path:
    600             with ExceptionSuppressor(Exception):
    601                 path = temp_path.name
    602                 temp_path.clean()
    603                 logging.debug('Removed "%s"', path)
    604 
    605     def _remove_image_file(self):
    606         """Removes the image file if one has been created."""
    607         self._remove_temp_path(self._image_file)
    608         self._image_file = None
    609 
    610     def _remove_mount_dir(self):
    611         """Removes the mount directory if one has been created."""
    612         self._remove_temp_path(self._mount_dir)
    613         self._mount_dir = None
    614 
    615     @property
    616     def image_file(self):
    617         """Gets the path of the image file.
    618 
    619         Returns:
    620             The path of the image file or None if no image file has been
    621             created.
    622         """
    623         return self._image_file.name if self._image_file else None
    624 
    625     @property
    626     def loop_device(self):
    627         """Gets the loop device where the image file is attached to.
    628 
    629         Returns:
    630             The path of the loop device where the image file is attached to or
    631             None if no loop device is attaching the image file.
    632         """
    633         return self._loop_device
    634 
    635     @property
    636     def mount_dir(self):
    637         """Gets the directory where the image file is mounted to.
    638 
    639         Returns:
    640             The directory where the image file is mounted to or None if no
    641             mount directory has been created.
    642         """
    643         return self._mount_dir.name if self._mount_dir else None
    644 
    def create(self):
        """Creates a zero-filled image file with the specified size.

        Anything left from a previous image is cleaned up first. The created
        image file is temporary and removed when clean() is called.

        Raises:
            RuntimeError: The dd command failed with error.CmdError; the
                          temporary image file is removed before raising.
        """
        self.clean()
        self._image_file = autotemp.tempfile(unique_id='fsImage')
        try:
            image_path = self._image_file.name
            logging.debug('Creating zero-filled image file at "%s"',
                          image_path)
            dd_command = ('dd if=/dev/zero of=%s bs=%s count=%s' %
                          (image_path, self._block_size, self._block_count))
            utils.run(dd_command)
        except error.CmdError as exc:
            self._remove_image_file()
            raise RuntimeError('Failed to create filesystem image: %s' % exc)
    663 
    664     def clean(self):
    665         """Removes the image file if one has been created.
    666 
    667         Before removal, the image file is detached from the loop device that
    668         it is attached to.
    669         """
    670         self.detach_from_loop_device()
    671         self._remove_image_file()
    672 
    673     def attach_to_loop_device(self):
    674         """Attaches the created image file to a loop device.
    675 
    676         Creates the image file, if one has not been created, by calling
    677         create().
    678 
    679         Returns:
    680             The path of the loop device where the image file is attached to.
    681         """
    682         if self._loop_device:
    683             return self._loop_device
    684 
    685         if not self._image_file:
    686             self.create()
    687 
    688         logging.debug('Attaching image file "%s" to loop device',
    689                       self._image_file.name)
    690         utils.run('losetup -f %s' % self._image_file.name)
    691         output = utils.system_output('losetup -j %s' % self._image_file.name)
    692         # output should look like: "/dev/loop0: [000d]:6329 (/tmp/test.img)"
    693         self._loop_device = output.split(':')[0]
    694         logging.debug('Attached image file "%s" to loop device "%s"',
    695                       self._image_file.name, self._loop_device)
    696 
    697         self._loop_device_stat = os.stat(self._loop_device)
    698         logging.debug('Loop device "%s" (uid=%d, gid=%d, permissions=%04o)',
    699                       self._loop_device,
    700                       self._loop_device_stat.st_uid,
    701                       self._loop_device_stat.st_gid,
    702                       stat.S_IMODE(self._loop_device_stat.st_mode))
    703         return self._loop_device
    704 
    def detach_from_loop_device(self):
        """Detaches the image file from the loop device, if it is attached.

        Does nothing when no loop device is recorded. Otherwise, in order:
        calls unmount(), force-unmounts whatever is still mounted from the
        loop device (the exit status of that command is ignored), restores
        the mode and ownership saved in |self._loop_device_stat| on the
        device node, runs 'losetup -d' on the device, and finally clears
        |self._loop_device|.
        """
        if not self._loop_device:
            return

        self.unmount()

        loop_device = self._loop_device
        logging.debug('Cleaning up remaining mount points of loop device "%s"',
                      loop_device)
        utils.run('umount -f %s' % loop_device, ignore_status=True)

        logging.debug('Restore ownership/permissions of loop device "%s"',
                      loop_device)
        saved_stat = self._loop_device_stat
        # Put back the permission bits first, then the owner and group.
        os.chmod(loop_device, stat.S_IMODE(saved_stat.st_mode))
        os.chown(loop_device, saved_stat.st_uid, saved_stat.st_gid)

        logging.debug('Detaching image file "%s" from loop device "%s"',
                      self._image_file.name, loop_device)
        utils.run('losetup -d %s' % loop_device)
        # Only forget the device once losetup has run without raising.
        self._loop_device = None
    727 
    def format(self):
        """Formats the image file as the configured filesystem type.

        The image is attached to a loop device first (see
        attach_to_loop_device()), then mkfs is run on that device with
        |self._filesystem_type| and the space-joined |self._mkfs_options|.
        Afterwards the blkid output for the device is written to the debug
        log.

        Raises:
            RuntimeError: if a command fails with error.CmdError.
        """
        self.attach_to_loop_device()
        try:
            logging.debug('Formatting image file at "%s" as "%s" filesystem',
                          self._image_file.name, self._filesystem_type)
            # 'yes' is piped into mkfs so that any prompt it shows is
            # answered without user interaction.
            mkfs_command = 'yes | mkfs -t %s %s %s' % (
                self._filesystem_type,
                ' '.join(self._mkfs_options),
                self._loop_device)
            utils.run(mkfs_command)

            # Purely informational; blkid is run with ignore_status=True.
            blkid_command = 'blkid -c /dev/null %s' % self._loop_device
            blkid_output = utils.system_output(blkid_command,
                                               ignore_status=True)
            logging.debug('blkid: %s', blkid_output)
        except error.CmdError as exc:
            raise RuntimeError('Failed to format filesystem image: %s' % exc)
    743 
    def mount(self, options=None):
        """Mounts the image file to a temporary directory.

        If a mount directory is already recorded in |self._mount_dir|, its
        path is returned without mounting again. Otherwise the image is
        attached to a loop device (see attach_to_loop_device()), a directory
        is obtained from autotemp.tempdir(), and the loop device is mounted
        there as |self._mount_filesystem_type|.

        Args:
            options: An optional list of mount options. The options are
                joined with commas and passed to mount through '-o'.

        Returns:
            The path of the directory where the image file is mounted.

        Raises:
            RuntimeError: if the mount command fails with error.CmdError.
        """
        if self._mount_dir:
            return self._mount_dir.name

        joined_options = ','.join([] if options is None else options)
        # Only emit '-o' when there is at least one non-empty option.
        options_arg = ('-o ' + joined_options) if joined_options \
                      else joined_options

        self.attach_to_loop_device()
        self._mount_dir = autotemp.tempdir(unique_id='fsImage')
        try:
            mount_path = self._mount_dir.name
            logging.debug('Mounting image file "%s" (%s) to directory "%s"',
                          self._image_file.name, self._loop_device,
                          mount_path)
            mount_command = 'mount -t %s %s %s %s' % (
                self._mount_filesystem_type, options_arg,
                self._loop_device, mount_path)
            utils.run(mount_command)
        except error.CmdError as exc:
            # Discard the directory that was just set up for this mount.
            self._remove_mount_dir()
            raise RuntimeError(
                'Failed to mount virtual filesystem image "%s": %s' %
                (self._image_file.name, exc))
        return mount_path
    775 
    def unmount(self):
        """Unmounts the image file from the directory it is mounted to.

        Does nothing when no mount directory is recorded in
        |self._mount_dir|. The mount directory is handed to
        _remove_mount_dir() whether or not the umount command succeeds.

        Raises:
            RuntimeError: if the umount command fails with error.CmdError.
        """
        if not self._mount_dir:
            return

        try:
            mount_path = self._mount_dir.name
            logging.debug('Unmounting image file "%s" (%s) from directory "%s"',
                          self._image_file.name, self._loop_device,
                          mount_path)
            utils.run('umount %s' % mount_path)
        except error.CmdError as exc:
            raise RuntimeError(
                'Failed to unmount virtual filesystem image "%s": %s' %
                (self._image_file.name, exc))
        finally:
            # Runs on both the success and the failure path.
            self._remove_mount_dir()
    792 
    def get_volume_label(self):
        """Gets the volume label of |self._loop_device|.

        Runs blkid on the loop device with udev-style 'KEY=VALUE' output and
        returns the value of the ID_FS_LABEL key.

        Returns:
            A string with the volume label if blkid reports one, or None
            otherwise.
        """
        # This script is run as root in a normal autotest run, so this
        # works. blkid does not have access to the necessary info when run
        # as a non-privileged user.
        cmd = "blkid -c /dev/null -o udev %s" % self._loop_device
        output = utils.system_output(cmd, ignore_status=True)

        for line in output.splitlines():
            # Split on the first '=' only: a value may itself contain '=',
            # and a line without '=' (e.g. a blank line) carries no key/value
            # pair. Unpacking line.split('=') raised ValueError in both cases.
            udev_key, separator, udev_val = line.partition('=')
            if separator and udev_key == 'ID_FS_LABEL':
                return udev_val

        return None
    811