Home | History | Annotate | Download | only in platform_RootPartitionsNotMounted
      1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import os
      7 
      8 from autotest_lib.client.bin import site_utils, test
      9 from autotest_lib.client.common_lib import error, utils
     10 
     11 class platform_RootPartitionsNotMounted(test.test):
     12     version = 1
     13 
     14     _CGPT_PATH = '/usr/bin/cgpt'
     15     _ROOTDEV_PATH = '/usr/bin/rootdev'
     16     _UPDATE_ENGINE_PATH = '/usr/sbin/update_engine'
     17 
     18     def get_root_partitions(self, device):
     19         """Gets a list of root partitions of a device.
     20 
     21         Gets a list of root partitions of a device by calling
     22         `cgpt find -t rootfs <device>`.
     23 
     24         Args:
     25             device: The device, specified by its device file, to examine.
     26 
     27         Returns:
     28             A list of root partitions, specified by their device file,
     29             (e.g. /dev/sda1) of the given device.
     30         """
     31         cgpt_command = '%s find -t rootfs %s' % (self._CGPT_PATH, device)
     32         return utils.run(cgpt_command).stdout.strip('\n').split('\n')
     33 
     34     def get_mounted_devices(self, mounts_file):
     35         """Gets a set of mounted devices from a given mounts file.
     36 
     37         Gets a set of device files that are currently mounted. This method
     38         parses a given mounts file (e.g. /proc/<pid>/mounts) and extracts the
     39         entries with a source path under /dev/.
     40 
     41         Returns:
     42             A set of device file names (e.g. /dev/sda1)
     43         """
     44         mounted_devices = set()
     45         try:
     46             entries = open(mounts_file).readlines()
     47         except:
     48             entries = []
     49         for entry in entries:
     50             node = entry.split(' ')[0]
     51             if node.startswith('/dev/'):
     52                 mounted_devices.add(node)
     53         return mounted_devices
     54 
     55     def get_process_executable(self, pid):
     56         """Gets the executable path of a given process ID.
     57 
     58         Args:
     59             pid: Target process ID.
     60 
     61         Returns:
     62             The executable path of the given process ID or None on error.
     63         """
     64         try:
     65             return os.readlink('/proc/%s/exe' % pid)
     66         except:
     67             return ""
     68 
     69     def get_process_list(self, excluded_executables=[]):
     70         """Gets a list of process IDs of active processes.
     71 
     72         Gets a list of process IDs of active processes by looking into /proc
     73         and filters out those processes with a executable path that is
     74         excluded.
     75 
     76         Args:
     77             excluded_executables: A list of executable paths to exclude.
     78 
     79         Returns:
     80             A list of process IDs of active processes.
     81         """
     82         processes = []
     83         for path in os.listdir('/proc'):
     84             if not path.isdigit(): continue
     85             process_exe = self.get_process_executable(path)
     86             if process_exe and process_exe not in excluded_executables:
     87                 processes.append(path)
     88         return processes
     89 
     90     def run_once(self):
     91         if os.geteuid() != 0:
     92             raise error.TestNAError('This test needs to be run under root')
     93 
     94         for path in [self._CGPT_PATH, self._ROOTDEV_PATH]:
     95             if not os.path.isfile(path):
     96                 raise error.TestNAError('%s not found' % path)
     97 
     98         root_device = site_utils.get_root_device()
     99         if not root_device:
    100             raise error.TestNAError('Could not find the root device')
    101         logging.debug('Root device: %s' % root_device)
    102 
    103         root_partitions = self.get_root_partitions(root_device)
    104         if not root_partitions:
    105             raise error.TestNAError('Could not find any root partition')
    106         logging.debug('Root partitions: %s' % ', '.join(root_partitions))
    107 
    108         processes = self.get_process_list([self._UPDATE_ENGINE_PATH])
    109         if not processes:
    110             raise error.TestNAError('Could not find any process')
    111         logging.debug('Active processes: %s' % ', '.join(processes))
    112 
    113         for process in processes:
    114             process_exe = self.get_process_executable(process)
    115             mounts_file = '/proc/%s/mounts' % process
    116             mounted_devices = self.get_mounted_devices(mounts_file)
    117             for partition in root_partitions:
    118                 if partition in mounted_devices:
    119                     raise error.TestFail(
    120                             'Root partition "%s" is mounted by process %s (%s)'
    121                             % (partition, process, process_exe))
    122