Home | History | Annotate | Download | only in utils
      1 # Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """A module to provide interface to OS services."""
      6 
      7 import datetime
      8 import os
      9 import re
     10 import struct
     11 
     12 import shell_wrapper
     13 
     14 
     15 class OSInterfaceError(Exception):
     16     """OS interface specific exception."""
     17     pass
     18 
     19 class Crossystem(object):
     20     """A wrapper for the crossystem utility."""
     21 
     22     # Code dedicated for user triggering recovery mode through crossystem.
     23     USER_RECOVERY_REQUEST_CODE = '193'
     24 
     25     def init(self, os_if):
     26         """Init the instance. If running on Mario - adjust the map."""
     27         self.os_if = os_if
     28 
     29     def __getattr__(self, name):
     30         """
     31         Retrieve a crosssystem attribute.
     32 
     33         Attempt to access crossystemobject.name will invoke `crossystem name'
     34         and return the stdout as the value.
     35         """
     36         return self.os_if.run_shell_command_get_output(
     37             'crossystem %s' % name)[0]
     38 
     39     def __setattr__(self, name, value):
     40         if name in ('os_if',):
     41             self.__dict__[name] = value
     42         else:
     43             self.os_if.run_shell_command('crossystem "%s=%s"' % (name, value))
     44 
     45     def request_recovery(self):
     46         """Request recovery mode next time the target reboots."""
     47 
     48         self.__setattr__('recovery_request', self.USER_RECOVERY_REQUEST_CODE)
     49 
     50 
     51 class OSInterface(object):
     52     """An object to encapsulate OS services functions."""
     53 
     54     ANDROID_TESTER_FILE = '/mnt/stateful_partition/.android_faft_tester'
     55 
     56     def __init__(self):
     57         """Object construction time initialization."""
     58         self.state_dir = None
     59         self.log_file = None
     60         self.cs = Crossystem()
     61         self.is_android = os.path.isfile(self.ANDROID_TESTER_FILE)
     62         if self.is_android:
     63             self.shell = shell_wrapper.AdbShell()
     64             self.host_shell = shell_wrapper.LocalShell()
     65         else:
     66             self.shell = shell_wrapper.LocalShell()
     67             self.host_shell = None
     68 
     69 
     70     def init(self, state_dir=None, log_file=None):
     71         """Initialize the OS interface object.
     72 
     73         Args:
     74           state_dir - a string, the name of the directory (as defined by the
     75                       caller). The contents of this directory persist over
     76                       system restarts and power cycles.
     77           log_file - a string, the name of the log file kept in the state
     78                      directory.
     79 
     80         Default argument values support unit testing.
     81         """
     82         self.cs.init(self)
     83         self.state_dir = state_dir
     84 
     85         if self.state_dir:
     86             if not os.path.exists(self.state_dir):
     87                 try:
     88                     os.mkdir(self.state_dir)
     89                 except OSError, err:
     90                     raise OSInterfaceError(err)
     91             if log_file:
     92                 if log_file[0] == '/':
     93                     self.log_file = log_file
     94                 else:
     95                     self.log_file = os.path.join(state_dir, log_file)
     96 
     97         # Initialize the shell. Should be after creating the log file.
     98         self.shell.init(self)
     99         if self.host_shell:
    100             self.host_shell.init(self)
    101 
    102     def has_host(self):
    103         """Return True if a host is connected to DUT."""
    104         return self.is_android
    105 
    106     def run_shell_command(self, cmd):
    107         """Run a shell command."""
    108         self.shell.run_command(cmd)
    109 
    110     def run_shell_command_get_status(self, cmd):
    111         """Run shell command and return its return code."""
    112         return self.shell.run_command_get_status(cmd)
    113 
    114     def run_shell_command_get_output(self, cmd):
    115         """Run shell command and return its console output."""
    116         return self.shell.run_command_get_output(cmd)
    117 
    118     def run_host_shell_command(self, cmd, block=True):
    119         """Run a shell command on the host."""
    120         if self.host_shell:
    121             self.host_shell.run_command(cmd, block)
    122         else:
    123             raise OSInterfaceError('There is no host for DUT.')
    124 
    125     def run_host_shell_command_get_status(self, cmd):
    126         """Run shell command and return its return code on the host."""
    127         if self.host_shell:
    128             return self.host_shell.run_command_get_status(cmd)
    129         else:
    130             raise OSInterfaceError('There is no host for DUT.')
    131 
    132     def run_host_shell_command_get_output(self, cmd):
    133         """Run shell command and return its console output."""
    134         if self.host_shell:
    135             return self.host_shell.run_command_get_output(cmd)
    136         else:
    137             raise OSInterfaceError('There is no host for DUT.')
    138 
    139     def read_file(self, path):
    140         """Read the content of the file."""
    141         return self.shell.read_file(path)
    142 
    143     def write_file(self, path, data):
    144         """Write the data to the file."""
    145         self.shell.write_file(path, data)
    146 
    147     def append_file(self, path, data):
    148         """Append the data to the file."""
    149         self.shell.append_file(path, data)
    150 
    151     def path_exists(self, path):
    152         """Return True if the path exists on DUT."""
    153         cmd = 'test -e %s' % path
    154         return self.run_shell_command_get_status(cmd) == 0
    155 
    156     def is_dir(self, path):
    157         """Return True if the path is a directory."""
    158         cmd = 'test -d %s' % path
    159         return self.run_shell_command_get_status(cmd) == 0
    160 
    161     def create_dir(self, path):
    162         """Create a new directory."""
    163         cmd = 'mkdir -p %s' % path
    164         return self.run_shell_command(cmd)
    165 
    166     def create_temp_file(self, prefix):
    167         """Create a temporary file with a prefix."""
    168         if self.is_android:
    169             tmp_path = '/data/local/tmp'
    170         else:
    171             tmp_path = '/tmp'
    172         cmd = 'mktemp -p %s %sXXXXXX' % (tmp_path, prefix)
    173         return self.run_shell_command_get_output(cmd)[0]
    174 
    175     def copy_file(self, from_path, to_path):
    176         """Copy the file."""
    177         cmd = 'cp -f %s %s' % (from_path, to_path)
    178         return self.run_shell_command(cmd)
    179 
    180     def copy_dir(self, from_path, to_path):
    181         """Copy the directory."""
    182         cmd = 'cp -rf %s %s' % (from_path, to_path)
    183         return self.run_shell_command(cmd)
    184 
    185     def remove_file(self, path):
    186         """Remove the file."""
    187         cmd = 'rm -f %s' % path
    188         return self.run_shell_command(cmd)
    189 
    190     def remove_dir(self, path):
    191         """Remove the directory."""
    192         cmd = 'rm -rf %s' % path
    193         return self.run_shell_command(cmd)
    194 
    195     def get_file_size(self, path):
    196         """Get the size of the file."""
    197         cmd = 'stat -c %%s %s' % path
    198         return int(self.run_shell_command_get_output(cmd)[0])
    199 
    200     def target_hosted(self):
    201         """Return True if running on DUT."""
    202         if self.is_android:
    203             return True
    204         signature = open('/etc/lsb-release', 'r').readlines()[0]
    205         return re.search(r'chrom(ium|e)os', signature, re.IGNORECASE) != None
    206 
    207     def state_dir_file(self, file_name):
    208         """Get a full path of a file in the state directory."""
    209         return os.path.join(self.state_dir, file_name)
    210 
    211     def wait_for_device(self, timeout):
    212         """Wait for an Android device to be connected."""
    213         return self.shell.wait_for_device(timeout)
    214 
    215     def wait_for_no_device(self, timeout):
    216         """Wait for no Android device to be connected (offline)."""
    217         return self.shell.wait_for_no_device(timeout)
    218 
    219     def log(self, text):
    220         """Write text to the log file and print it on the screen, if enabled.
    221 
    222         The entire log (maintained across reboots) can be found in
    223         self.log_file.
    224         """
    225         if not self.log_file or not os.path.exists(self.state_dir):
    226             # Called before environment was initialized, ignore.
    227             return
    228 
    229         timestamp = datetime.datetime.strftime(
    230             datetime.datetime.now(), '%I:%M:%S %p:')
    231 
    232         with open(self.log_file, 'a') as log_f:
    233             log_f.write('%s %s\n' % (timestamp, text))
    234             log_f.flush()
    235             os.fdatasync(log_f)
    236 
    237     def is_removable_device(self, device):
    238         """Check if a certain storage device is removable.
    239 
    240         device - a string, file name of a storage device or a device partition
    241                  (as in /dev/sda[0-9] or /dev/mmcblk0p[0-9]).
    242 
    243         Returns True if the device is removable, False if not.
    244         """
    245         if self.is_android:
    246             return False
    247 
    248         if not self.target_hosted():
    249             return False
    250 
    251         # Drop trailing digit(s) and letter(s) (if any)
    252         base_dev = self.strip_part(device.split('/')[2])
    253         removable = int(self.read_file('/sys/block/%s/removable' % base_dev))
    254 
    255         return removable == 1
    256 
    257     def get_internal_disk(self, device):
    258         """Get the internal disk by given the current disk.
    259 
    260         If device is removable device, internal disk is decided by which kind
    261         of divice (arm or x86). Otherwise, return device itself.
    262 
    263         device - a string, file name of a storage device or a device partition
    264                  (as in /dev/sda[0-9] or /dev/mmcblk0p[0-9]).
    265 
    266         Return internal kernel disk.
    267         """
    268         if self.is_removable_device(device):
    269             for p in ('/dev/mmcblk0', '/dev/mmcblk1', '/dev/nvme0n1'):
    270                 if self.path_exists(p):
    271                     return p
    272             return '/dev/sda'
    273         else:
    274             return self.strip_part(device)
    275 
    276     def get_root_part(self):
    277         """Return a string, the name of root device with partition number"""
    278         # FIXME(waihong): Android doesn't support dual kernel/root and misses
    279         # the related tools. Just return something that not break the existing
    280         # code.
    281         if self.is_android:
    282             return '/dev/mmcblk0p3'
    283         else:
    284             return self.run_shell_command_get_output('rootdev -s')[0]
    285 
    286     def get_root_dev(self):
    287         """Return a string, the name of root device without partition number"""
    288         return self.strip_part(self.get_root_part())
    289 
    290     def join_part(self, dev, part):
    291         """Return a concatenated string of device and partition number"""
    292         if dev.endswith(tuple(str(i) for i in range(0, 10))):
    293             return dev + 'p' + part
    294         else:
    295             return dev + part
    296 
    297     def strip_part(self, dev_with_part):
    298         """Return a stripped string without partition number"""
    299         dev_name_stripper = re.compile('p?[0-9]+$')
    300         return dev_name_stripper.sub('', dev_with_part)
    301 
    302     def retrieve_body_version(self, blob):
    303         """Given a blob, retrieve body version.
    304 
    305         Currently works for both, firmware and kernel blobs. Returns '-1' in
    306         case the version can not be retrieved reliably.
    307         """
    308         header_format = '<8s8sQ'
    309         preamble_format = '<40sQ'
    310         magic, _, kb_size = struct.unpack_from(header_format, blob)
    311 
    312         if magic != 'CHROMEOS':
    313             return -1  # This could be a corrupted version case.
    314 
    315         _, version = struct.unpack_from(preamble_format, blob, kb_size)
    316         return version
    317 
    318     def retrieve_datakey_version(self, blob):
    319         """Given a blob, retrieve firmware data key version.
    320 
    321         Currently works for both, firmware and kernel blobs. Returns '-1' in
    322         case the version can not be retrieved reliably.
    323         """
    324         header_format = '<8s96sQ'
    325         magic, _, version = struct.unpack_from(header_format, blob)
    326         if magic != 'CHROMEOS':
    327             return -1 # This could be a corrupted version case.
    328         return version
    329 
    330     def retrieve_kernel_subkey_version(self, blob):
    331         """Given a blob, retrieve kernel subkey version.
    332 
    333         It is in firmware vblock's preamble.
    334         """
    335 
    336         header_format = '<8s8sQ'
    337         preamble_format = '<72sQ'
    338         magic, _, kb_size = struct.unpack_from(header_format, blob)
    339 
    340         if magic != 'CHROMEOS':
    341             return -1
    342 
    343         _, version = struct.unpack_from(preamble_format, blob, kb_size)
    344         return version
    345 
    346     def retrieve_preamble_flags(self, blob):
    347         """Given a blob, retrieve preamble flags if available.
    348 
    349         It only works for firmware. If the version of preamble header is less
    350         than 2.1, no preamble flags supported, just returns 0.
    351         """
    352         header_format = '<8s8sQ'
    353         preamble_format = '<32sII64sI'
    354         magic, _, kb_size = struct.unpack_from(header_format, blob)
    355 
    356         if magic != 'CHROMEOS':
    357             return -1  # This could be a corrupted version case.
    358 
    359         _, ver, subver, _, flags = struct.unpack_from(preamble_format, blob,
    360                                                       kb_size)
    361 
    362         if ver > 2 or (ver == 2 and subver >= 1):
    363             return flags
    364         else:
    365             return 0  # Returns 0 if preamble flags not available.
    366 
    367     def read_partition(self, partition, size):
    368         """Read the requested partition, up to size bytes."""
    369         tmp_file = self.state_dir_file('part.tmp')
    370         self.run_shell_command('dd if=%s of=%s bs=1 count=%d' % (
    371                 partition, tmp_file, size))
    372         data = self.read_file(tmp_file)
    373         self.remove_file(tmp_file)
    374         return data
    375