# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import os
import shutil
import tempfile
import unittest

from telemetry import story
from telemetry import page as page_module
from telemetry.testing import system_stub
from telemetry.timeline import trace_data
from telemetry.value import trace


class TestBase(unittest.TestCase):
  """Shared fixture for the TraceValue tests.

  Builds a story set holding three pages and overrides `cloud_storage` on the
  `trace` module for the duration of each test.
  """

  def setUp(self):
    story_set = story.StorySet(base_dir=os.path.dirname(__file__))
    story_set.AddStory(
        page_module.Page('http://www.bar.com/', story_set, story_set.base_dir))
    story_set.AddStory(
        page_module.Page('http://www.baz.com/', story_set, story_set.base_dir))
    story_set.AddStory(
        page_module.Page('http://www.foo.com/', story_set, story_set.base_dir))
    self.story_set = story_set

    # Undone in tearDown() via Restore().
    self._cloud_storage_stub = system_stub.Override(trace, ['cloud_storage'])

  def tearDown(self):
    if self._cloud_storage_stub:
      self._cloud_storage_stub.Restore()
      self._cloud_storage_stub = None

  @property
  def pages(self):
    """The stories of the story set created in setUp()."""
    return self.story_set.stories


class TestSet(object):
  """A test set that represents a set that contains any key."""

  def __contains__(self, key):
    return True


class TestDefaultDict(object):
  """A test default dict that represents a dictionary that contains any key
  with value |default_value|.
  """

  def __init__(self, default_value):
    self._default_value = default_value
    self._test_set = TestSet()

  def __contains__(self, key):
    return key in self._test_set

  def __getitem__(self, key):
    return self._default_value

  def keys(self):
    # Returns the always-matching TestSet rather than a real list of keys.
    return self._test_set


class ValueTest(TestBase):
  """Tests for the string form and AsDict() output of trace.TraceValue."""

  def testRepr(self):
    """str() of a TraceValue names the page URL and the value name."""
    v = trace.TraceValue(self.pages[0], trace_data.TraceData({'test': 1}),
                         important=True, description='desc')

    # assertEqual rather than the deprecated assertEquals alias (removed in
    # Python 3.12); this also matches the other assertions in this file.
    self.assertEqual('TraceValue(http://www.bar.com/, trace)', str(v))

  def testAsDictWhenTraceSerializedAndUploaded(self):
    """AsDict() reports file_id and cloud_url after Serialize + Upload."""
    tempdir = tempfile.mkdtemp()
    try:
      v = trace.TraceValue(None, trace_data.TraceData({'test': 1}))
      fh = v.Serialize(tempdir)
      # Hand the cloud_storage override a hash for the serialized file.
      trace.cloud_storage.SetCalculatedHashesForTesting(
          {fh.GetAbsPath(): 123})
      bucket = trace.cloud_storage.PUBLIC_BUCKET
      cloud_url = v.UploadToCloud(bucket)
      d = v.AsDict()
      self.assertEqual(d['file_id'], fh.id)
      self.assertEqual(d['cloud_url'], cloud_url)
    finally:
      shutil.rmtree(tempdir)

  def testAsDictWhenTraceIsNotSerializedAndUploaded(self):
    """AsDict() reports cloud_url when the trace is uploaded unserialized."""
    test_temp_file = tempfile.NamedTemporaryFile(delete=False)
    try:
      v = trace.TraceValue(None, trace_data.TraceData({'test': 1}))
      # Any path maps to hash 123, since the uploaded file's path is not
      # known to the test up front.
      trace.cloud_storage.SetCalculatedHashesForTesting(
          TestDefaultDict(123))
      bucket = trace.cloud_storage.PUBLIC_BUCKET
      cloud_url = v.UploadToCloud(bucket)
      d = v.AsDict()
      self.assertEqual(d['cloud_url'], cloud_url)
    finally:
      # Always release the handle, even if the file has already vanished,
      # so the descriptor is never leaked; then delete the file if present.
      test_temp_file.close()
      if os.path.exists(test_temp_file.name):
        os.remove(test_temp_file.name)


def _IsEmptyDir(path):
  """Returns True if |path| exists and has no directory entries."""
  return os.path.exists(path) and not os.listdir(path)


class NoLeakedTempfilesTests(TestBase):
  """Checks that TraceValue leaves nothing behind in the temp directory.

  setUp() points `tempdir` of the `tempfile` module referenced by `trace` at
  a scratch directory, so each test can assert that directory ends up empty.
  """

  def setUp(self):
    super(NoLeakedTempfilesTests, self).setUp()
    self.temp_test_dir = tempfile.mkdtemp()
    # Remember the previous setting so tearDown() can put it back.
    self.actual_tempdir = trace.tempfile.tempdir
    trace.tempfile.tempdir = self.temp_test_dir

  def testNoLeakedTempFileOnImplicitCleanUp(self):
    with trace.TraceValue(None, trace_data.TraceData({'test': 1})):
      pass
    self.assertTrue(_IsEmptyDir(self.temp_test_dir))

  def testNoLeakedTempFileWhenUploadingTrace(self):
    v = trace.TraceValue(None, trace_data.TraceData({'test': 1}))
    v.CleanUp()
    self.assertTrue(_IsEmptyDir(self.temp_test_dir))

  def tearDown(self):
    try:
      super(NoLeakedTempfilesTests, self).tearDown()
      shutil.rmtree(self.temp_test_dir)
    finally:
      # Restore the module-level setting even if cleanup above raised;
      # otherwise later tests would keep using the scratch directory.
      trace.tempfile.tempdir = self.actual_tempdir