# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Utilities for PyAuto."""

import logging
import os
import shutil
import socket
import sys
import tempfile
import unittest
import zipfile

# httplib and urlparse were renamed in Python 3; fall back to the new module
# names so that this file imports under either major version.
try:
  import httplib
except ImportError:
  import http.client as httplib
try:
  import urlparse
except ImportError:
  import urllib.parse as urlparse


class ExistingPathReplacer(object):
  """Facilitates backing up a given path (file or dir).

  Often you want to manipulate a directory or file for testing but don't want
  to meddle with the existing contents. This class lets you make a backup, and
  reinstate the backup when done. A backup is made in an adjacent directory,
  so you need to make sure you have write permissions to the parent directory.

  Works seamlessly in cases where the requested path already exists, or not.

  Automatically reinstates the backed up path (if any) when object is deleted.
  The reinstatement is done in __del__, so it happens whenever the interpreter
  finalizes the object.
  """
  _path = ''
  _backup_dir = None  # dir to which existing content is backed up
  _backup_basename = ''
  # True only once this object has itself created |_path|. __del__ must not
  # delete a path that it did not create (e.g. when __init__ raised early).
  _path_created = False

  def __init__(self, path, path_type='dir'):
    """Initialize the object, making backups if necessary.

    Args:
      path: the requested path to file or directory
      path_type: path type. Options: 'file', 'dir'. Default: 'dir'

    Raises:
      AssertionError: if |path_type| is invalid, or if |path| already exists
          but is not of the requested type.
    """
    # Raised explicitly (rather than via assert statements) so that the
    # checks are still performed when python runs with -O.
    if path_type not in ('file', 'dir'):
      raise AssertionError('Invalid path_type: %s' % path_type)
    self._path_type = path_type
    self._path = path
    if os.path.exists(self._path):
      if 'dir' == self._path_type:
        if not os.path.isdir(self._path):
          raise AssertionError('%s is not a directory' % self._path)
      elif not os.path.isfile(self._path):
        raise AssertionError('%s is not a file' % self._path)
      # take a backup
      self._backup_basename = os.path.basename(self._path)
      # dirname is '' for a bare relative name; back up next to it in cwd.
      self._backup_dir = tempfile.mkdtemp(
          dir=os.path.dirname(self._path) or os.curdir,
          prefix='bkp-' + self._backup_basename)
      logging.info('Backing up %s in %s', self._path, self._backup_dir)
      shutil.move(self._path,
                  os.path.join(self._backup_dir, self._backup_basename))
    self._CreateRequestedPath()
    self._path_created = True

  def __del__(self):
    """Cleanup. Reinstate backup."""
    # Only remove the path if this object created it. Otherwise a failed
    # __init__ would delete pre-existing content that was never backed up.
    if self._path_created:
      self._CleanupRequestedPath()
    if self._backup_dir:  # Reinstate, if backed up.
      from_path = os.path.join(self._backup_dir, self._backup_basename)
      # The backed up copy is missing if the move in __init__ failed; in that
      # case the original content was never moved away.
      if os.path.exists(from_path):
        logging.info('Reinstating backup from %s to %s', from_path, self._path)
        shutil.move(from_path, self._path)
      self._RemoveBackupDir()

  def _CreateRequestedPath(self):
    """Create an empty file or directory (per path type) at the path."""
    # Create intermediate dirs if needed. The parent is '' for a bare relative
    # name, which refers to the current directory and needs no creation.
    parent = os.path.dirname(self._path)
    if parent and not os.path.exists(parent):
      os.makedirs(parent)
    if 'dir' == self._path_type:
      os.mkdir(self._path)
    else:
      open(self._path, 'w').close()

  def _CleanupRequestedPath(self):
    """Remove whatever currently exists at the requested path."""
    if os.path.exists(self._path):
      if os.path.isdir(self._path):
        shutil.rmtree(self._path, ignore_errors=True)
      else:
        os.remove(self._path)

  def _RemoveBackupDir(self):
    """Remove the backup directory, if there is one."""
    if self._backup_dir and os.path.isdir(self._backup_dir):
      shutil.rmtree(self._backup_dir, ignore_errors=True)


def RemovePath(path):
  """Remove the given path (file or dir).

  Removal is best-effort: errors are ignored, and a missing path is not an
  error.

  Args:
    path: path to the file or directory to remove.
  """
  # shutil.rmtree refuses to operate on a symlink, so a link that points to a
  # directory is removed as a file instead; the link's target is left alone.
  if os.path.isdir(path) and not os.path.islink(path):
    shutil.rmtree(path, ignore_errors=True)
    return
  try:
    os.remove(path)
  except OSError:
    pass


