# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from file_system import FileSystem, FileNotFoundError, StatInfo
from future import Future
from path_util import AssertIsValid, AssertIsDirectory, IsDirectory


def MoveTo(base, obj):
  '''Returns an object as |obj| moved to |base|. That is,
  MoveTo('foo/bar', {'a': 'b'}) -> {'foo': {'bar': {'a': 'b'}}}

  |base| is checked with AssertIsDirectory; trailing slashes are stripped
  before it is split into path components. The entries of |obj| are copied
  shallowly into the innermost dict, so |obj| itself is not modified.
  '''
  AssertIsDirectory(base)
  result = {}
  leaf = result
  # Build one nested dict per path component, descending as we go.
  for k in base.rstrip('/').split('/'):
    leaf[k] = {}
    leaf = leaf[k]
  leaf.update(obj)
  return result


def MoveAllTo(base, obj):
  '''Moves every value in |obj| to |base|. See MoveTo.

  Returns a new dict with the same keys as |obj|, where each value has been
  passed through MoveTo(base, value).
  '''
  result = {}
  for key, value in obj.iteritems():
    result[key] = MoveTo(base, value)
  return result


def _List(file_system):
  '''Returns a dict mapping '/' separated paths derived from |file_system| to
  their contents. File paths map to the file's string content; directory paths
  (ending in '/', or '' for the root) map to a list of the directory's child
  names, where child directories carry a trailing '/'.

  For example, {'index.html': '', 'www': {'file.txt': ''}} would return
  {'': ['index.html', 'www/'],
   'index.html': '',
   'www/': ['file.txt'],
   'www/file.txt': ''}
  (list ordering follows dict iteration order).

  Raises ValueError if any item is neither a dict nor a string.
  '''
  assert isinstance(file_system, dict)
  result = {}
  def update_result(item, path):
    AssertIsValid(path)
    if isinstance(item, dict):
      # Directories are keyed with a trailing slash, except the root which is
      # keyed by the empty string.
      if path != '':
        path += '/'
      result[path] = [p if isinstance(content, basestring) else (p + '/')
                      for p, content in item.iteritems()]
      for subpath, subitem in item.iteritems():
        update_result(subitem, path + subpath)
    elif isinstance(item, basestring):
      result[path] = item
    else:
      raise ValueError('Unsupported item type: %s' % type(item))
  update_result(file_system, '')
  return result


class _StatTracker(object):
  '''Maintains the versions of paths in a file system. The versions of files
  are changed either by |Increment| or |SetVersion|. The versions of
  directories are derived from the versions of files within it.
  '''

  def __init__(self):
    # Maps path -> version for every file/directory that has been touched.
    self._path_stats = {}
    # Offset added to the version of every path by GetVersion.
    self._global_stat = 0

  def Increment(self, path=None, by=1):
    '''Increases the version of the file at |path| by |by|, or the global
    version (affecting every path) if |path| is None.
    '''
    if path is None:
      self._global_stat += by
    else:
      self.SetVersion(path, self._path_stats.get(path, 0) + by)

  def SetVersion(self, path, new_version):
    '''Sets the version of the file at |path| to |new_version| and raises the
    version of each of its parent directories to at least |new_version|.

    Raises ValueError if |path| is a directory according to IsDirectory.
    '''
    if IsDirectory(path):
      raise ValueError('Only files have an incrementable stat, '
                       'but "%s" is a directory' % path)

    # Update version of that file.
    self._path_stats[path] = new_version

    # Update all parent directory versions as well.
    slash_index = 0  # (deliberately including '' in the dir paths)
    while slash_index != -1:
      dir_path = path[:slash_index] + '/'
      # A directory's version never decreases; it tracks the maximum version
      # that has been set on any file beneath it.
      self._path_stats[dir_path] = max(self._path_stats.get(dir_path, 0),
                                       new_version)
      if dir_path == '/':
        # Legacy support for '/' being the root of the file system rather
        # than ''. Eventually when the path normalisation logic is complete
        # this will be impossible and this logic will change slightly.
        self._path_stats[''] = self._path_stats['/']
      slash_index = path.find('/', slash_index + 1)

  def GetVersion(self, path):
    '''Returns the global version plus the version recorded for |path|
    (0 if nothing has been recorded for it).
    '''
    return self._global_stat + self._path_stats.get(path, 0)


class TestFileSystem(FileSystem):
  '''A FileSystem backed by an object. Create with an object representing file
  paths such that {'a': {'b': 'hello'}} will resolve Read('a/b') as 'hello',
  Read('a/') as ['b'], and Stat determined by a value incremented via
  IncrementStat.
  '''

  def __init__(self, obj, relative_to=None, identity=None):
    '''|obj| is the nested dict describing the file system. If |relative_to|
    is given, |obj| is first moved beneath that directory (see MoveTo).
    |identity| is the value returned by GetIdentity; it defaults to the class
    name.
    '''
    assert obj is not None
    if relative_to is not None:
      obj = MoveTo(relative_to, obj)
    self._identity = identity or type(self).__name__
    self._path_values = _List(obj)
    self._stat_tracker = _StatTracker()

  #
  # FileSystem implementation.
  #

  def Read(self, paths, skip_not_found=False):
    '''Returns a Future of a dict mapping each path in |paths| to its content
    (a string for files, a list of child names for directories).

    If a path does not exist then it is omitted from the result when
    |skip_not_found| is True; otherwise a Future that raises
    FileNotFoundError is returned.
    '''
    # Resolve everything in a single pass over |paths|. This avoids a
    # per-entry membership test against |paths| and also works when |paths|
    # is a one-shot iterator, which a second traversal would find exhausted.
    found = {}
    for path in paths:
      if path in self._path_values:
        found[path] = self._path_values[path]
      elif not skip_not_found:
        return FileNotFoundError.RaiseInFuture(
            '%s not in %s' % (path, '\n'.join(self._path_values)))
    return Future(value=found)

  def Refresh(self):
    '''No-op; returns an already-resolved Future.'''
    return Future(value=())

  def Stat(self, path):
    '''Returns a StatInfo for |path| whose version comes from the stat
    tracker. If |path| reads as a directory listing, child_versions is
    populated with the version of each listed child.
    '''
    # ReadSingle is not defined in this file; presumably it is inherited from
    # FileSystem and delegates to Read.
    read_result = self.ReadSingle(path).Get()
    stat_result = StatInfo(str(self._stat_tracker.GetVersion(path)))
    if isinstance(read_result, list):
      stat_result.child_versions = dict(
          (file_result,
           str(self._stat_tracker.GetVersion('%s%s' % (path, file_result))))
          for file_result in read_result)
    return stat_result

  #
  # Testing methods.
  #

  def IncrementStat(self, path=None, by=1):
    '''Bumps the version of the file at |path| by |by|, or of every path if
    |path| is None. See _StatTracker.Increment.
    '''
    self._stat_tracker.Increment(path, by=by)

  def GetIdentity(self):
    '''Returns the identity given at construction (or the class name).'''
    return self._identity