Home | History | Annotate | Download | only in platform_RootPartitionsNotMounted
      1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import os
      7 
      8 from autotest_lib.client.bin import utils as bin_utils
      9 from autotest_lib.client.bin import test
     10 from autotest_lib.client.common_lib import error, utils
     11 
     12 class platform_RootPartitionsNotMounted(test.test):
     13     version = 1
     14 
     15     _CGPT_PATH = '/usr/bin/cgpt'
     16     _ROOTDEV_PATH = '/usr/bin/rootdev'
     17     _UPDATE_ENGINE_PATH = '/usr/sbin/update_engine'
     18 
     19     def get_root_partitions(self, device):
     20         """Gets a list of root partitions of a device.
     21 
     22         Gets a list of root partitions of a device by calling
     23         `cgpt find -t rootfs <device>`.
     24 
     25         Args:
     26             device: The device, specified by its device file, to examine.
     27 
     28         Returns:
     29             A list of root partitions, specified by their device file,
     30             (e.g. /dev/sda1) of the given device.
     31         """
     32         cgpt_command = '%s find -t rootfs %s' % (self._CGPT_PATH, device)
     33         return utils.run(cgpt_command).stdout.strip('\n').split('\n')
     34 
     35     def get_mounted_devices(self, mounts_file):
     36         """Gets a set of mounted devices from a given mounts file.
     37 
     38         Gets a set of device files that are currently mounted. This method
     39         parses a given mounts file (e.g. /proc/<pid>/mounts) and extracts the
     40         entries with a source path under /dev/.
     41 
     42         Returns:
     43             A set of device file names (e.g. /dev/sda1)
     44         """
     45         mounted_devices = set()
     46         try:
     47             entries = open(mounts_file).readlines()
     48         except:
     49             entries = []
     50         for entry in entries:
     51             node = entry.split(' ')[0]
     52             if node.startswith('/dev/'):
     53                 mounted_devices.add(node)
     54         return mounted_devices
     55 
     56     def get_process_executable(self, pid):
     57         """Gets the executable path of a given process ID.
     58 
     59         Args:
     60             pid: Target process ID.
     61 
     62         Returns:
     63             The executable path of the given process ID or None on error.
     64         """
     65         try:
     66             return os.readlink('/proc/%s/exe' % pid)
     67         except:
     68             return ""
     69 
     70     def get_process_list(self, excluded_executables=[]):
     71         """Gets a list of process IDs of active processes.
     72 
     73         Gets a list of process IDs of active processes by looking into /proc
     74         and filters out those processes with a executable path that is
     75         excluded.
     76 
     77         Args:
     78             excluded_executables: A list of executable paths to exclude.
     79 
     80         Returns:
     81             A list of process IDs of active processes.
     82         """
     83         processes = []
     84         for path in os.listdir('/proc'):
     85             if not path.isdigit(): continue
     86             process_exe = self.get_process_executable(path)
     87             if process_exe and process_exe not in excluded_executables:
     88                 processes.append(path)
     89         return processes
     90 
     91     def run_once(self):
     92         if os.geteuid() != 0:
     93             raise error.TestNAError('This test needs to be run under root')
     94 
     95         for path in [self._CGPT_PATH, self._ROOTDEV_PATH]:
     96             if not os.path.isfile(path):
     97                 raise error.TestNAError('%s not found' % path)
     98 
     99         root_device = bin_utils.get_root_device()
    100         if not root_device:
    101             raise error.TestNAError('Could not find the root device')
    102         logging.debug('Root device: %s' % root_device)
    103 
    104         root_partitions = self.get_root_partitions(root_device)
    105         if not root_partitions:
    106             raise error.TestNAError('Could not find any root partition')
    107         logging.debug('Root partitions: %s' % ', '.join(root_partitions))
    108 
    109         processes = self.get_process_list([self._UPDATE_ENGINE_PATH])
    110         if not processes:
    111             raise error.TestNAError('Could not find any process')
    112         logging.debug('Active processes: %s' % ', '.join(processes))
    113 
    114         for process in processes:
    115             process_exe = self.get_process_executable(process)
    116             mounts_file = '/proc/%s/mounts' % process
    117             mounted_devices = self.get_mounted_devices(mounts_file)
    118             for partition in root_partitions:
    119                 if partition in mounted_devices:
    120                     raise error.TestFail(
    121                             'Root partition "%s" is mounted by process %s (%s)'
    122                             % (partition, process, process_exe))
    123