Home | History | Annotate | Download | only in server2
      1 # Copyright 2013 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import posixpath
      6 
      7 from file_system import FileSystem, FileNotFoundError
      8 from future import Future
      9 from test_file_system import _List, _StatTracker, TestFileSystem
     10 from path_util import IsDirectory
     11 
     12 
     13 class MockFileSystem(FileSystem):
     14   '''Wraps FileSystems to add a selection of mock behaviour:
     15   - asserting how often Stat/Read calls are being made to it.
     16   - primitive changes/versioning via applying object "diffs", mapping paths to
     17     new content (similar to how TestFileSystem works).
     18   '''
     19   def __init__(self, file_system):
     20     self._file_system = file_system
     21     # Updates are stored as TestFileSystems because it already implements a
     22     # bunch of logic to intepret paths into dictionaries.
     23     self._updates = []
     24     self._stat_tracker = _StatTracker()
     25     self._read_count = 0
     26     self._read_resolve_count = 0
     27     self._stat_count = 0
     28 
     29   @staticmethod
     30   def Create(file_system, updates):
     31     mock_file_system = MockFileSystem(file_system)
     32     for update in updates:
     33       mock_file_system.Update(update)
     34     return mock_file_system
     35 
     36   #
     37   # FileSystem implementation.
     38   #
     39 
     40   def Read(self, paths, skip_not_found=False):
     41     '''Reads |paths| from |_file_system|, then applies the most recent update
     42     from |_updates|, if any.
     43     '''
     44     self._read_count += 1
     45     def next(result):
     46       self._read_resolve_count += 1
     47       for path in result.iterkeys():
     48         update = self._GetMostRecentUpdate(path)
     49         if update is not None:
     50           result[path] = update
     51       return result
     52     return self._file_system.Read(paths,
     53                                   skip_not_found=skip_not_found).Then(next)
     54 
     55   def Refresh(self):
     56     return self._file_system.Refresh()
     57 
     58   def _GetMostRecentUpdate(self, path):
     59     '''Returns the latest update for the file at |path|, or None if |path|
     60     has never been updated.
     61     '''
     62     for update in reversed(self._updates):
     63       try:
     64         return update.ReadSingle(path).Get()
     65       except FileNotFoundError:
     66         pass
     67     return None
     68 
     69   def Stat(self, path):
     70     self._stat_count += 1
     71 
     72     # This only supports numeric stat values since we need to add to it.  In
     73     # reality the logic here could just be to randomly mutate the stat values
     74     # every time there's an Update but that's less meaningful for testing.
     75     def stradd(a, b):
     76       return str(int(a) + b)
     77 
     78     stat = self._file_system.Stat(path)
     79     stat.version = stradd(stat.version, self._stat_tracker.GetVersion(path))
     80     if stat.child_versions:
     81       for child_path, child_version in stat.child_versions.iteritems():
     82         stat.child_versions[child_path] = stradd(
     83             stat.child_versions[child_path],
     84             self._stat_tracker.GetVersion(posixpath.join(path, child_path)))
     85 
     86     return stat
     87 
     88   def GetCommitID(self):
     89     return Future(value=self._stat_tracker.GetVersion(''))
     90 
     91   def GetPreviousCommitID(self):
     92     return Future(value=self._stat_tracker.GetVersion('') - 1)
     93 
     94   def GetIdentity(self):
     95     return self._file_system.GetIdentity()
     96 
     97   def __str__(self):
     98     return repr(self)
     99 
    100   def __repr__(self):
    101     return 'MockFileSystem(read_count=%s, stat_count=%s, updates=%s)' % (
    102         self._read_count, self._stat_count, len(self._updates))
    103 
    104   #
    105   # Testing methods.
    106   #
    107 
    108   def GetStatCount(self):
    109     return self._stat_count
    110 
    111   def CheckAndReset(self, stat_count=0, read_count=0, read_resolve_count=0):
    112     '''Returns a tuple (success, error). Use in tests like:
    113     self.assertTrue(*object_store.CheckAndReset(...))
    114     '''
    115     errors = []
    116     for desc, expected, actual in (
    117         ('read_count', read_count, self._read_count),
    118         ('read_resolve_count', read_resolve_count, self._read_resolve_count),
    119         ('stat_count', stat_count, self._stat_count)):
    120       if actual != expected:
    121         errors.append('%s: expected %s got %s' % (desc, expected, actual))
    122     try:
    123       return (len(errors) == 0, ', '.join(errors))
    124     finally:
    125       self.Reset()
    126 
    127   def Reset(self):
    128     self._read_count = 0
    129     self._read_resolve_count = 0
    130     self._stat_count = 0
    131 
    132   def Update(self, update):
    133     self._updates.append(TestFileSystem(update))
    134     for path in _List(update).iterkeys():
    135       # Any files (not directories) which changed are now at the version
    136       # derived from |_updates|.
    137       if not IsDirectory(path):
    138         self._stat_tracker.SetVersion(path, len(self._updates))
    139