Home | History | Annotate | Download | only in network_ShillInitScripts
      1 import grp
      2 import mock_flimflam
      3 import os
      4 import pwd
      5 import stat
      6 import time
      7 import utils
      8 
      9 from autotest_lib.client.bin import test
     10 from autotest_lib.client.common_lib import error
     11 
     12 class network_ShillInitScripts(test.test):
     13     """ Test that shill init scripts perform as expected.  Use the
     14         real filesystem (doing a best effort to archive and restore
     15         current state).  The shill manager is stopped and a proxy
     16         DBus entity is installed to accept DBus messages that are sent
     17         via "dbus-send" in the shill startup scripts.  However, the
     18         "real" shill is still also started from time to time and we
     19         check that it is run with the right command line arguments.
     20     """
     21     version = 1
     22     save_directories = [ '/var/cache/shill',
     23                          '/run/shill',
     24                          '/run/state/logged-in',
     25                          '/run/dhcpcd',
     26                          '/var/lib/dhcpcd',
     27                          ]
     28     fake_user = 'not-a-real-user (at] chromium.org'
     29     saved_config = '/tmp/network_ShillInitScripts_saved_config.tgz'
     30     cryptohome_path_command = 'cryptohome-path'
     31     guest_shill_user_profile_dir = '/run/shill/guest_user_profile/shill'
     32     guest_shill_user_log_dir = '/run/shill/guest_user_profile/shill_logs'
     33     magic_header = '# --- shill init file test magic header ---'
     34 
     35 
     36     def start_shill(self):
     37         """ Starts a shill instance. """
     38         utils.start_service('shill')
     39 
     40 
     41     def stop_shill(self):
     42         """ Halt the running shill instance. """
     43         utils.stop_service('shill', ignore_status=True)
     44 
     45         for attempt in range(10):
     46             if not self.find_pid('shill'):
     47                 break
     48             time.sleep(1)
     49         else:
     50             raise error.TestFail('Shill process does not appear to be dying')
     51 
     52 
     53     def login(self, user=None):
     54         """ Simulate the login process.
     55 
     56         Note: "start" blocks until the "script" block completes.
     57 
     58         @param user string user name (email address) to log in.
     59 
     60         """
     61 
     62         if utils.has_systemd():
     63             start_cmd = (('systemctl set-environment CHROMEOS_USER=%s'
     64                           ' && systemctl start shill-start-user-session') %
     65                          (user or self.fake_user))
     66         else:
     67             start_cmd = ('start shill-start-user-session CHROMEOS_USER=%s' %
     68                          (user or self.fake_user))
     69         utils.system(start_cmd)
     70 
     71 
     72     def login_guest(self):
     73         """ Simulate guest login.
     74 
     75         For guest login, session-manager passes an empty CHROMEOS_USER arg.
     76 
     77         """
     78         self.login('""')
     79 
     80 
     81     def logout(self):
     82         """ Simulate user logout.
     83 
     84         Note: "start" blocks until the "script" block completes.
     85 
     86         """
     87         utils.start_service('shill-stop-user-session')
     88 
     89 
     90     def start_test(self):
     91         """ Setup the start of the test.  Stop shill and create test harness."""
     92         # Stop a system process on test duts for keeping connectivity up.
     93         ret = utils.stop_service('recover_duts', ignore_status=True)
     94         self.recover_duts_stopped = (ret == 0);
     95 
     96         self.stop_shill()
     97 
     98         # Deduce the root cryptohome directory name for our fake user.
     99         self.root_cryptohome_dir = utils.system_output(
    100             '%s system %s' % (self.cryptohome_path_command, self.fake_user))
    101 
    102         # Deduce the user cryptohome directory name for our fake user.
    103         self.user_cryptohome_dir = utils.system_output(
    104             '%s user %s' % (self.cryptohome_path_command, self.fake_user))
    105 
    106         # Deduce the directory for memory log storage.
    107         self.user_cryptohome_log_dir = ('%s/shill_logs' %
    108                                         self.root_cryptohome_dir)
    109 
    110         # The sanitized hash of the username is the basename of the cryptohome.
    111         self.fake_user_hash = os.path.basename(self.root_cryptohome_dir)
    112 
    113         # Just in case this hash actually exists, add these to the list of
    114         # saved directories.
    115         self.save_directories.append(self.root_cryptohome_dir)
    116         self.save_directories.append(self.user_cryptohome_dir)
    117 
    118         # Archive the system state we will be modifying, then remove them.
    119         utils.system('tar zcvf %s --directory / --ignore-failed-read %s'
    120                      ' 2>/dev/null' %
    121                      (self.saved_config, ' '.join(self.save_directories)))
    122         utils.system('rm -rf %s' % ' '.join(self.save_directories),
    123                      ignore_status=True)
    124 
    125         # Create the fake user's system cryptohome directory.
    126         os.mkdir(self.root_cryptohome_dir)
    127         self.new_shill_user_profile_dir = ('%s/shill' %
    128                                            self.root_cryptohome_dir)
    129         self.new_shill_user_profile = ('%s/shill.profile' %
    130                                        self.new_shill_user_profile_dir)
    131 
    132         # Create the fake user's user cryptohome directory.
    133         os.mkdir(self.user_cryptohome_dir)
    134         self.old_shill_user_profile_dir = ('%s/shill' %
    135                                            self.user_cryptohome_dir)
    136         self.old_shill_user_profile = ('%s/shill.profile' %
    137                                        self.old_shill_user_profile_dir)
    138         self.mock_flimflam = None
    139 
    140 
    141     def start_mock_flimflam(self):
    142         """ Start a mock flimflam instance to accept and log DBus calls. """
    143         self.mock_flimflam = mock_flimflam.MockFlimflam()
    144         self.mock_flimflam.start()
    145 
    146 
    147     def erase_state(self):
    148         """ Remove all the test harness files. """
    149         utils.system('rm -rf %s' % ' '.join(self.save_directories))
    150         os.mkdir(self.root_cryptohome_dir)
    151         os.mkdir(self.user_cryptohome_dir)
    152 
    153 
    154     def end_test(self):
    155         """ Perform cleanup at the end of the test. """
    156         if self.mock_flimflam:
    157             self.mock_flimflam.quit()
    158             self.mock_flimflam.join()
    159         self.erase_state()
    160         utils.system('tar zxvf %s --directory /' % self.saved_config)
    161         utils.system('rm -f %s' % self.saved_config)
    162         self.restart_system_processes()
    163 
    164 
    165     def restart_system_processes(self):
    166         """ Restart vital system services at the end of the test. """
    167         utils.start_service('shill', ignore_status=True)
    168         if self.recover_duts_stopped:
    169             utils.start_service('recover_duts', ignore_status=True)
    170 
    171 
    172     def assure(self, must_be_true, assertion_name):
    173         """ Perform a named assertion.
    174 
    175         @param must_be_true boolean parameter that must be true.
    176         @param assertion_name string name of this assertion.
    177 
    178         """
    179         if not must_be_true:
    180             raise error.TestFail('%s: Assertion failed: %s' %
    181                                  (self.test_name, assertion_name))
    182 
    183 
    184     def assure_path_owner(self, path, owner):
    185         """ Assert that |path| is owned by |owner|.
    186 
    187         @param path string pathname to test.
    188         @param owner string user name that should own |path|.
    189 
    190         """
    191         self.assure(pwd.getpwuid(os.stat(path).st_uid)[0] == owner,
    192                     'Path %s is owned by %s' % (path, owner))
    193 
    194 
    195     def assure_path_group(self, path, group):
    196         """ Assert that |path| is owned by |group|.
    197 
    198         @param path string pathname to test.
    199         @param group string group name that should own |path|.
    200 
    201         """
    202         self.assure(grp.getgrgid(os.stat(path).st_gid)[0] == group,
    203                     'Path %s is group-owned by %s' % (path, group))
    204 
    205 
    206     def assure_exists(self, path, path_friendly_name):
    207         """ Assert that |path| exists.
    208 
    209         @param path string pathname to test.
    210         @param path_friendly_name string user-parsable description of |path|.
    211 
    212         """
    213         self.assure(os.path.exists(path), '%s exists' % path_friendly_name)
    214 
    215 
    216     def assure_is_dir(self, path, path_friendly_name):
    217         """ Assert that |path| is a directory.
    218 
    219         @param path string pathname to test.
    220         @param path_friendly_name string user-parsable description of |path|.
    221 
    222         """
    223         self.assure_exists(path, path_friendly_name)
    224         self.assure(stat.S_ISDIR(os.lstat(path).st_mode),
    225                     '%s is a directory' % path_friendly_name)
    226 
    227 
    228     def assure_is_link(self, path, path_friendly_name):
    229         """ Assert that |path| is a symbolic link.
    230 
    231         @param path string pathname to test.
    232         @param path_friendly_name string user-parsable description of |path|.
    233 
    234         """
    235         self.assure_exists(path, path_friendly_name)
    236         self.assure(stat.S_ISLNK(os.lstat(path).st_mode),
    237                     '%s is a symbolic link' % path_friendly_name)
    238 
    239 
    240     def assure_is_link_to(self, path, pointee, path_friendly_name):
    241         """ Assert that |path| is a symbolic link to |pointee|.
    242 
    243         @param path string pathname to test.
    244         @param pointee string pathname that |path| should point to.
    245         @param path_friendly_name string user-parsable description of |path|.
    246 
    247         """
    248         self.assure_is_link(path, path_friendly_name)
    249         self.assure(os.readlink(path) == pointee,
    250                     '%s is a symbolic link to %s' %
    251                     (path_friendly_name, pointee))
    252 
    253 
    254     def assure_method_calls(self, expected_method_calls, assertion_name):
    255         """ Assert that |expected_method_calls| were executed on mock_flimflam.
    256 
    257         @param expected_method_calls list of string-tuple pairs of method
    258             name + tuple of arguments.
    259         @param assertion_name string name to assign to the assertion.
    260 
    261         """
    262         method_calls = self.mock_flimflam.get_method_calls()
    263         if len(expected_method_calls) != len(method_calls):
    264             self.assure(False, '%s: method call count does not match' %
    265                         assertion_name)
    266         for expected, actual in zip(expected_method_calls, method_calls):
    267             self.assure(actual.method == expected[0],
    268                         '%s: method %s matches expected %s' %
    269                         (assertion_name, actual.method, expected[0]))
    270             self.assure(actual.argument == expected[1],
    271                         '%s: argument %s matches expected %s' %
    272                         (assertion_name, actual.argument, expected[1]))
    273 
    274 
    275     def create_file_with_contents(self, filename, contents):
    276         """ Create a file named |filename| that contains |contents|.
    277 
    278         @param filename string name of file.
    279         @param contents string contents of file.
    280 
    281         """
    282         with open(filename, 'w') as f:
    283             f.write(contents)
    284 
    285 
    286     def touch(self, filename):
    287         """ Create an empty file named |filename|.
    288 
    289         @param filename string name of file.
    290 
    291         """
    292         self.create_file_with_contents(filename, '')
    293 
    294 
    295     def create_new_shill_user_profile(self, contents):
    296         """ Create a fake new user profile with |contents|.
    297 
    298         @param contents string contents of the new user profile.
    299 
    300         """
    301         os.mkdir(self.new_shill_user_profile_dir)
    302         self.create_file_with_contents(self.new_shill_user_profile, contents)
    303 
    304 
    305     def create_old_shill_user_profile(self, contents):
    306         """ Create a fake old-style user profile with |contents|.
    307 
    308         @param contents string contents of the old user profile.
    309 
    310         """
    311         os.mkdir(self.old_shill_user_profile_dir)
    312         self.create_file_with_contents(self.old_shill_user_profile, contents)
    313 
    314 
    315     def file_contents(self, filename):
    316         """ Returns the contents of |filename|.
    317 
    318         @param filename string name of file to read.
    319 
    320         """
    321         with open(filename) as f:
    322             return f.read()
    323 
    324 
    325     def find_pid(self, process_name):
    326         """ Returns the process id of |process_name|.
    327 
    328         @param process_name string name of process to search for.
    329 
    330         """
    331         return utils.system_output('pgrep %s' % process_name,
    332                                    ignore_status=True).split('\n')[0]
    333 
    334 
    335     def get_commandline(self):
    336         """ Returns the command line of the current shill executable. """
    337         pid = self.find_pid('shill')
    338         return file('/proc/%s/cmdline' % pid).read().split('\0')
    339 
    340 
    341     def run_once(self):
    342         """ Main test loop. """
    343         try:
    344             self.start_test()
    345         except:
    346             self.restart_system_processes()
    347             raise
    348 
    349         try:
    350             self.run_tests([
    351                 self.test_start_shill,
    352                 self.test_start_logged_in])
    353 
    354             # The tests above run a real instance of shill, whereas the tests
    355             # below rely on a mock instance of shill.  We must take care not
    356             # to run the mock at the same time as a real shill instance.
    357             self.start_mock_flimflam()
    358 
    359             self.run_tests([
    360                 self.test_login,
    361                 self.test_login_guest,
    362                 self.test_login_profile_exists,
    363                 self.test_login_old_shill_profile,
    364                 self.test_login_invalid_old_shill_profile,
    365                 self.test_login_ignore_old_shill_profile,
    366                 self.test_login_multi_profile,
    367                 self.test_logout])
    368         finally:
    369             # Stop any shill instances started during testing.
    370             self.stop_shill()
    371             self.end_test()
    372 
    373 
    374     def run_tests(self, tests):
    375         """ Executes each of the test subparts in sequence.
    376 
    377         @param tests list of methods to run.
    378 
    379         """
    380         for test in tests:
    381           self.test_name = test.__name__
    382           test()
    383           self.stop_shill()
    384           self.erase_state()
    385 
    386 
    387     def test_start_shill(self):
    388         """ Test all created pathnames during shill startup.
    389 
    390         Also ensure the push argument is not provided by default.
    391 
    392         """
    393         self.start_shill()
    394         self.assure_is_dir('/run/shill', 'Shill run directory')
    395         self.assure_is_dir('/var/lib/dhcpcd', 'dhcpcd lib directory')
    396         self.assure_path_owner('/var/lib/dhcpcd', 'dhcp')
    397         self.assure_path_group('/var/lib/dhcpcd', 'dhcp')
    398         self.assure_is_dir('/run/dhcpcd', 'dhcpcd run directory')
    399         self.assure_path_owner('/run/dhcpcd', 'dhcp')
    400         self.assure_path_group('/run/dhcpcd', 'dhcp')
    401         self.assure('--push=~chronos/shill' not in self.get_commandline(),
    402                     'Shill command line does not contain push argument')
    403 
    404 
    405     def test_start_logged_in(self):
    406         """ Tests starting up shill while a user is already logged in.
    407 
    408         The "--push" argument should not be added even though shill is started
    409         while a user is logged in.
    410 
    411         """
    412         os.mkdir('/run/shill')
    413         os.mkdir('/run/shill/user_profiles')
    414         self.create_new_shill_user_profile('')
    415         os.symlink(self.new_shill_user_profile_dir,
    416                    '/run/shill/user_profiles/chronos')
    417         self.touch('/run/state/logged-in')
    418         self.start_shill()
    419         self.assure('--push=~chronos/shill' not in self.get_commandline(),
    420                     'Shill command line does not contain push argument')
    421         os.unlink('/run/state/logged-in')
    422 
    423 
    424     def test_login(self):
    425         """ Test the login process.
    426 
    427         Login should create a profile directory, then create and push
    428         a user profile, given no previous state.
    429 
    430         """
    431         os.mkdir('/run/shill')
    432         self.login()
    433         self.assure(not os.path.exists(self.old_shill_user_profile),
    434                     'Old shill user profile does not exist')
    435         self.assure(not os.path.exists(self.new_shill_user_profile),
    436                     'New shill user profile does not exist')
    437         # The DBus "CreateProfile" method should have been handled
    438         # by our mock_flimflam instance, so the profile directory
    439         # should not have actually been created.
    440         self.assure_is_dir(self.new_shill_user_profile_dir,
    441                            'New shill user profile directory')
    442         self.assure_is_dir('/run/shill/user_profiles',
    443                            'Shill profile root')
    444         self.assure_is_link_to('/run/shill/user_profiles/chronos',
    445                                self.new_shill_user_profile_dir,
    446                                'Shill profile link')
    447         self.assure_is_dir(self.user_cryptohome_log_dir,
    448                            'shill user log directory')
    449         self.assure_is_link_to('/run/shill/log',
    450                                self.user_cryptohome_log_dir,
    451                                'Shill logs link')
    452         self.assure_method_calls([[ 'CreateProfile', '~chronos/shill' ],
    453                                   [ 'InsertUserProfile',
    454                                     ('~chronos/shill', self.fake_user_hash) ]],
    455                                  'CreateProfile and InsertUserProfile '
    456                                  'are called')
    457 
    458 
    459     def test_login_guest(self):
    460         """ Tests the guest login process.
    461 
    462         Login should create a temporary profile directory in /run,
    463         instead of using one within the root directory for normal users.
    464 
    465         """
    466         os.mkdir('/run/shill')
    467         self.login_guest()
    468         self.assure(not os.path.exists(self.old_shill_user_profile),
    469                     'Old shill user profile does not exist')
    470         self.assure(not os.path.exists(self.new_shill_user_profile),
    471                     'New shill user profile does not exist')
    472         self.assure(not os.path.exists(self.new_shill_user_profile_dir),
    473                     'New shill user profile directory')
    474         self.assure_is_dir(self.guest_shill_user_profile_dir,
    475                            'shill guest user profile directory')
    476         self.assure_is_dir('/run/shill/user_profiles',
    477                            'Shill profile root')
    478         self.assure_is_link_to('/run/shill/user_profiles/chronos',
    479                                self.guest_shill_user_profile_dir,
    480                                'Shill profile link')
    481         self.assure_is_dir(self.guest_shill_user_log_dir,
    482                            'shill guest user log directory')
    483         self.assure_is_link_to('/run/shill/log',
    484                                self.guest_shill_user_log_dir,
    485                                'Shill logs link')
    486         self.assure_method_calls([[ 'CreateProfile', '~chronos/shill' ],
    487                                   [ 'InsertUserProfile',
    488                                     ('~chronos/shill', '') ]],
    489                                  'CreateProfile and InsertUserProfile '
    490                                  'are called')
    491 
    492 
    493     def test_login_profile_exists(self):
    494         """ Test logging in a user whose profile already exists.
    495 
    496         Login script should only push (and not create) the user profile
    497         if a user profile already exists.
    498         """
    499         os.mkdir('/run/shill')
    500         os.mkdir(self.new_shill_user_profile_dir)
    501         self.touch(self.new_shill_user_profile)
    502         self.login()
    503         self.assure_method_calls([[ 'InsertUserProfile',
    504                                     ('~chronos/shill', self.fake_user_hash) ]],
    505                                  'Only InsertUserProfile is called')
    506 
    507 
    508     def test_login_old_shill_profile(self):
    509         """ Test logging in a user with an old-style shill profile.
    510 
    511         Login script should move an old shill user profile into place
    512         if a new one does not exist.
    513         """
    514         os.mkdir('/run/shill')
    515         self.create_old_shill_user_profile(self.magic_header)
    516         self.login()
    517         self.assure(not os.path.exists(self.old_shill_user_profile),
    518                     'Old shill user profile no longer exists')
    519         self.assure(not os.path.exists(self.old_shill_user_profile_dir),
    520                     'Old shill user profile directory no longer exists')
    521         self.assure_exists(self.new_shill_user_profile,
    522                            'New shill profile')
    523         self.assure(self.magic_header in
    524                     self.file_contents(self.new_shill_user_profile),
    525                     'Shill user profile contains our magic header')
    526         self.assure_method_calls([[ 'InsertUserProfile',
    527                                     ('~chronos/shill', self.fake_user_hash) ]],
    528                                  'Only InsertUserProfile is called')
    529 
    530 
    531     def make_symlink(self, path):
    532         """ Create a symbolic link named |path|.
    533 
    534         @param path string pathname of the symbolic link.
    535 
    536         """
    537         os.symlink('/etc/hosts', path)
    538 
    539 
    540     def make_special_file(self, path):
    541         """ Create a special file named |path|.
    542 
    543         @param path string pathname of the special file.
    544 
    545         """
    546         os.mknod(path, stat.S_IFIFO)
    547 
    548 
    549     def make_bad_owner(self, path):
    550         """ Create a regular file with a strange ownership.
    551 
    552         @param path string pathname of the file.
    553 
    554         """
    555         self.touch(path)
    556         os.lchown(path, 1000, 1000)
    557 
    558 
    559     def test_login_invalid_old_shill_profile(self):
    560         """ Test logging in with an invalid old-style shill profile.
    561 
    562         Login script should ignore non-regular files or files not owned
    563         by the correct user.  The original file should be removed.
    564 
    565         """
    566         os.mkdir('/run/shill')
    567         for file_creation_method in (self.make_symlink,
    568                                      self.make_special_file,
    569                                      os.mkdir,
    570                                      self.make_bad_owner):
    571             os.mkdir(self.old_shill_user_profile_dir)
    572             file_creation_method(self.old_shill_user_profile)
    573             self.login()
    574             self.assure(not os.path.exists(self.old_shill_user_profile),
    575                         'Old shill user profile no longer exists')
    576             self.assure(not os.path.exists(self.old_shill_user_profile_dir),
    577                         'Old shill user profile directory no longer exists')
    578             self.assure(not os.path.exists(self.new_shill_user_profile),
    579                         'New shill profile was not created')
    580             self.assure_method_calls([[ 'CreateProfile', '~chronos/shill' ],
    581                                       [ 'InsertUserProfile',
    582                                         ('~chronos/shill',
    583                                          self.fake_user_hash) ]],
    584                                      'CreateProfile and InsertUserProfile '
    585                                      'are called')
    586             os.unlink('/run/shill/user_profiles/chronos')
    587 
    588 
    589     def test_login_ignore_old_shill_profile(self):
    590         """ Test logging in with both an old and new profile present.
    591 
    592         Login script should ignore an old shill user profile if a new one
    593         exists.
    594 
    595         """
    596         os.mkdir('/run/shill')
    597         self.create_new_shill_user_profile('')
    598         self.create_old_shill_user_profile(self.magic_header)
    599         self.login()
    600         self.assure(os.path.exists(self.old_shill_user_profile),
    601                     'Old shill user profile still exists')
    602         self.assure_exists(self.new_shill_user_profile,
    603                            'New shill profile')
    604         self.assure(self.magic_header not in
    605                     self.file_contents(self.new_shill_user_profile),
    606                     'Shill user profile does not contain our magic header')
    607         self.assure_method_calls([[ 'InsertUserProfile',
    608                                     ('~chronos/shill', self.fake_user_hash) ]],
    609                                  'Only InsertUserProfile is called')
    610 
    611 
    612     def test_login_multi_profile(self):
    613         """ Test signalling shill about multiple logged-in users.
    614 
    615         Login script should not create multiple profiles in parallel
    616         if called more than once without an intervening logout.  Only
    617         the initial user profile should be created.
    618 
    619         """
    620         os.mkdir('/run/shill')
    621         self.create_new_shill_user_profile('')
    622 
    623         # First logged-in user should create a profile (tested above).
    624         self.login()
    625 
    626         # Clear the mock method-call queue.
    627         self.mock_flimflam.get_method_calls()
    628 
    629         for attempt in range(5):
    630             self.login()
    631             self.assure_method_calls([], 'No more profiles are added to shill')
    632             profile_links = os.listdir('/run/shill/user_profiles')
    633             self.assure(len(profile_links) == 1, 'Only one profile exists')
    634             self.assure(profile_links[0] == 'chronos',
    635                         'The profile link is for the chronos user')
    636             self.assure_is_link_to('/run/shill/log',
    637                                    self.user_cryptohome_log_dir,
    638                                    'Shill log link for chronos')
    639 
    640 
    641     def test_logout(self):
    642         """ Test the logout process. """
    643         os.makedirs('/run/shill/user_profiles')
    644         os.makedirs(self.guest_shill_user_profile_dir)
    645         os.makedirs(self.guest_shill_user_log_dir)
    646         self.touch('/run/state/logged-in')
    647         self.logout()
    648         self.assure(not os.path.exists('/run/shill/user_profiles'),
    649                     'User profile directory was removed')
    650         self.assure(not os.path.exists(self.guest_shill_user_profile_dir),
    651                     'Guest user profile directory was removed')
    652         self.assure(not os.path.exists(self.guest_shill_user_log_dir),
    653                     'Guest user log directory was removed')
    654         self.assure_method_calls([[ 'PopAllUserProfiles', '' ]],
    655                                  'PopAllUserProfiles is called')
    656