1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 import posixpath 6 import sys 7 8 from file_system import FileSystem, StatInfo, FileNotFoundError 9 from future import All, Future 10 from path_util import AssertIsDirectory, IsDirectory, ToDirectory 11 from third_party.json_schema_compiler.memoize import memoize 12 13 14 class CachingFileSystem(FileSystem): 15 '''FileSystem which implements a caching layer on top of |file_system|. It's 16 smart, using Stat() to decided whether to skip Read()ing from |file_system|, 17 and only Stat()ing directories never files. 18 ''' 19 def __init__(self, file_system, object_store_creator): 20 self._file_system = file_system 21 def create_object_store(category, **optargs): 22 return object_store_creator.Create( 23 CachingFileSystem, 24 category='%s/%s' % (file_system.GetIdentity(), category), 25 **optargs) 26 self._stat_cache = create_object_store('stat') 27 # The read caches can start populated (start_empty=False) because file 28 # updates are picked up by the stat, so it doesn't need the force-refresh 29 # which starting empty is designed for. Without this optimisation, cron 30 # runs are extra slow. 31 self._read_cache = create_object_store('read', start_empty=False) 32 self._walk_cache = create_object_store('walk', start_empty=False) 33 34 def Refresh(self): 35 return self._file_system.Refresh() 36 37 def StatAsync(self, path): 38 '''Stats the directory given, or if a file is given, stats the file's parent 39 directory to get info about the file. 40 ''' 41 # Always stat the parent directory, since it will have the stat of the child 42 # anyway, and this gives us an entire directory's stat info at once. 43 dir_path, file_path = posixpath.split(path) 44 dir_path = ToDirectory(dir_path) 45 46 def make_stat_info(dir_stat): 47 '''Converts a dir stat into the correct resulting StatInfo; if the Stat 48 was for a file, the StatInfo should just contain that file. 49 ''' 50 if path == dir_path: 51 return dir_stat 52 # Was a file stat. Extract that file. 53 file_version = dir_stat.child_versions.get(file_path) 54 if file_version is None: 55 raise FileNotFoundError('No stat found for %s in %s (found %s)' % 56 (path, dir_path, dir_stat.child_versions)) 57 return StatInfo(file_version) 58 59 dir_stat = self._stat_cache.Get(dir_path).Get() 60 if dir_stat is not None: 61 return Future(callback=lambda: make_stat_info(dir_stat)) 62 63 def next(dir_stat): 64 assert dir_stat is not None # should have raised a FileNotFoundError 65 # We only ever need to cache the dir stat. 66 self._stat_cache.Set(dir_path, dir_stat) 67 return make_stat_info(dir_stat) 68 return self._MemoizedStatAsyncFromFileSystem(dir_path).Then(next) 69 70 @memoize 71 def _MemoizedStatAsyncFromFileSystem(self, dir_path): 72 '''This is a simple wrapper to memoize Futures to directory stats, since 73 StatAsync makes heavy use of it. Only cache directories so that the 74 memoized cache doesn't blow up. 75 ''' 76 assert IsDirectory(dir_path) 77 return self._file_system.StatAsync(dir_path) 78 79 def Read(self, paths, skip_not_found=False): 80 '''Reads a list of files. If a file is cached and it is not out of 81 date, it is returned. Otherwise, the file is retrieved from the file system. 82 ''' 83 # Files which aren't found are cached in the read object store as 84 # (path, None, None). This is to prevent re-reads of files we know 85 # do not exist. 86 cached_read_values = self._read_cache.GetMulti(paths).Get() 87 cached_stat_values = self._stat_cache.GetMulti(paths).Get() 88 89 # Populate a map of paths to Futures to their stat. They may have already 90 # been cached in which case their Future will already have been constructed 91 # with a value. 92 stat_futures = {} 93 94 def handle(error): 95 if isinstance(error, FileNotFoundError): 96 return None 97 raise error 98 99 for path in paths: 100 stat_value = cached_stat_values.get(path) 101 if stat_value is None: 102 stat_future = self.StatAsync(path) 103 if skip_not_found: 104 stat_future = stat_future.Then(lambda x: x, handle) 105 else: 106 stat_future = Future(value=stat_value) 107 stat_futures[path] = stat_future 108 109 # Filter only the cached data which is up to date by comparing to the latest 110 # stat. The cached read data includes the cached version. Remove it for 111 # the result returned to callers. |version| == None implies a non-existent 112 # file, so skip it. 113 up_to_date_data = dict( 114 (path, data) for path, (data, version) in cached_read_values.iteritems() 115 if version is not None and stat_futures[path].Get().version == version) 116 117 if skip_not_found: 118 # Filter out paths which we know do not exist, i.e. if |path| is in 119 # |cached_read_values| *and* has a None version, then it doesn't exist. 120 # See the above declaration of |cached_read_values| for more information. 121 paths = [path for path in paths 122 if cached_read_values.get(path, (None, True))[1]] 123 124 if len(up_to_date_data) == len(paths): 125 # Everything was cached and up-to-date. 126 return Future(value=up_to_date_data) 127 128 def next(new_results): 129 # Update the cache. This is a path -> (data, version) mapping. 130 self._read_cache.SetMulti( 131 dict((path, (new_result, stat_futures[path].Get().version)) 132 for path, new_result in new_results.iteritems())) 133 # Update the read cache to include files that weren't found, to prevent 134 # constantly trying to read a file we now know doesn't exist. 135 self._read_cache.SetMulti( 136 dict((path, (None, None)) for path in paths 137 if stat_futures[path].Get() is None)) 138 new_results.update(up_to_date_data) 139 return new_results 140 # Read in the values that were uncached or old. 141 return self._file_system.Read(set(paths) - set(up_to_date_data.iterkeys()), 142 skip_not_found=skip_not_found).Then(next) 143 144 def GetCommitID(self): 145 return self._file_system.GetCommitID() 146 147 def GetPreviousCommitID(self): 148 return self._file_system.GetPreviousCommitID() 149 150 def Walk(self, root, depth=-1): 151 '''Overrides FileSystem.Walk() to provide caching functionality. 152 ''' 153 def file_lister(root): 154 res, root_stat = All((self._walk_cache.Get(root), 155 self.StatAsync(root))).Get() 156 157 if res and res[2] == root_stat.version: 158 dirs, files = res[0], res[1] 159 else: 160 # Wasn't cached, or not up to date. 161 dirs, files = [], [] 162 for f in self.ReadSingle(root).Get(): 163 if IsDirectory(f): 164 dirs.append(f) 165 else: 166 files.append(f) 167 # Update the cache. This is a root -> (dirs, files, version) mapping. 168 self._walk_cache.Set(root, (dirs, files, root_stat.version)) 169 return dirs, files 170 return self._file_system.Walk(root, depth=depth, file_lister=file_lister) 171 172 def GetCommitID(self): 173 return self._file_system.GetCommitID() 174 175 def GetPreviousCommitID(self): 176 return self._file_system.GetPreviousCommitID() 177 178 def GetIdentity(self): 179 return self._file_system.GetIdentity() 180 181 def __repr__(self): 182 return '%s of <%s>' % (type(self).__name__, repr(self._file_system)) 183