Home | History | Annotate | Download | only in security_SysVIPC
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import os
      7 import re
      8 
      9 from collections import namedtuple
     10 
     11 from autotest_lib.client.bin import test, utils
     12 from autotest_lib.client.common_lib import error
     13 
     14 ShmRecord = namedtuple('ShmRecord', ['owner', 'perms', 'attached'])
     15 SemaphoreRecord = namedtuple('SemaphoreRecord', ['owner', 'perms'])
     16 
     17 class security_SysVIPC(test.test):
     18     """Detect emergence of new attack surfaces in SysV IPC."""
     19     version = 1
     20     expected_shm = set([ShmRecord(owner='cras', perms='640',
     21                                   attached=('/usr/bin/cras',))])
     22     expected_sem = set([SemaphoreRecord(owner='root', perms='600')])
     23 
     24     def dump_ipcs_to_results(self):
     25         """Writes a copy of the 'ipcs' output to the autotest results dir."""
     26         utils.system_output('ipcs > "%s/ipcs-output.txt"' % self.resultsdir)
     27 
     28 
     29     def find_attached(self, shmid):
     30         """Find programs attached to a given shared memory segment.
     31 
     32         Returns full paths to each program identified.
     33 
     34         Args:
     35           @param shmid: the id as shown in ipcs and related utilities.
     36         """
     37         # This finds /proc/*/exe entries where maps shows they have
     38         # attached to the specified shm segment.
     39         cmd = 'grep "%s */SYSV" /proc/*/maps | sed "s/maps.*/exe/g"' % shmid
     40         # Then we just need to readlink each of the links. Even though
     41         # we ultimately convert to a sorted tuple, we use a set to avoid
     42         # accumulating duplicates as we go along.
     43         exes = set()
     44         for link in utils.system_output(cmd).splitlines():
     45             exes.add(os.readlink(link))
     46         return tuple(sorted(exes))
     47 
     48 
     49     def observe_shm(self):
     50         """Return a set of ShmRecords representing current system shm usage."""
     51         seen = set()
     52         cmd = 'ipcs -m | grep ^0'
     53         for line in utils.system_output(cmd, ignore_status=True).splitlines():
     54             fields = re.split('\s+', line)
     55             shmid = fields[1]
     56             owner = fields[2]
     57             perms = fields[3]
     58             attached = self.find_attached(shmid)
     59             seen.add(ShmRecord(owner=owner, perms=perms, attached=attached))
     60         return seen
     61 
     62 
     63     def observe_sems(self):
     64         """Return a set of SemaphoreRecords representing current usage."""
     65         seen = set()
     66         cmd = 'ipcs -s | grep ^0'
     67         for line in utils.system_output(cmd, ignore_status=True).splitlines():
     68             fields = re.split('\s+', line)
     69             seen.add(SemaphoreRecord(owner=fields[2], perms=fields[3]))
     70         return seen
     71 
     72 
     73     def run_once(self):
     74         """Main entry point to run the security_SysVIPC autotest."""
     75         test_fail = False
     76         self.dump_ipcs_to_results()
     77         # Check Shared Memory.
     78         observed_shm = self.observe_shm()
     79         missing = self.expected_shm.difference(observed_shm)
     80         extra = observed_shm.difference(self.expected_shm)
     81         if missing:
     82             logging.error('Expected shm(s) not found:')
     83             logging.error(missing)
     84         if extra:
     85             test_fail = True
     86             logging.error('Unexpected shm(s) found:')
     87             logging.error(extra)
     88 
     89         # Check Semaphores.
     90         observed_sem = self.observe_sems()
     91         missing = self.expected_sem.difference(observed_sem)
     92         extra = observed_sem.difference(self.expected_sem)
     93         if missing:
     94             logging.error('Expected semaphore(s) not found:')
     95             logging.error(missing)
     96         if extra:
     97             test_fail = True
     98             logging.error('Unexpected semaphore(s) found:')
     99             logging.error(extra)
    100 
    101         # Also check Message Queues. Since we currently expect
    102         # none, we can avoid over-engineering this check.
    103         queues = utils.system_output('ipcs -q | grep ^0', ignore_status=True)
    104         if queues:
    105             test_fail = True
    106             logging.error('Unexpected message queues found:')
    107             logging.error(queues)
    108 
    109         if test_fail:
    110             raise error.TestFail('SysV IPCs did not match expectations')
    111