def UnzipFilenameToDir(filename, dir):
  """Unzip |filename| to directory |dir|.

  Members are written to paths built from |dir|, so the current working
  directory of the process is never changed.

  Args:
    filename: path to the zip archive.
    dir: directory to extract into. It is created if missing; its parent
        must already exist.
  """
  zf = zipfile.ZipFile(filename)
  try:
    if not os.path.isdir(dir):
      os.mkdir(dir)
    # Extract files.
    for info in zf.infolist():
      name = info.filename
      target = os.path.join(dir, name)
      if name.endswith('/'):  # dir
        if not os.path.isdir(target):
          os.makedirs(target)
      else:  # file
        parent = os.path.dirname(target)
        if parent and not os.path.isdir(parent):
          os.makedirs(parent)
        out = open(target, 'wb')
        try:
          out.write(zf.read(name))
        finally:
          out.close()
      # Set permissions. Permission info in external_attr is shifted 16 bits.
      # Entries that carry no permission info yield 0 here; leave those with
      # their default permissions instead of making them inaccessible.
      mode = info.external_attr >> 16
      if mode:
        os.chmod(target, mode)
  finally:
    zf.close()


def GetCurrentPlatform():
  """Get a string representation for the current platform.

  Returns:
    'mac', 'win' or 'linux'

  Raises:
    RuntimeError: if sys.platform is none of the recognized values.
  """
  platform = sys.platform
  if platform == 'darwin':
    return 'mac'
  if platform == 'win32':
    return 'win'
  if platform.startswith('linux'):
    return 'linux'
  raise RuntimeError('Unknown platform')


def PrintPerfResult(graph_name, series_name, data_point, units,
                    show_on_waterfall=False):
  """Prints a line to stdout that is specially formatted for the perf bots.

  Args:
    graph_name: String name for the graph on which to plot the data.
    series_name: String name for the series (line on the graph) associated with
                 the data.  This is also the string displayed on the waterfall
                 if |show_on_waterfall| is True.
    data_point: Numeric data value to plot on the graph for the current build.
                This can be a single value or an array of values.  If an array,
                the graph will plot the average of the values, along with error
                bars.
    units: The string unit of measurement for the given |data_point|.
    show_on_waterfall: Whether or not to display this result directly on the
                       buildbot waterfall itself (in the buildbot step running
                       this test on the waterfall page, not the stdio page).
  """
  # A leading '*' marks results that should be shown on the waterfall.
  waterfall_indicator = '*' if show_on_waterfall else ''
  line = '%sRESULT %s: %s= %s %s' % (
      waterfall_indicator, graph_name, series_name,
      str(data_point).replace(' ', ''), units)
  # Written through sys.stdout directly so that the same code runs whether
  # print is a statement or a function.
  sys.stdout.write(line + '\n')
  sys.stdout.flush()


def Shard(ilist, shard_index, num_shards):
  """Shard a given list and return the group at index |shard_index|.

  Every shard gets len(ilist) // num_shards items, except the last shard,
  which also gets the remainder.

  Args:
    ilist: input list
    shard_index: 0-based sharding index
    num_shards: shard count

  Returns:
    The slice of |ilist| that belongs to shard |shard_index|.
  """
  # Floor division, spelled explicitly so that the slice bounds stay integers.
  chunk_size = len(ilist) // num_shards
  chunk_start = shard_index * chunk_size
  if shard_index == num_shards - 1:
    # Exhaust the remainder in the last shard.
    return ilist[chunk_start:]
  return ilist[chunk_start:chunk_start + chunk_size]


def WaitForDomElement(pyauto, driver, xpath):
  """Wait for the UI element to appear.

  Args:
    pyauto: an instance of pyauto.PyUITest.
    driver: an instance of chrome driver or a web element.
    xpath: the xpath of the element to wait for.

  Returns:
    The element if it is found.

  The original documentation states that NoSuchElementException results if
  the element is not found; presumably the final lookup raises it when the
  wait ends without a match -- TODO confirm against pyauto.WaitUntil.
  """
  pyauto.WaitUntil(lambda: len(driver.find_elements_by_xpath(xpath)) > 0)
  return driver.find_element_by_xpath(xpath)


