# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from file_system import FileSystem, FileNotFoundError
from future import Future
from test_file_system import TestFileSystem

class MockFileSystem(FileSystem):
  '''Wraps FileSystems to add a selection of mock behaviour:

  - asserting how often Stat/Read calls are being made to it.
  - primitive changes/versioning via applying object "diffs", mapping paths to
    new content (similar to how TestFileSystem works).
  '''
  def __init__(self, file_system):
    '''Wraps |file_system| with no updates applied and both call counters at
    zero.
    '''
    self._file_system = file_system
    # Updates are stored as TestFileSystems because they've already
    # implemented a bunch of logic to interpret paths into dictionaries.
    self._updates = []
    self._read_count = 0
    self._stat_count = 0

  @staticmethod
  def Create(file_system, updates):
    '''Returns a MockFileSystem wrapping |file_system| with each item of
    |updates| applied, in order, via Update().
    '''
    mock_file_system = MockFileSystem(file_system)
    for update in updates:
      mock_file_system.Update(update)
    return mock_file_system

  #
  # FileSystem implementation.
  #

  def Read(self, paths, binary=False):
    '''Reads |paths| from |_file_system|, then applies the most recent update
    from |_updates|, if any.

    Every call increments the read count, whether or not the read succeeds.
    Returns a Future. If resolving the underlying read raises, the underlying
    Future is returned untouched; presumably the caller then sees the failure
    from its own Get() call.
    '''
    self._read_count += 1
    future_result = self._file_system.Read(paths, binary=binary)
    try:
      result = future_result.Get()
    except Exception:
      # Deliberately not a bare except: KeyboardInterrupt and SystemExit
      # should propagate rather than be deferred to the caller.
      return future_result
    # Only existing keys are reassigned, so iterating |result| while writing
    # to it is safe.
    for path in result:
      _, update = self._GetMostRecentUpdate(path)
      if update is not None:
        result[path] = update
    return Future(value=result)

  def _GetMostRecentUpdate(self, path):
    '''Returns a tuple (revision, content) for the newest update containing
    |path|. |revision| is the 1-based position of that update in |_updates|.
    Returns (0, None) if no update contains |path|.
    '''
    # Walk newest-first so that the first hit is the most recent update.
    for revision, update in reversed(list(enumerate(self._updates))):
      try:
        return (revision + 1, update.ReadSingle(path))
      except FileNotFoundError:
        pass
    return (0, None)

  def Stat(self, path):
    '''Increments the stat count, then returns the stat of |path| with
    versions adjusted for any updates.

    The recursion used to compute directory versions goes through _StatImpl
    directly, so it does not inflate the stat count.
    '''
    self._stat_count += 1
    return self._StatImpl(path)

  def _StatImpl(self, path):
    '''Stats |path| on |_file_system| and rewrites the version of the result,
    and of each entry of its |child_versions| if present, via _UpdateStat.
    The object returned by the underlying Stat is modified in place.
    '''
    result = self._file_system.Stat(path)
    result.version = self._UpdateStat(result.version, path)
    child_versions = result.child_versions
    if child_versions is not None:
      # Only existing keys are reassigned, so iterating while writing is safe.
      for child_path in child_versions:
        child_versions[child_path] = self._UpdateStat(
            child_versions[child_path],
            '%s%s' % (path, child_path))
    return result

  def _UpdateStat(self, version, path):
    '''Returns |version| adjusted for the updates affecting |path|.

    For a file (a path not ending in '/') this is |version| plus the revision
    of the most recent update containing it, as a string. For a directory it
    is the maximum of |version| and the adjusted versions of its children, as
    a string, or |version| unchanged if the directory has no children.
    '''
    if not path.endswith('/'):
      return str(int(version) + self._GetMostRecentUpdate(path)[0])
    # Bleh, it's a directory, need to recursively search all the children.
    child_paths = self._file_system.ReadSingle(path)
    if not child_paths:
      return version
    return str(max([int(version)] +
                   [int(self._StatImpl('%s%s' % (path, child_path)).version)
                    for child_path in child_paths]))

  def GetIdentity(self):
    '''Returns the identity of the wrapped file system.'''
    return self._file_system.GetIdentity()

  def __str__(self):
    return repr(self)

  def __repr__(self):
    return 'MockFileSystem(read_count=%s, stat_count=%s, updates=%s)' % (
        self._read_count, self._stat_count, len(self._updates))

  #
  # Testing methods.
  #

  def GetReadCount(self):
    '''Returns the number of Read calls since construction or the last
    Reset.
    '''
    return self._read_count

  def GetStatCount(self):
    '''Returns the number of Stat calls since construction or the last
    Reset.
    '''
    return self._stat_count

  def CheckAndReset(self, stat_count=0, read_count=0):
    '''Returns a tuple (success, error), then resets both counters. Use in
    tests like:
    self.assertTrue(*mock_file_system.CheckAndReset(...))
    '''
    errors = [
        '%s: expected %s got %s' % (desc, expected, actual)
        for desc, expected, actual in (
            ('read_count', read_count, self._read_count),
            ('stat_count', stat_count, self._stat_count))
        if actual != expected]
    # The counters were captured above, so it is safe to reset before
    # returning.
    self.Reset()
    return (not errors, ', '.join(errors))

  def Reset(self):
    '''Sets the read and stat counters back to zero. Updates are kept.'''
    self._read_count = 0
    self._stat_count = 0

  def Update(self, update):
    '''Appends |update|, wrapped in a TestFileSystem, as the newest revision.
    '''
    self._updates.append(TestFileSystem(update))