Home | History | Annotate | Download | only in value
      1 # Copyright 2014 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import os
      6 import shutil
      7 import tempfile
      8 import unittest
      9 
     10 from telemetry import story
     11 from telemetry import page as page_module
     12 from telemetry.testing import system_stub
     13 from telemetry.timeline import trace_data
     14 from telemetry.value import trace
     15 
     16 
     17 class TestBase(unittest.TestCase):
     18 
     19   def setUp(self):
     20     story_set = story.StorySet(base_dir=os.path.dirname(__file__))
     21     story_set.AddStory(
     22         page_module.Page('http://www.bar.com/', story_set, story_set.base_dir))
     23     story_set.AddStory(
     24         page_module.Page('http://www.baz.com/', story_set, story_set.base_dir))
     25     story_set.AddStory(
     26         page_module.Page('http://www.foo.com/', story_set, story_set.base_dir))
     27     self.story_set = story_set
     28 
     29     self._cloud_storage_stub = system_stub.Override(trace, ['cloud_storage'])
     30 
     31   def tearDown(self):
     32     if self._cloud_storage_stub:
     33       self._cloud_storage_stub.Restore()
     34       self._cloud_storage_stub = None
     35 
     36   @property
     37   def pages(self):
     38     return self.story_set.stories
     39 
     40 
     41 class TestSet(object):
     42   """ A test set that represents a set that contains any key. """
     43 
     44   def __contains__(self, key):
     45     return True
     46 
     47 
     48 class TestDefaultDict(object):
     49   """ A test default dict that represents a dictionary that contains any key
     50   with value |default_value|. """
     51 
     52   def __init__(self, default_value):
     53     self._default_value = default_value
     54     self._test_set = TestSet()
     55 
     56   def __contains__(self, key):
     57     return key in self._test_set
     58 
     59   def __getitem__(self, key):
     60     return self._default_value
     61 
     62   def keys(self):
     63     return self._test_set
     64 
     65 
     66 class ValueTest(TestBase):
     67   def testRepr(self):
     68     v = trace.TraceValue(self.pages[0], trace_data.TraceData({'test': 1}),
     69                          important=True, description='desc')
     70 
     71     self.assertEquals('TraceValue(http://www.bar.com/, trace)', str(v))
     72 
     73   def testAsDictWhenTraceSerializedAndUploaded(self):
     74     tempdir = tempfile.mkdtemp()
     75     try:
     76       v = trace.TraceValue(None, trace_data.TraceData({'test': 1}))
     77       fh = v.Serialize(tempdir)
     78       trace.cloud_storage.SetCalculatedHashesForTesting(
     79           {fh.GetAbsPath(): 123})
     80       bucket = trace.cloud_storage.PUBLIC_BUCKET
     81       cloud_url = v.UploadToCloud(bucket)
     82       d = v.AsDict()
     83       self.assertEqual(d['file_id'], fh.id)
     84       self.assertEqual(d['cloud_url'], cloud_url)
     85     finally:
     86       shutil.rmtree(tempdir)
     87 
     88   def testAsDictWhenTraceIsNotSerializedAndUploaded(self):
     89     test_temp_file = tempfile.NamedTemporaryFile(delete=False)
     90     try:
     91       v = trace.TraceValue(None, trace_data.TraceData({'test': 1}))
     92       trace.cloud_storage.SetCalculatedHashesForTesting(
     93           TestDefaultDict(123))
     94       bucket = trace.cloud_storage.PUBLIC_BUCKET
     95       cloud_url = v.UploadToCloud(bucket)
     96       d = v.AsDict()
     97       self.assertEqual(d['cloud_url'], cloud_url)
     98     finally:
     99       if os.path.exists(test_temp_file.name):
    100         test_temp_file.close()
    101         os.remove(test_temp_file.name)
    102 
    103 
    104 def _IsEmptyDir(path):
    105   return os.path.exists(path) and not os.listdir(path)
    106 
    107 
    108 class NoLeakedTempfilesTests(TestBase):
    109 
    110   def setUp(self):
    111     super(NoLeakedTempfilesTests, self).setUp()
    112     self.temp_test_dir = tempfile.mkdtemp()
    113     self.actual_tempdir = trace.tempfile.tempdir
    114     trace.tempfile.tempdir = self.temp_test_dir
    115 
    116   def testNoLeakedTempFileOnImplicitCleanUp(self):
    117     with trace.TraceValue(None, trace_data.TraceData({'test': 1})):
    118       pass
    119     self.assertTrue(_IsEmptyDir(self.temp_test_dir))
    120 
    121   def testNoLeakedTempFileWhenUploadingTrace(self):
    122     v = trace.TraceValue(None, trace_data.TraceData({'test': 1}))
    123     v.CleanUp()
    124     self.assertTrue(_IsEmptyDir(self.temp_test_dir))
    125 
    126   def tearDown(self):
    127     super(NoLeakedTempfilesTests, self).tearDown()
    128     shutil.rmtree(self.temp_test_dir)
    129     trace.tempfile.tempdir = self.actual_tempdir
    130