def DoesUrlExist(url, max_redirects=10):
  """Determines whether a resource exists at the given URL.

  Issues a HEAD request, and follows both permanent (301) and temporary (302)
  redirects.

  Args:
    url: URL to be verified.
    max_redirects: maximum number of redirects to follow before giving up, so
        that a redirect loop cannot recurse forever.

  Returns:
    True if url exists (final response status is 200), otherwise False.
  """
  for _ in range(max_redirects + 1):
    parsed = urlparse.urlparse(url)
    connection_class = httplib.HTTPConnection
    if parsed.scheme == 'https':
      # HTTPSConnection is absent when python was built without SSL support.
      connection_class = getattr(httplib, 'HTTPSConnection', connection_class)
    # An empty path is not a valid request target; ask for the root instead.
    request_target = parsed.path or '/'
    if parsed.query:
      request_target += '?' + parsed.query
    conn = None
    try:
      conn = connection_class(parsed.netloc)
      conn.request('HEAD', request_target)
      response = conn.getresponse()
      status = response.status
      location = response.getheader('location')
    except (socket.gaierror, socket.error, httplib.HTTPException):
      return False
    finally:
      # conn is still None if the constructor itself raised.
      if conn is not None:
        conn.close()
    if status not in (301, 302):
      return status == 200
    if not location:  # A redirect that does not say where to go.
      return False
    # The Location header may be relative to the URL that was requested.
    url = urlparse.urljoin(url, location)
  return False


# unittest.TextTestResult is the public class name; releases that predate it
# only provide the underscore-prefixed name.
_TextTestResultBase = getattr(unittest, 'TextTestResult', None)
if _TextTestResultBase is None:
  _TextTestResultBase = unittest._TextTestResult


class _GTestTextTestResult(_TextTestResultBase):
  """A test result class that can print formatted text results to a stream.

  Results printed in conformance with gtest output format, like:
  [ RUN ] autofill.AutofillTest.testAutofillInvalid: "test desc."
  [ OK ] autofill.AutofillTest.testAutofillInvalid
  [ RUN ] autofill.AutofillTest.testFillProfile: "test desc."
  [ OK ] autofill.AutofillTest.testFillProfile
  [ RUN ] autofill.AutofillTest.testFillProfileCrazyCharacters: "Test."
  [ OK ] autofill.AutofillTest.testFillProfileCrazyCharacters

  NOTE(review): gtest itself pads these tags to a fixed width (for example
  '[ RUN      ]'). Confirm whether consumers of this output need that padding.
  """

  def __init__(self, stream, descriptions, verbosity):
    _TextTestResultBase.__init__(self, stream, descriptions, verbosity)

  def _GetTestURI(self, test):
    """Return 'module.Class.method' for the given test."""
    test_class = test.__class__
    # Computed here rather than through a private unittest helper, which is
    # not available under the same name in every python version.
    class_name = '%s.%s' % (test_class.__module__, test_class.__name__)
    method_name = getattr(test, '_testMethodName', None)
    if method_name is None:
      # Python 2.4 and older keep the method name in a name-mangled attribute.
      method_name = test._TestCase__testMethodName
    return '%s.%s' % (class_name, method_name)

  def getDescription(self, test):
    """Return the test URI followed by its quoted short description."""
    return '%s: "%s"' % (self._GetTestURI(test), test.shortDescription())

  # The methods below call unittest.TestResult directly in order to skip the
  # text result class's own output and print gtest style lines instead.

  def startTest(self, test):
    unittest.TestResult.startTest(self, test)
    self.stream.writeln('[ RUN ] %s' % self.getDescription(test))

  def addSuccess(self, test):
    unittest.TestResult.addSuccess(self, test)
    self.stream.writeln('[ OK ] %s' % self._GetTestURI(test))

  def addError(self, test, err):
    unittest.TestResult.addError(self, test, err)
    self.stream.writeln('[ ERROR ] %s' % self._GetTestURI(test))

  def addFailure(self, test, err):
    unittest.TestResult.addFailure(self, test, err)
    self.stream.writeln('[ FAILED ] %s' % self._GetTestURI(test))


class GTestTextTestRunner(unittest.TextTestRunner):
  """Test Runner for displaying test results in textual format.

  Results are displayed in conformance with gtest output.
  """

  def __init__(self, verbosity=1):
    """Create a runner that writes to sys.stderr.

    Args:
      verbosity: verbosity level, passed through to unittest.TextTestRunner.
    """
    unittest.TextTestRunner.__init__(self, stream=sys.stderr,
                                     verbosity=verbosity)

  def _makeResult(self):
    """Return the gtest style result object used to report this run."""
    return _GTestTextTestResult(self.stream, self.descriptions, self.verbosity)