1 # Copyright 2013 The Chromium Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 import posixpath 6 7 from file_system import FileSystem, FileNotFoundError 8 from future import Future 9 from test_file_system import _List, _StatTracker, TestFileSystem 10 from path_util import IsDirectory 11 12 13 class MockFileSystem(FileSystem): 14 '''Wraps FileSystems to add a selection of mock behaviour: 15 - asserting how often Stat/Read calls are being made to it. 16 - primitive changes/versioning via applying object "diffs", mapping paths to 17 new content (similar to how TestFileSystem works). 18 ''' 19 def __init__(self, file_system): 20 self._file_system = file_system 21 # Updates are stored as TestFileSystems because it already implements a 22 # bunch of logic to intepret paths into dictionaries. 23 self._updates = [] 24 self._stat_tracker = _StatTracker() 25 self._read_count = 0 26 self._read_resolve_count = 0 27 self._stat_count = 0 28 29 @staticmethod 30 def Create(file_system, updates): 31 mock_file_system = MockFileSystem(file_system) 32 for update in updates: 33 mock_file_system.Update(update) 34 return mock_file_system 35 36 # 37 # FileSystem implementation. 38 # 39 40 def Read(self, paths, skip_not_found=False): 41 '''Reads |paths| from |_file_system|, then applies the most recent update 42 from |_updates|, if any. 43 ''' 44 self._read_count += 1 45 def next(result): 46 self._read_resolve_count += 1 47 for path in result.iterkeys(): 48 update = self._GetMostRecentUpdate(path) 49 if update is not None: 50 result[path] = update 51 return result 52 return self._file_system.Read(paths, 53 skip_not_found=skip_not_found).Then(next) 54 55 def Refresh(self): 56 return self._file_system.Refresh() 57 58 def _GetMostRecentUpdate(self, path): 59 '''Returns the latest update for the file at |path|, or None if |path| 60 has never been updated. 61 ''' 62 for update in reversed(self._updates): 63 try: 64 return update.ReadSingle(path).Get() 65 except FileNotFoundError: 66 pass 67 return None 68 69 def Stat(self, path): 70 self._stat_count += 1 71 72 # This only supports numeric stat values since we need to add to it. In 73 # reality the logic here could just be to randomly mutate the stat values 74 # every time there's an Update but that's less meaningful for testing. 75 def stradd(a, b): 76 return str(int(a) + b) 77 78 stat = self._file_system.Stat(path) 79 stat.version = stradd(stat.version, self._stat_tracker.GetVersion(path)) 80 if stat.child_versions: 81 for child_path, child_version in stat.child_versions.iteritems(): 82 stat.child_versions[child_path] = stradd( 83 stat.child_versions[child_path], 84 self._stat_tracker.GetVersion(posixpath.join(path, child_path))) 85 86 return stat 87 88 def GetCommitID(self): 89 return Future(value=self._stat_tracker.GetVersion('')) 90 91 def GetPreviousCommitID(self): 92 return Future(value=self._stat_tracker.GetVersion('') - 1) 93 94 def GetIdentity(self): 95 return self._file_system.GetIdentity() 96 97 def __str__(self): 98 return repr(self) 99 100 def __repr__(self): 101 return 'MockFileSystem(read_count=%s, stat_count=%s, updates=%s)' % ( 102 self._read_count, self._stat_count, len(self._updates)) 103 104 # 105 # Testing methods. 106 # 107 108 def GetStatCount(self): 109 return self._stat_count 110 111 def CheckAndReset(self, stat_count=0, read_count=0, read_resolve_count=0): 112 '''Returns a tuple (success, error). Use in tests like: 113 self.assertTrue(*object_store.CheckAndReset(...)) 114 ''' 115 errors = [] 116 for desc, expected, actual in ( 117 ('read_count', read_count, self._read_count), 118 ('read_resolve_count', read_resolve_count, self._read_resolve_count), 119 ('stat_count', stat_count, self._stat_count)): 120 if actual != expected: 121 errors.append('%s: expected %s got %s' % (desc, expected, actual)) 122 try: 123 return (len(errors) == 0, ', '.join(errors)) 124 finally: 125 self.Reset() 126 127 def Reset(self): 128 self._read_count = 0 129 self._read_resolve_count = 0 130 self._stat_count = 0 131 132 def Update(self, update): 133 self._updates.append(TestFileSystem(update)) 134 for path in _List(update).iterkeys(): 135 # Any files (not directories) which changed are now at the version 136 # derived from |_updates|. 137 if not IsDirectory(path): 138 self._stat_tracker.SetVersion(path, len(self._updates)) 139