Home | History | Annotate | Download | only in server2
      1 # Copyright 2013 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 from file_system import FileSystem, FileNotFoundError
      6 from future import Future
      7 from test_file_system import TestFileSystem
      8 
      9 class MockFileSystem(FileSystem):
     10   '''Wraps FileSystems to add a selection of mock behaviour:
     11 
     12   - asserting how often Stat/Read calls are being made to it.
     13   - primitive changes/versioning via applying object "diffs", mapping paths to
     14     new content (similar to how TestFileSystem works).
     15   '''
     16   def __init__(self, file_system):
     17     self._file_system = file_system
     18     # Updates are modelled are stored as TestFileSystems because they've
     19     # implemented a bunch of logic to interpret paths into dictionaries.
     20     self._updates = []
     21     self._read_count = 0
     22     self._stat_count = 0
     23 
     24   @staticmethod
     25   def Create(file_system, updates):
     26     mock_file_system = MockFileSystem(file_system)
     27     for update in updates:
     28       mock_file_system.Update(update)
     29     return mock_file_system
     30 
     31   #
     32   # FileSystem implementation.
     33   #
     34 
     35   def Read(self, paths, binary=False):
     36     '''Reads |paths| from |_file_system|, then applies the most recent update
     37     from |_updates|, if any.
     38     '''
     39     self._read_count += 1
     40     future_result = self._file_system.Read(paths, binary=binary)
     41     try:
     42       result = future_result.Get()
     43     except:
     44       return future_result
     45     for path in result.iterkeys():
     46       _, update = self._GetMostRecentUpdate(path)
     47       if update is not None:
     48         result[path] = update
     49     return Future(value=result)
     50 
     51   def _GetMostRecentUpdate(self, path):
     52     for revision, update in reversed(list(enumerate(self._updates))):
     53       try:
     54         return (revision + 1, update.ReadSingle(path))
     55       except FileNotFoundError:
     56         pass
     57     return (0, None)
     58 
     59   def Stat(self, path):
     60     self._stat_count += 1
     61     return self._StatImpl(path)
     62 
     63   def _StatImpl(self, path):
     64     result = self._file_system.Stat(path)
     65     result.version = self._UpdateStat(result.version, path)
     66     child_versions = result.child_versions
     67     if child_versions is not None:
     68       for child_path in child_versions.iterkeys():
     69         child_versions[child_path] = self._UpdateStat(
     70             child_versions[child_path],
     71             '%s%s' % (path, child_path))
     72     return result
     73 
     74   def _UpdateStat(self, version, path):
     75     if not path.endswith('/'):
     76       return str(int(version) + self._GetMostRecentUpdate(path)[0])
     77     # Bleh, it's a directory, need to recursively search all the children.
     78     child_paths = self._file_system.ReadSingle(path)
     79     if not child_paths:
     80       return version
     81     return str(max([int(version)] +
     82                    [int(self._StatImpl('%s%s' % (path, child_path)).version)
     83                     for child_path in child_paths]))
     84 
     85   def GetIdentity(self):
     86     return self._file_system.GetIdentity()
     87 
     88   def __str__(self):
     89     return repr(self)
     90 
     91   def __repr__(self):
     92     return 'MockFileSystem(read_count=%s, stat_count=%s, updates=%s)' % (
     93         self._read_count, self._stat_count, len(self._updates))
     94 
     95   #
     96   # Testing methods.
     97   #
     98 
     99   def GetReadCount(self):
    100     return self._read_count
    101 
    102   def GetStatCount(self):
    103     return self._stat_count
    104 
    105   def CheckAndReset(self, stat_count=0, read_count=0):
    106     '''Returns a tuple (success, error). Use in tests like:
    107     self.assertTrue(*object_store.CheckAndReset(...))
    108     '''
    109     errors = []
    110     for desc, expected, actual in (
    111         ('read_count', read_count, self._read_count),
    112         ('stat_count', stat_count, self._stat_count)):
    113       if actual != expected:
    114         errors.append('%s: expected %s got %s' % (desc, expected, actual))
    115     try:
    116       return (len(errors) == 0, ', '.join(errors))
    117     finally:
    118       self.Reset()
    119 
    120   def Reset(self):
    121     self._read_count = 0
    122     self._stat_count = 0
    123 
    124   def Update(self, update):
    125     self._updates.append(TestFileSystem(update))
    126