Home | History | Annotate | Download | only in ap_configurators
      1 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Utilities for PyAuto."""
      6 
      7 import httplib
      8 import logging
      9 import os
     10 import shutil
     11 import socket
     12 import sys
     13 import tempfile
     14 import unittest
     15 import urlparse
     16 import zipfile
     17 
     18 
     19 class ExistingPathReplacer(object):
     20   """Facilitates backing up a given path (file or dir)..
     21 
     22   Often you want to manipulate a directory or file for testing but don't want to
     23   meddle with the existing contents.  This class lets you make a backup, and
     24   reinstate the backup when done.  A backup is made in an adjacent directory,
     25   so you need to make sure you have write permissions to the parent directory.
     26 
     27   Works seemlessly in cases where the requested path already exists, or not.
     28 
     29   Automatically reinstates the backed up path (if any) when object is deleted.
     30   """
     31   _path = ''
     32   _backup_dir = None  # dir to which existing content is backed up
     33   _backup_basename = ''
     34 
     35   def __init__(self, path, path_type='dir'):
     36     """Initialize the object, making backups if necessary.
     37 
     38     Args:
     39       path: the requested path to file or directory
     40       path_type: path type. Options: 'file', 'dir'. Default: 'dir'
     41     """
     42     assert path_type in ('file', 'dir'), 'Invalid path_type: %s' % path_type
     43     self._path_type = path_type
     44     self._path = path
     45     if os.path.exists(self._path):
     46       if 'dir' == self._path_type:
     47         assert os.path.isdir(self._path), '%s is not a directory' % self._path
     48       else:
     49         assert os.path.isfile(self._path), '%s is not a file' % self._path
     50       # take a backup
     51       self._backup_basename = os.path.basename(self._path)
     52       self._backup_dir = tempfile.mkdtemp(dir=os.path.dirname(self._path),
     53                                           prefix='bkp-' + self._backup_basename)
     54       logging.info('Backing up %s in %s' % (self._path, self._backup_dir))
     55       shutil.move(self._path,
     56                   os.path.join(self._backup_dir, self._backup_basename))
     57     self._CreateRequestedPath()
     58 
     59   def __del__(self):
     60     """Cleanup. Reinstate backup."""
     61     self._CleanupRequestedPath()
     62     if self._backup_dir:  # Reinstate, if backed up.
     63       from_path = os.path.join(self._backup_dir, self._backup_basename)
     64       logging.info('Reinstating backup from %s to %s' % (from_path, self._path))
     65       shutil.move(from_path, self._path)
     66     self._RemoveBackupDir()
     67 
     68   def _CreateRequestedPath(self):
     69     # Create intermediate dirs if needed.
     70     if not os.path.exists(os.path.dirname(self._path)):
     71       os.makedirs(os.path.dirname(self._path))
     72     if 'dir' == self._path_type:
     73       os.mkdir(self._path)
     74     else:
     75       open(self._path, 'w').close()
     76 
     77   def _CleanupRequestedPath(self):
     78     if os.path.exists(self._path):
     79       if os.path.isdir(self._path):
     80         shutil.rmtree(self._path, ignore_errors=True)
     81       else:
     82         os.remove(self._path)
     83 
     84   def _RemoveBackupDir(self):
     85     if self._backup_dir and os.path.isdir(self._backup_dir):
     86       shutil.rmtree(self._backup_dir, ignore_errors=True)
     87 
     88 
     89 def RemovePath(path):
     90   """Remove the given path (file or dir)."""
     91   if os.path.isdir(path):
     92     shutil.rmtree(path, ignore_errors=True)
     93     return
     94   try:
     95     os.remove(path)
     96   except OSError:
     97     pass
     98 
     99 
    100 def UnzipFilenameToDir(filename, dir):
    101   """Unzip |filename| to directory |dir|.
    102 
    103   This works with as low as python2.4 (used on win).
    104   """
    105   zf = zipfile.ZipFile(filename)
    106   pushd = os.getcwd()
    107   if not os.path.isdir(dir):
    108     os.mkdir(dir)
    109   os.chdir(dir)
    110   # Extract files.
    111   for info in zf.infolist():
    112     name = info.filename
    113     if name.endswith('/'):  # dir
    114       if not os.path.isdir(name):
    115         os.makedirs(name)
    116     else:  # file
    117       dir = os.path.dirname(name)
    118       if not os.path.isdir(dir):
    119         os.makedirs(dir)
    120       out = open(name, 'wb')
    121       out.write(zf.read(name))
    122       out.close()
    123     # Set permissions. Permission info in external_attr is shifted 16 bits.
    124     os.chmod(name, info.external_attr >> 16L)
    125   os.chdir(pushd)
    126 
    127 
    128 def GetCurrentPlatform():
    129   """Get a string representation for the current platform.
    130 
    131   Returns:
    132     'mac', 'win' or 'linux'
    133   """
    134   if sys.platform == 'darwin':
    135     return 'mac'
    136   if sys.platform == 'win32':
    137     return 'win'
    138   if sys.platform.startswith('linux'):
    139     return 'linux'
    140   raise RuntimeError('Unknown platform')
    141 
    142 
    143 def PrintPerfResult(graph_name, series_name, data_point, units,
    144                     show_on_waterfall=False):
    145   """Prints a line to stdout that is specially formatted for the perf bots.
    146 
    147   Args:
    148     graph_name: String name for the graph on which to plot the data.
    149     series_name: String name for the series (line on the graph) associated with
    150                  the data.  This is also the string displayed on the waterfall
    151                  if |show_on_waterfall| is True.
    152     data_point: Numeric data value to plot on the graph for the current build.
    153                 This can be a single value or an array of values.  If an array,
    154                 the graph will plot the average of the values, along with error
    155                 bars.
    156     units: The string unit of measurement for the given |data_point|.
    157     show_on_waterfall: Whether or not to display this result directly on the
    158                        buildbot waterfall itself (in the buildbot step running
    159                        this test on the waterfall page, not the stdio page).
    160   """
    161   waterfall_indicator = ['', '*'][show_on_waterfall]
    162   print '%sRESULT %s: %s= %s %s' % (
    163       waterfall_indicator, graph_name, series_name,
    164       str(data_point).replace(' ', ''), units)
    165   sys.stdout.flush()
    166 
    167 
    168 def Shard(ilist, shard_index, num_shards):
    169   """Shard a given list and return the group at index |shard_index|.
    170 
    171   Args:
    172     ilist: input list
    173     shard_index: 0-based sharding index
    174     num_shards: shard count
    175   """
    176   chunk_size = len(ilist) / num_shards
    177   chunk_start = shard_index * chunk_size
    178   if shard_index == num_shards - 1:  # Exhaust the remainder in the last shard.
    179     chunk_end = len(ilist)
    180   else:
    181     chunk_end = chunk_start + chunk_size
    182   return ilist[chunk_start:chunk_end]
    183 
    184 
    185 def WaitForDomElement(pyauto, driver, xpath):
    186   """Wait for the UI element to appear.
    187 
    188   Args:
    189     pyauto: an instance of pyauto.PyUITest.
    190     driver: an instance of chrome driver or a web element.
    191     xpath: the xpath of the element to wait for.
    192 
    193   Returns:
    194     The element if it is found.
    195     NoSuchElementException if it is not found.
    196   """
    197   pyauto.WaitUntil(lambda: len(driver.find_elements_by_xpath(xpath)) > 0)
    198   return driver.find_element_by_xpath(xpath)
    199 
    200 
    201 def DoesUrlExist(url):
    202   """Determines whether a resource exists at the given URL.
    203 
    204   Args:
    205     url: URL to be verified.
    206 
    207   Returns:
    208     True if url exists, otherwise False.
    209   """
    210   parsed = urlparse.urlparse(url)
    211   try:
    212     conn = httplib.HTTPConnection(parsed.netloc)
    213     conn.request('HEAD', parsed.path)
    214     response = conn.getresponse()
    215   except (socket.gaierror, socket.error):
    216     return False
    217   finally:
    218     conn.close()
    219   # Follow both permanent (301) and temporary (302) redirects.
    220   if response.status == 302 or response.status == 301:
    221     return DoesUrlExist(response.getheader('location'))
    222   return response.status == 200
    223 
    224 
    225 class _GTestTextTestResult(unittest._TextTestResult):
    226   """A test result class that can print formatted text results to a stream.
    227 
    228   Results printed in conformance with gtest output format, like:
    229   [ RUN        ] autofill.AutofillTest.testAutofillInvalid: "test desc."
    230   [         OK ] autofill.AutofillTest.testAutofillInvalid
    231   [ RUN        ] autofill.AutofillTest.testFillProfile: "test desc."
    232   [         OK ] autofill.AutofillTest.testFillProfile
    233   [ RUN        ] autofill.AutofillTest.testFillProfileCrazyCharacters: "Test."
    234   [         OK ] autofill.AutofillTest.testFillProfileCrazyCharacters
    235   """
    236 
    237   def __init__(self, stream, descriptions, verbosity):
    238     unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
    239 
    240   def _GetTestURI(self, test):
    241     if sys.version_info[:2] <= (2, 4):
    242       return '%s.%s' % (unittest._strclass(test.__class__),
    243                         test._TestCase__testMethodName)
    244     return '%s.%s' % (unittest._strclass(test.__class__), test._testMethodName)
    245 
    246   def getDescription(self, test):
    247     return '%s: "%s"' % (self._GetTestURI(test), test.shortDescription())
    248 
    249   def startTest(self, test):
    250     unittest.TestResult.startTest(self, test)
    251     self.stream.writeln('[ RUN        ] %s' % self.getDescription(test))
    252 
    253   def addSuccess(self, test):
    254     unittest.TestResult.addSuccess(self, test)
    255     self.stream.writeln('[         OK ] %s' % self._GetTestURI(test))
    256 
    257   def addError(self, test, err):
    258     unittest.TestResult.addError(self, test, err)
    259     self.stream.writeln('[      ERROR ] %s' % self._GetTestURI(test))
    260 
    261   def addFailure(self, test, err):
    262     unittest.TestResult.addFailure(self, test, err)
    263     self.stream.writeln('[     FAILED ] %s' % self._GetTestURI(test))
    264 
    265 
    266 class GTestTextTestRunner(unittest.TextTestRunner):
    267   """Test Runner for displaying test results in textual format.
    268 
    269   Results are displayed in conformance with gtest output.
    270   """
    271 
    272   def __init__(self, verbosity=1):
    273     unittest.TextTestRunner.__init__(self, stream=sys.stderr,
    274                                      verbosity=verbosity)
    275 
    276   def _makeResult(self):
    277     return _GTestTextTestResult(self.stream, self.descriptions, self.verbosity)
    278