Home | History | Annotate | Download | only in cros
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import dbus, gobject, logging, os, stat
      6 from dbus.mainloop.glib import DBusGMainLoop
      7 
      8 import common
      9 from autotest_lib.client.bin import utils
     10 from autotest_lib.client.common_lib import autotemp, error
     11 from autotest_lib.client.cros import dbus_util
     12 from mainloop import ExceptionForward
     13 from mainloop import GenericTesterMainLoop
     14 
     15 
     16 """This module contains several helper classes for writing tests to verify the
     17 CrosDisks DBus interface. In particular, the CrosDisksTester class can be used
     18 to derive functional tests that interact with the CrosDisks server over DBus.
     19 """
     20 
     21 
     22 class ExceptionSuppressor(object):
     23     """A context manager class for suppressing certain types of exception.
     24 
     25     An instance of this class is expected to be used with the with statement
     26     and takes a set of exception classes at instantiation, which are types of
     27     exception to be suppressed (and logged) in the code block under the with
     28     statement.
     29 
     30     Example:
     31 
     32         with ExceptionSuppressor(OSError, IOError):
     33             # An exception, which is a sub-class of OSError or IOError, is
     34             # suppressed in the block code under the with statement.
     35     """
     36     def __init__(self, *args):
     37         self.__suppressed_exc_types = (args)
     38 
     39     def __enter__(self):
     40         return self
     41 
     42     def __exit__(self, exc_type, exc_value, traceback):
     43         if exc_type and issubclass(exc_type, self.__suppressed_exc_types):
     44             try:
     45                 logging.exception('Suppressed exception: %s(%s)',
     46                                   exc_type, exc_value)
     47             except Exception:
     48                 pass
     49             return True
     50         return False
     51 
     52 
     53 class DBusClient(object):
     54     """ A base class of a DBus proxy client to test a DBus server.
     55 
     56     This class is expected to be used along with a GLib main loop and provides
     57     some convenient functions for testing the DBus API exposed by a DBus server.
     58     """
     59 
     60     def __init__(self, main_loop, bus, bus_name, object_path, timeout=None):
     61         """Initializes the instance.
     62 
     63         Args:
     64             main_loop: The GLib main loop.
     65             bus: The bus where the DBus server is connected to.
     66             bus_name: The bus name owned by the DBus server.
     67             object_path: The object path of the DBus server.
     68             timeout: Maximum time in seconds to wait for the DBus connection.
     69         """
     70         self.__signal_content = {}
     71         self.main_loop = main_loop
     72         self.signal_timeout_in_seconds = 10
     73         logging.debug('Getting D-Bus proxy object on bus "%s" and path "%s"',
     74                       bus_name, object_path)
     75         self.proxy_object = dbus_util.get_dbus_object(bus, bus_name,
     76                                                       object_path, timeout)
     77 
     78     def clear_signal_content(self, signal_name):
     79         """Clears the content of the signal.
     80 
     81         Args:
     82             signal_name: The name of the signal.
     83         """
     84         if signal_name in self.__signal_content:
     85             self.__signal_content[signal_name] = None
     86 
     87     def get_signal_content(self, signal_name):
     88         """Gets the content of a signal.
     89 
     90         Args:
     91             signal_name: The name of the signal.
     92 
     93         Returns:
     94             The content of a signal or None if the signal is not being handled.
     95         """
     96         return self.__signal_content.get(signal_name)
     97 
     98     def handle_signal(self, interface, signal_name, argument_names=()):
     99         """Registers a signal handler to handle a given signal.
    100 
    101         Args:
    102             interface: The DBus interface of the signal.
    103             signal_name: The name of the signal.
    104             argument_names: A list of argument names that the signal contains.
    105         """
    106         if signal_name in self.__signal_content:
    107             return
    108 
    109         self.__signal_content[signal_name] = None
    110 
    111         def signal_handler(*args):
    112             self.__signal_content[signal_name] = dict(zip(argument_names, args))
    113 
    114         logging.debug('Handling D-Bus signal "%s(%s)" on interface "%s"',
    115                       signal_name, ', '.join(argument_names), interface)
    116         self.proxy_object.connect_to_signal(signal_name, signal_handler,
    117                                             interface)
    118 
    119     def wait_for_signal(self, signal_name):
    120         """Waits for the reception of a signal.
    121 
    122         Args:
    123             signal_name: The name of the signal to wait for.
    124 
    125         Returns:
    126             The content of the signal.
    127         """
    128         if signal_name not in self.__signal_content:
    129             return None
    130 
    131         def check_signal_content():
    132             context = self.main_loop.get_context()
    133             while context.iteration(False):
    134                 pass
    135             return self.__signal_content[signal_name] is not None
    136 
    137         logging.debug('Waiting for D-Bus signal "%s"', signal_name)
    138         utils.poll_for_condition(condition=check_signal_content,
    139                                  desc='%s signal' % signal_name,
    140                                  timeout=self.signal_timeout_in_seconds)
    141         content = self.__signal_content[signal_name]
    142         logging.debug('Received D-Bus signal "%s(%s)"', signal_name, content)
    143         self.__signal_content[signal_name] = None
    144         return content
    145 
    146     def expect_signal(self, signal_name, expected_content):
    147         """Waits the the reception of a signal and verifies its content.
    148 
    149         Args:
    150             signal_name: The name of the signal to wait for.
    151             expected_content: The expected content of the signal, which can be
    152                               partially specified. Only specified fields are
    153                               compared between the actual and expected content.
    154 
    155         Returns:
    156             The actual content of the signal.
    157 
    158         Raises:
    159             error.TestFail: A test failure when there is a mismatch between the
    160                             actual and expected content of the signal.
    161         """
    162         actual_content = self.wait_for_signal(signal_name)
    163         logging.debug("%s signal: expected=%s actual=%s",
    164                       signal_name, expected_content, actual_content)
    165         for argument, expected_value in expected_content.iteritems():
    166             if argument not in actual_content:
    167                 raise error.TestFail(
    168                     ('%s signal missing "%s": expected=%s, actual=%s') %
    169                     (signal_name, argument, expected_content, actual_content))
    170 
    171             if actual_content[argument] != expected_value:
    172                 raise error.TestFail(
    173                     ('%s signal not matched on "%s": expected=%s, actual=%s') %
    174                     (signal_name, argument, expected_content, actual_content))
    175         return actual_content
    176 
    177 
    178 class CrosDisksClient(DBusClient):
    179     """A DBus proxy client for testing the CrosDisks DBus server.
    180     """
    181 
    182     CROS_DISKS_BUS_NAME = 'org.chromium.CrosDisks'
    183     CROS_DISKS_INTERFACE = 'org.chromium.CrosDisks'
    184     CROS_DISKS_OBJECT_PATH = '/org/chromium/CrosDisks'
    185     DBUS_PROPERTIES_INTERFACE = 'org.freedesktop.DBus.Properties'
    186     FORMAT_COMPLETED_SIGNAL = 'FormatCompleted'
    187     FORMAT_COMPLETED_SIGNAL_ARGUMENTS = (
    188         'status', 'path'
    189     )
    190     MOUNT_COMPLETED_SIGNAL = 'MountCompleted'
    191     MOUNT_COMPLETED_SIGNAL_ARGUMENTS = (
    192         'status', 'source_path', 'source_type', 'mount_path'
    193     )
    194     RENAME_COMPLETED_SIGNAL = 'RenameCompleted'
    195     RENAME_COMPLETED_SIGNAL_ARGUMENTS = (
    196         'status', 'path'
    197     )
    198 
    199     def __init__(self, main_loop, bus, timeout_seconds=None):
    200         """Initializes the instance.
    201 
    202         Args:
    203             main_loop: The GLib main loop.
    204             bus: The bus where the DBus server is connected to.
    205             timeout_seconds: Maximum time in seconds to wait for the DBus
    206                              connection.
    207         """
    208         super(CrosDisksClient, self).__init__(main_loop, bus,
    209                                               self.CROS_DISKS_BUS_NAME,
    210                                               self.CROS_DISKS_OBJECT_PATH,
    211                                               timeout_seconds)
    212         self.interface = dbus.Interface(self.proxy_object,
    213                                         self.CROS_DISKS_INTERFACE)
    214         self.properties = dbus.Interface(self.proxy_object,
    215                                          self.DBUS_PROPERTIES_INTERFACE)
    216         self.handle_signal(self.CROS_DISKS_INTERFACE,
    217                            self.FORMAT_COMPLETED_SIGNAL,
    218                            self.FORMAT_COMPLETED_SIGNAL_ARGUMENTS)
    219         self.handle_signal(self.CROS_DISKS_INTERFACE,
    220                            self.MOUNT_COMPLETED_SIGNAL,
    221                            self.MOUNT_COMPLETED_SIGNAL_ARGUMENTS)
    222         self.handle_signal(self.CROS_DISKS_INTERFACE,
    223                            self.RENAME_COMPLETED_SIGNAL,
    224                            self.RENAME_COMPLETED_SIGNAL_ARGUMENTS)
    225 
    226     def enumerate_devices(self):
    227         """Invokes the CrosDisks EnumerateMountableDevices method.
    228 
    229         Returns:
    230             A list of sysfs paths of devices that are recognized by
    231             CrosDisks.
    232         """
    233         return self.interface.EnumerateDevices()
    234 
    235     def get_device_properties(self, path):
    236         """Invokes the CrosDisks GetDeviceProperties method.
    237 
    238         Args:
    239             path: The device path.
    240 
    241         Returns:
    242             The properties of the device in a dictionary.
    243         """
    244         return self.interface.GetDeviceProperties(path)
    245 
    246     def format(self, path, filesystem_type=None, options=None):
    247         """Invokes the CrosDisks Format method.
    248 
    249         Args:
    250             path: The device path to format.
    251             filesystem_type: The filesystem type used for formatting the device.
    252             options: A list of options used for formatting the device.
    253         """
    254         if filesystem_type is None:
    255             filesystem_type = ''
    256         if options is None:
    257             options = []
    258         self.clear_signal_content(self.FORMAT_COMPLETED_SIGNAL)
    259         self.interface.Format(path, filesystem_type,
    260                               dbus.Array(options, signature='s'))
    261 
    262     def wait_for_format_completion(self):
    263         """Waits for the CrosDisks FormatCompleted signal.
    264 
    265         Returns:
    266             The content of the FormatCompleted signal.
    267         """
    268         return self.wait_for_signal(self.FORMAT_COMPLETED_SIGNAL)
    269 
    270     def expect_format_completion(self, expected_content):
    271         """Waits and verifies for the CrosDisks FormatCompleted signal.
    272 
    273         Args:
    274             expected_content: The expected content of the FormatCompleted
    275                               signal, which can be partially specified.
    276                               Only specified fields are compared between the
    277                               actual and expected content.
    278 
    279         Returns:
    280             The actual content of the FormatCompleted signal.
    281 
    282         Raises:
    283             error.TestFail: A test failure when there is a mismatch between the
    284                             actual and expected content of the FormatCompleted
    285                             signal.
    286         """
    287         return self.expect_signal(self.FORMAT_COMPLETED_SIGNAL,
    288                                   expected_content)
    289 
    290     def rename(self, path, volume_name=None):
    291         """Invokes the CrosDisks Rename method.
    292 
    293         Args:
    294             path: The device path to rename.
    295             volume_name: The new name used for renaming.
    296         """
    297         if volume_name is None:
    298             volume_name = ''
    299         self.clear_signal_content(self.RENAME_COMPLETED_SIGNAL)
    300         self.interface.Rename(path, volume_name)
    301 
    302     def wait_for_rename_completion(self):
    303         """Waits for the CrosDisks RenameCompleted signal.
    304 
    305         Returns:
    306             The content of the RenameCompleted signal.
    307         """
    308         return self.wait_for_signal(self.RENAME_COMPLETED_SIGNAL)
    309 
    310     def expect_rename_completion(self, expected_content):
    311         """Waits and verifies for the CrosDisks RenameCompleted signal.
    312 
    313         Args:
    314             expected_content: The expected content of the RenameCompleted
    315                               signal, which can be partially specified.
    316                               Only specified fields are compared between the
    317                               actual and expected content.
    318 
    319         Returns:
    320             The actual content of the RenameCompleted signal.
    321 
    322         Raises:
    323             error.TestFail: A test failure when there is a mismatch between the
    324                             actual and expected content of the RenameCompleted
    325                             signal.
    326         """
    327         return self.expect_signal(self.RENAME_COMPLETED_SIGNAL,
    328                                   expected_content)
    329 
    330     def mount(self, path, filesystem_type=None, options=None):
    331         """Invokes the CrosDisks Mount method.
    332 
    333         Args:
    334             path: The device path to mount.
    335             filesystem_type: The filesystem type used for mounting the device.
    336             options: A list of options used for mounting the device.
    337         """
    338         if filesystem_type is None:
    339             filesystem_type = ''
    340         if options is None:
    341             options = []
    342         self.clear_signal_content(self.MOUNT_COMPLETED_SIGNAL)
    343         self.interface.Mount(path, filesystem_type,
    344                              dbus.Array(options, signature='s'))
    345 
    346     def unmount(self, path, options=None):
    347         """Invokes the CrosDisks Unmount method.
    348 
    349         Args:
    350             path: The device or mount path to unmount.
    351             options: A list of options used for unmounting the path.
    352 
    353         Returns:
    354             The mount error code.
    355         """
    356         if options is None:
    357             options = []
    358         return self.interface.Unmount(path, dbus.Array(options, signature='s'))
    359 
    360     def wait_for_mount_completion(self):
    361         """Waits for the CrosDisks MountCompleted signal.
    362 
    363         Returns:
    364             The content of the MountCompleted signal.
    365         """
    366         return self.wait_for_signal(self.MOUNT_COMPLETED_SIGNAL)
    367 
    368     def expect_mount_completion(self, expected_content):
    369         """Waits and verifies for the CrosDisks MountCompleted signal.
    370 
    371         Args:
    372             expected_content: The expected content of the MountCompleted
    373                               signal, which can be partially specified.
    374                               Only specified fields are compared between the
    375                               actual and expected content.
    376 
    377         Returns:
    378             The actual content of the MountCompleted signal.
    379 
    380         Raises:
    381             error.TestFail: A test failure when there is a mismatch between the
    382                             actual and expected content of the MountCompleted
    383                             signal.
    384         """
    385         return self.expect_signal(self.MOUNT_COMPLETED_SIGNAL,
    386                                   expected_content)
    387 
    388 
    389 class CrosDisksTester(GenericTesterMainLoop):
    390     """A base tester class for testing the CrosDisks server.
    391 
    392     A derived class should override the get_tests method to return a list of
    393     test methods. The perform_one_test method invokes each test method in the
    394     list to verify some functionalities of CrosDisks server.
    395     """
    396     def __init__(self, test):
    397         bus_loop = DBusGMainLoop(set_as_default=True)
    398         self.bus = dbus.SystemBus(mainloop=bus_loop)
    399         self.main_loop = gobject.MainLoop()
    400         super(CrosDisksTester, self).__init__(test, self.main_loop)
    401         self.cros_disks = CrosDisksClient(self.main_loop, self.bus)
    402 
    403     def get_tests(self):
    404         """Returns a list of test methods to be invoked by perform_one_test.
    405 
    406         A derived class should override this method.
    407 
    408         Returns:
    409             A list of test methods.
    410         """
    411         return []
    412 
    413     @ExceptionForward
    414     def perform_one_test(self):
    415         """Exercises each test method in the list returned by get_tests.
    416         """
    417         tests = self.get_tests()
    418         self.remaining_requirements = set([test.func_name for test in tests])
    419         for test in tests:
    420             test()
    421             self.requirement_completed(test.func_name)
    422 
    423     def reconnect_client(self, timeout_seconds=None):
    424       """"Reconnect the CrosDisks DBus client.
    425 
    426       Args:
    427           timeout_seconds: Maximum time in seconds to wait for the DBus
    428                            connection.
    429       """
    430       self.cros_disks = CrosDisksClient(self.main_loop, self.bus,
    431                                         timeout_seconds)
    432 
    433 
    434 class FilesystemTestObject(object):
    435     """A base class to represent a filesystem test object.
    436 
    437     A filesystem test object can be a file, directory or symbolic link.
    438     A derived class should override the _create and _verify method to implement
    439     how the test object should be created and verified, respectively, on a
    440     filesystem.
    441     """
    442     def __init__(self, path, content, mode):
    443         """Initializes the instance.
    444 
    445         Args:
    446             path: The relative path of the test object.
    447             content: The content of the test object.
    448             mode: The file permissions given to the test object.
    449         """
    450         self._path = path
    451         self._content = content
    452         self._mode = mode
    453 
    454     def create(self, base_dir):
    455         """Creates the test object in a base directory.
    456 
    457         Args:
    458             base_dir: The base directory where the test object is created.
    459 
    460         Returns:
    461             True if the test object is created successfully or False otherwise.
    462         """
    463         if not self._create(base_dir):
    464             logging.debug('Failed to create filesystem test object at "%s"',
    465                           os.path.join(base_dir, self._path))
    466             return False
    467         return True
    468 
    469     def verify(self, base_dir):
    470         """Verifies the test object in a base directory.
    471 
    472         Args:
    473             base_dir: The base directory where the test object is expected to be
    474                       found.
    475 
    476         Returns:
    477             True if the test object is found in the base directory and matches
    478             the expected content, or False otherwise.
    479         """
    480         if not self._verify(base_dir):
    481             logging.debug('Failed to verify filesystem test object at "%s"',
    482                           os.path.join(base_dir, self._path))
    483             return False
    484         return True
    485 
    486     def _create(self, base_dir):
    487         return False
    488 
    489     def _verify(self, base_dir):
    490         return False
    491 
    492 
    493 class FilesystemTestDirectory(FilesystemTestObject):
    494     """A filesystem test object that represents a directory."""
    495 
    496     def __init__(self, path, content, mode=stat.S_IRWXU|stat.S_IRGRP| \
    497                  stat.S_IXGRP|stat.S_IROTH|stat.S_IXOTH):
    498         super(FilesystemTestDirectory, self).__init__(path, content, mode)
    499 
    500     def _create(self, base_dir):
    501         path = os.path.join(base_dir, self._path) if self._path else base_dir
    502 
    503         if self._path:
    504             with ExceptionSuppressor(OSError):
    505                 os.makedirs(path)
    506                 os.chmod(path, self._mode)
    507 
    508         if not os.path.isdir(path):
    509             return False
    510 
    511         for content in self._content:
    512             if not content.create(path):
    513                 return False
    514         return True
    515 
    516     def _verify(self, base_dir):
    517         path = os.path.join(base_dir, self._path) if self._path else base_dir
    518         if not os.path.isdir(path):
    519             return False
    520 
    521         for content in self._content:
    522             if not content.verify(path):
    523                 return False
    524         return True
    525 
    526 
    527 class FilesystemTestFile(FilesystemTestObject):
    528     """A filesystem test object that represents a file."""
    529 
    530     def __init__(self, path, content, mode=stat.S_IRUSR|stat.S_IWUSR| \
    531                  stat.S_IRGRP|stat.S_IROTH):
    532         super(FilesystemTestFile, self).__init__(path, content, mode)
    533 
    534     def _create(self, base_dir):
    535         path = os.path.join(base_dir, self._path)
    536         with ExceptionSuppressor(IOError):
    537             with open(path, 'wb+') as f:
    538                 f.write(self._content)
    539             with ExceptionSuppressor(OSError):
    540                 os.chmod(path, self._mode)
    541             return True
    542         return False
    543 
    544     def _verify(self, base_dir):
    545         path = os.path.join(base_dir, self._path)
    546         with ExceptionSuppressor(IOError):
    547             with open(path, 'rb') as f:
    548                 return f.read() == self._content
    549         return False
    550 
    551 
    552 class DefaultFilesystemTestContent(FilesystemTestDirectory):
    553     def __init__(self):
    554         super(DefaultFilesystemTestContent, self).__init__('', [
    555             FilesystemTestFile('file1', '0123456789'),
    556             FilesystemTestDirectory('dir1', [
    557                 FilesystemTestFile('file1', ''),
    558                 FilesystemTestFile('file2', 'abcdefg'),
    559                 FilesystemTestDirectory('dir2', [
    560                     FilesystemTestFile('file3', 'abcdefg'),
    561                     FilesystemTestFile('file4', 'a' * 65536),
    562                 ]),
    563             ]),
    564         ], stat.S_IRWXU|stat.S_IRGRP|stat.S_IXGRP|stat.S_IROTH|stat.S_IXOTH)
    565 
    566 
    567 class VirtualFilesystemImage(object):
    568     def __init__(self, block_size, block_count, filesystem_type,
    569                  *args, **kwargs):
    570         """Initializes the instance.
    571 
    572         Args:
    573             block_size: The number of bytes of each block in the image.
    574             block_count: The number of blocks in the image.
    575             filesystem_type: The filesystem type to be given to the mkfs
    576                              program for formatting the image.
    577 
    578         Keyword Args:
    579             mount_filesystem_type: The filesystem type to be given to the
    580                                    mount program for mounting the image.
    581             mkfs_options: A list of options to be given to the mkfs program.
    582         """
    583         self._block_size = block_size
    584         self._block_count = block_count
    585         self._filesystem_type = filesystem_type
    586         self._mount_filesystem_type = kwargs.get('mount_filesystem_type')
    587         if self._mount_filesystem_type is None:
    588             self._mount_filesystem_type = filesystem_type
    589         self._mkfs_options = kwargs.get('mkfs_options')
    590         if self._mkfs_options is None:
    591             self._mkfs_options = []
    592         self._image_file = None
    593         self._loop_device = None
    594         self._loop_device_stat = None
    595         self._mount_dir = None
    596 
    597     def __del__(self):
    598         with ExceptionSuppressor(Exception):
    599             self.clean()
    600 
    601     def __enter__(self):
    602         self.create()
    603         return self
    604 
    605     def __exit__(self, exc_type, exc_value, traceback):
    606         self.clean()
    607         return False
    608 
    609     def _remove_temp_path(self, temp_path):
    610         """Removes a temporary file or directory created using autotemp."""
    611         if temp_path:
    612             with ExceptionSuppressor(Exception):
    613                 path = temp_path.name
    614                 temp_path.clean()
    615                 logging.debug('Removed "%s"', path)
    616 
    617     def _remove_image_file(self):
    618         """Removes the image file if one has been created."""
    619         self._remove_temp_path(self._image_file)
    620         self._image_file = None
    621 
    622     def _remove_mount_dir(self):
    623         """Removes the mount directory if one has been created."""
    624         self._remove_temp_path(self._mount_dir)
    625         self._mount_dir = None
    626 
    627     @property
    628     def image_file(self):
    629         """Gets the path of the image file.
    630 
    631         Returns:
    632             The path of the image file or None if no image file has been
    633             created.
    634         """
    635         return self._image_file.name if self._image_file else None
    636 
    637     @property
    638     def loop_device(self):
    639         """Gets the loop device where the image file is attached to.
    640 
    641         Returns:
    642             The path of the loop device where the image file is attached to or
    643             None if no loop device is attaching the image file.
    644         """
    645         return self._loop_device
    646 
    647     @property
    648     def mount_dir(self):
    649         """Gets the directory where the image file is mounted to.
    650 
    651         Returns:
    652             The directory where the image file is mounted to or None if no
    653             mount directory has been created.
    654         """
    655         return self._mount_dir.name if self._mount_dir else None
    656 
    657     def create(self):
    658         """Creates a zero-filled image file with the specified size.
    659 
    660         The created image file is temporary and removed when clean()
    661         is called.
    662         """
    663         self.clean()
    664         self._image_file = autotemp.tempfile(unique_id='fsImage')
    665         try:
    666             logging.debug('Creating zero-filled image file at "%s"',
    667                           self._image_file.name)
    668             utils.run('dd if=/dev/zero of=%s bs=%s count=%s' %
    669                       (self._image_file.name, self._block_size,
    670                        self._block_count))
    671         except error.CmdError as exc:
    672             self._remove_image_file()
    673             message = 'Failed to create filesystem image: %s' % exc
    674             raise RuntimeError(message)
    675 
    676     def clean(self):
    677         """Removes the image file if one has been created.
    678 
    679         Before removal, the image file is detached from the loop device that
    680         it is attached to.
    681         """
    682         self.detach_from_loop_device()
    683         self._remove_image_file()
    684 
    685     def attach_to_loop_device(self):
    686         """Attaches the created image file to a loop device.
    687 
    688         Creates the image file, if one has not been created, by calling
    689         create().
    690 
    691         Returns:
    692             The path of the loop device where the image file is attached to.
    693         """
    694         if self._loop_device:
    695             return self._loop_device
    696 
    697         if not self._image_file:
    698             self.create()
    699 
    700         logging.debug('Attaching image file "%s" to loop device',
    701                       self._image_file.name)
    702         utils.run('losetup -f %s' % self._image_file.name)
    703         output = utils.system_output('losetup -j %s' % self._image_file.name)
    704         # output should look like: "/dev/loop0: [000d]:6329 (/tmp/test.img)"
    705         self._loop_device = output.split(':')[0]
    706         logging.debug('Attached image file "%s" to loop device "%s"',
    707                       self._image_file.name, self._loop_device)
    708 
    709         self._loop_device_stat = os.stat(self._loop_device)
    710         logging.debug('Loop device "%s" (uid=%d, gid=%d, permissions=%04o)',
    711                       self._loop_device,
    712                       self._loop_device_stat.st_uid,
    713                       self._loop_device_stat.st_gid,
    714                       stat.S_IMODE(self._loop_device_stat.st_mode))
    715         return self._loop_device
    716 
    717     def detach_from_loop_device(self):
    718         """Detaches the image file from the loop device."""
    719         if not self._loop_device:
    720             return
    721 
    722         self.unmount()
    723 
    724         logging.debug('Cleaning up remaining mount points of loop device "%s"',
    725                       self._loop_device)
    726         utils.run('umount -f %s' % self._loop_device, ignore_status=True)
    727 
    728         logging.debug('Restore ownership/permissions of loop device "%s"',
    729                       self._loop_device)
    730         os.chmod(self._loop_device,
    731                  stat.S_IMODE(self._loop_device_stat.st_mode))
    732         os.chown(self._loop_device,
    733                  self._loop_device_stat.st_uid, self._loop_device_stat.st_gid)
    734 
    735         logging.debug('Detaching image file "%s" from loop device "%s"',
    736                       self._image_file.name, self._loop_device)
    737         utils.run('losetup -d %s' % self._loop_device)
    738         self._loop_device = None
    739 
    740     def format(self):
    741         """Formats the image file as the specified filesystem."""
    742         self.attach_to_loop_device()
    743         try:
    744             logging.debug('Formatting image file at "%s" as "%s" filesystem',
    745                           self._image_file.name, self._filesystem_type)
    746             utils.run('yes | mkfs -t %s %s %s' %
    747                       (self._filesystem_type, ' '.join(self._mkfs_options),
    748                        self._loop_device))
    749             logging.debug('blkid: %s', utils.system_output(
    750                 'blkid -c /dev/null %s' % self._loop_device,
    751                 ignore_status=True))
    752         except error.CmdError as exc:
    753             message = 'Failed to format filesystem image: %s' % exc
    754             raise RuntimeError(message)
    755 
    756     def mount(self, options=None):
    757         """Mounts the image file to a directory.
    758 
    759         Args:
    760             options: An optional list of mount options.
    761         """
    762         if self._mount_dir:
    763             return self._mount_dir.name
    764 
    765         if options is None:
    766             options = []
    767 
    768         options_arg = ','.join(options)
    769         if options_arg:
    770             options_arg = '-o ' + options_arg
    771 
    772         self.attach_to_loop_device()
    773         self._mount_dir = autotemp.tempdir(unique_id='fsImage')
    774         try:
    775             logging.debug('Mounting image file "%s" (%s) to directory "%s"',
    776                           self._image_file.name, self._loop_device,
    777                           self._mount_dir.name)
    778             utils.run('mount -t %s %s %s %s' %
    779                       (self._mount_filesystem_type, options_arg,
    780                        self._loop_device, self._mount_dir.name))
    781         except error.CmdError as exc:
    782             self._remove_mount_dir()
    783             message = ('Failed to mount virtual filesystem image "%s": %s' %
    784                        (self._image_file.name, exc))
    785             raise RuntimeError(message)
    786         return self._mount_dir.name
    787 
    788     def unmount(self):
    789         """Unmounts the image file from the mounted directory."""
    790         if not self._mount_dir:
    791             return
    792 
    793         try:
    794             logging.debug('Unmounting image file "%s" (%s) from directory "%s"',
    795                           self._image_file.name, self._loop_device,
    796                           self._mount_dir.name)
    797             utils.run('umount %s' % self._mount_dir.name)
    798         except error.CmdError as exc:
    799             message = ('Failed to unmount virtual filesystem image "%s": %s' %
    800                        (self._image_file.name, exc))
    801             raise RuntimeError(message)
    802         finally:
    803             self._remove_mount_dir()
    804 
    805     def get_volume_label(self):
    806         """Gets volume name information of |self._loop_device|
    807 
    808         @return a string with volume name if it exists.
    809         """
    810         # This script is run as root in a normal autotest run,
    811         # so this works: It doesn't have access to the necessary info
    812         # when run as a non-privileged user
    813         cmd = "blkid -c /dev/null -o udev %s" % self._loop_device
    814         output = utils.system_output(cmd, ignore_status=True)
    815 
    816         for line in output.splitlines():
    817             udev_key, udev_val = line.split('=')
    818 
    819             if udev_key == 'ID_FS_LABEL':
    820                 return udev_val
    821 
    822         return None
    823