Home | History | Annotate | Download | only in cros
      1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import os
      7 import re
      8 import shutil
      9 import subprocess
     10 import tempfile
     11 import time
     12 import urllib
     13 import urllib2
     14 
     15 from autotest_lib.client.bin import test
     16 from autotest_lib.client.bin import utils
     17 from autotest_lib.client.common_lib import error
     18 from autotest_lib.client.common_lib import file_utils
     19 from autotest_lib.client.cros.input_playback import input_playback
     20 
     21 
     22 class touch_playback_test_base(test.test):
     23     """Base class for touch tests involving playback."""
     24     version = 1
     25 
     26     _INPUTCONTROL = '/opt/google/input/inputcontrol'
     27 
     28 
     29     @property
     30     def _has_touchpad(self):
     31         """True if device under test has a touchpad; else False."""
     32         return self.player.has('touchpad')
     33 
     34 
     35     @property
     36     def _has_touchscreen(self):
     37         """True if device under test has a touchscreen; else False."""
     38         return self.player.has('touchscreen')
     39 
     40 
     41     @property
     42     def _has_mouse(self):
     43         """True if device under test has or emulates a USB mouse; else False."""
     44         return self.player.has('mouse')
     45 
     46 
     47     def warmup(self, mouse_props=None):
     48         """Test setup.
     49 
     50         Instantiate player object to find touch devices, if any.
     51         These devices can be used for playback later.
     52         Emulate a USB mouse if a property file is provided.
     53         Check if the inputcontrol script is avaiable on the disk.
     54 
     55         @param mouse_props: optional property file for a mouse to emulate.
     56                             Created using 'evemu-describe /dev/input/X'.
     57 
     58         """
     59         self.player = input_playback.InputPlayback()
     60         if mouse_props:
     61             self.player.emulate(input_type='mouse', property_file=mouse_props)
     62         self.player.find_connected_inputs()
     63 
     64         self._autotest_ext = None
     65         self._has_inputcontrol = os.path.isfile(self._INPUTCONTROL)
     66         self._platform = utils.get_board()
     67         if 'cheets' in self._platform:
     68             self._platform = self._platform[:-len('-cheets')]
     69 
     70 
     71     def _find_test_files(self, input_type, gestures):
     72         """Determine where the playback gesture files for this test are.
     73 
     74         Expected file format is: <boardname>_<input type>_<hwid>_<gesture name>
     75             e.g. samus_touchpad_164.17_scroll_down
     76 
     77         @param input_type: device type, e.g. 'touchpad'
     78         @param gestures: list of gesture name strings used in filename
     79 
     80         @returns: None if not all files are found.  Dictionary of filepaths if
     81                   they are found, indexed by gesture names as given.
     82         @raises: error.TestError if no device is found or if device should have
     83                  a hw_id but does not.
     84 
     85         """
     86         if type(gestures) is not list:
     87             raise error.TestError('find_test_files() takes a LIST, not a '
     88                                    '%s!' % type(gestures))
     89 
     90         if not self.player.has(input_type):
     91             raise error.TestError('Device does not have a %s!' % input_type)
     92 
     93         if input_type in ['touchpad', 'touchscreen', 'stylus']:
     94             hw_id = self.player.devices[input_type].hw_id
     95             if not hw_id:
     96                 raise error.TestError('No valid hw_id for %s!' % input_type)
     97             filename_fmt = '%s_%s_%s' % (self._platform, input_type, hw_id)
     98 
     99         else:
    100             device_name = self.player.devices[input_type].name
    101             filename_fmt = '%s_%s' % (device_name, input_type)
    102 
    103         filepaths = {}
    104         for gesture in gestures:
    105             filename = '%s_%s' % (filename_fmt, gesture)
    106             filepath = self._download_remote_test_file(filename, input_type)
    107             if not filepath:
    108                 logging.info('Did not find files for this device!')
    109                 return None
    110 
    111             filepaths[gesture] = filepath
    112 
    113         return filepaths
    114 
    115 
    116     def _find_test_files_from_directions(self, input_type, fmt_str, directions):
    117         """Find gesture files given a list of directions and name format.
    118 
    119         @param input_type: device type, e.g. 'touchpad'
    120         @param fmt_str: format string for filename, e.g. 'scroll-%s'
    121         @param directions: list of directions for fmt_string
    122 
    123         @returns: None if not all files are found.  Dictionary of filepaths if
    124                   they are found, indexed by directions as given.
    125         @raises: error.TestError if no hw_id is found.
    126 
    127         """
    128         gestures = [fmt_str % d for d in directions]
    129         temp_filepaths = self._find_test_files(input_type, gestures)
    130 
    131         filepaths = {}
    132         if temp_filepaths:
    133             filepaths = {d: temp_filepaths[fmt_str % d] for d in directions}
    134 
    135         return filepaths
    136 
    137 
    138     def _download_remote_test_file(self, filename, input_type):
    139         """Download a file from the remote touch playback folder.
    140 
    141         @param filename: string of filename
    142         @param input_type: device type, e.g. 'touchpad'
    143 
    144         @returns: Path to local file or None if file is not found.
    145 
    146         """
    147         REMOTE_STORAGE_URL = ('https://storage.googleapis.com/'
    148                               'chromiumos-test-assets-public/touch_playback')
    149         filename = urllib.quote(filename)
    150 
    151         if input_type in ['touchpad', 'touchscreen', 'stylus']:
    152             url = '%s/%s/%s' % (REMOTE_STORAGE_URL, self._platform, filename)
    153         else:
    154             url = '%s/TYPE-%s/%s' % (REMOTE_STORAGE_URL, input_type, filename)
    155         local_file = os.path.join(self.bindir, filename)
    156 
    157         logging.info('Looking for %s', url)
    158         try:
    159             file_utils.download_file(url, local_file)
    160         except urllib2.URLError as e:
    161             logging.info('File download failed!')
    162             logging.debug(e.msg)
    163             return None
    164 
    165         return local_file
    166 
    167 
    168     def _emulate_mouse(self, property_file=None):
    169         """Emulate a mouse with the given property file.
    170 
    171         player will use default mouse if no file is provided.
    172 
    173         """
    174         self.player.emulate(input_type='mouse', property_file=property_file)
    175         self.player.find_connected_inputs()
    176         if not self._has_mouse:
    177             raise error.TestError('Mouse emulation failed!')
    178 
    179     def _playback(self, filepath, touch_type='touchpad'):
    180         """Playback a given input file on the given input."""
    181         self.player.playback(filepath, touch_type)
    182 
    183 
    184     def _blocking_playback(self, filepath, touch_type='touchpad'):
    185         """Playback a given input file on the given input; block until done."""
    186         self.player.blocking_playback(filepath, touch_type)
    187 
    188 
    189     def _set_touch_setting_by_inputcontrol(self, setting, value):
    190         """Set a given touch setting the given value by inputcontrol.
    191 
    192         @param setting: Name of touch setting, e.g. 'tapclick'.
    193         @param value: True for enabled, False for disabled.
    194 
    195         """
    196         cmd_value = 1 if value else 0
    197         utils.run('%s --%s %d' % (self._INPUTCONTROL, setting, cmd_value))
    198         logging.info('%s turned %s.', setting, 'on' if value else 'off')
    199 
    200 
    201     def _set_touch_setting(self, inputcontrol_setting, autotest_ext_setting,
    202                            value):
    203         """Set a given touch setting the given value.
    204 
    205         @param inputcontrol_setting: Name of touch setting for the inputcontrol
    206                                      script, e.g. 'tapclick'.
    207         @param autotest_ext_setting: Name of touch setting for the autotest
    208                                      extension, e.g. 'TapToClick'.
    209         @param value: True for enabled, False for disabled.
    210 
    211         """
    212         if self._has_inputcontrol:
    213             self._set_touch_setting_by_inputcontrol(inputcontrol_setting, value)
    214         elif self._autotest_ext is not None:
    215             self._autotest_ext.EvaluateJavaScript(
    216                     'chrome.autotestPrivate.set%s(%s);'
    217                     % (autotest_ext_setting, ("%s" % value).lower()))
    218             # TODO: remove this sleep once checking for value is available.
    219             time.sleep(1)
    220         else:
    221             raise error.TestFail('Both inputcontrol and the autotest '
    222                                  'extension are not availble.')
    223 
    224 
    225     def _set_australian_scrolling(self, value):
    226         """Set australian scrolling to the given value.
    227 
    228         @param value: True for enabled, False for disabled.
    229 
    230         """
    231         self._set_touch_setting('australian_scrolling', 'NaturalScroll', value)
    232 
    233 
    234     def _set_tap_to_click(self, value):
    235         """Set tap-to-click to the given value.
    236 
    237         @param value: True for enabled, False for disabled.
    238 
    239         """
    240         self._set_touch_setting('tapclick', 'TapToClick', value)
    241 
    242 
    243     def _set_tap_dragging(self, value):
    244         """Set tap dragging to the given value.
    245 
    246         @param value: True for enabled, False for disabled.
    247 
    248         """
    249         self._set_touch_setting('tapdrag', 'TapDragging', value)
    250 
    251 
    252     def _set_autotest_ext(self, ext):
    253         """Set the autotest extension.
    254 
    255         @ext: the autotest extension object.
    256 
    257         """
    258         self._autotest_ext = ext
    259 
    260 
    261     def _open_test_page(self, cr, filename='test_page.html'):
    262         """Prepare test page for testing.  Set self._tab with page.
    263 
    264         @param cr: chrome.Chrome() object
    265         @param filename: name of file in self.bindir to open
    266 
    267         """
    268         self._test_page = TestPage(cr, self.bindir, filename)
    269         self._tab = self._test_page._tab
    270 
    271 
    272     def _open_events_page(self, cr):
    273         """Open the test events page.  Set self._events with EventsPage class.
    274 
    275         Also set self._tab as this page and self.bindir as the http server dir.
    276 
    277         @param cr: chrome.Chrome() object
    278 
    279         """
    280         self._events = EventsPage(cr, self.bindir)
    281         self._tab = self._events._tab
    282 
    283 
    284     def _center_cursor(self):
    285         """Playback mouse movement to center cursor.
    286 
    287         Requres that self._emulate_mouse() has been called.
    288 
    289         """
    290         self.player.blocking_playback_of_default_file(
    291                 'mouse_center_cursor_gesture', input_type='mouse')
    292 
    293 
    294     def _get_kernel_events_recorder(self, input_type):
    295         """Return a kernel event recording object for the given input type.
    296 
    297         @param input_type: device type, e.g. 'touchpad'
    298 
    299         @returns: KernelEventsRecorder instance.
    300 
    301         """
    302         node = self.player.devices[input_type].node
    303         return KernelEventsRecorder(node)
    304 
    305 
    306     def cleanup(self):
    307         """ clean up """
    308         self.player.close()
    309 
    310 
    311 class KernelEventsRecorder(object):
    312     """Object to record kernel events for a particular device."""
    313 
    314     def __init__(self, node):
    315         """Setup to record future evtest output for this node.
    316 
    317         @param input_type: the device which to inspect, e.g. 'mouse'
    318 
    319         """
    320         self.node = node
    321         self.fh = tempfile.NamedTemporaryFile()
    322         self.evtest_process = None
    323 
    324 
    325     def start(self):
    326         """Start recording events."""
    327         self.evtest_process = subprocess.Popen(
    328                 ['evtest', self.node], stdout=self.fh)
    329 
    330         # Wait until the initial output has finished before returning.
    331         def find_exit():
    332             """Polling function for end of output."""
    333             interrupt_cmd = ('grep "interrupt to exit" %s | wc -l' %
    334                              self.fh.name)
    335             line_count = utils.run(interrupt_cmd).stdout.strip()
    336             return line_count != '0'
    337         utils.poll_for_condition(find_exit)
    338 
    339 
    340     def clear(self):
    341         """Clear previous events."""
    342         self.stop()
    343         self.fh.close()
    344         self.fh = tempfile.NamedTemporaryFile()
    345 
    346 
    347     def stop(self):
    348         """Stop recording events."""
    349         if self.evtest_process:
    350             self.evtest_process.kill()
    351             self.evtest_process = None
    352 
    353 
    354     def get_recorded_events(self):
    355         """Get the evtest output since object was created."""
    356         self.fh.seek(0)
    357         events = self.fh.read()
    358         return events
    359 
    360 
    361     def log_recorded_events(self):
    362         """Save recorded events into logs."""
    363         events = self.get_recorded_events()
    364         logging.info('Kernel events seen:\n%s', events)
    365 
    366 
    367     def get_last_event_timestamp(self, filter_str=''):
    368         """Return the timestamp of the last event since recording started.
    369 
    370         Events are in the form "Event: time <epoch time>, <info>\n"
    371 
    372         @param filter_str: a regex string to match to the <info> section.
    373 
    374         @returns: floats matching
    375 
    376         """
    377         events = self.get_recorded_events()
    378         findall = re.findall(r' time (.*?), [^\n]*?%s' % filter_str,
    379                              events, re.MULTILINE)
    380         re.findall(r' time (.*?), [^\n]*?%s' % filter_str, events, re.MULTILINE)
    381         if not findall:
    382             self.log_recorded_events()
    383             raise error.TestError('Could not find any kernel timestamps!'
    384                                   '  Filter: %s' % filter_str)
    385         return float(findall[-1])
    386 
    387 
    388     def close(self):
    389         """Clean up this class."""
    390         self.stop()
    391         self.fh.close()
    392 
    393 
    394 class TestPage(object):
    395     """Wrapper around a Telemtry tab for utility functions.
    396 
    397     Provides functions such as reload and setting scroll height on page.
    398 
    399     """
    400     _DEFAULT_SCROLL = 5000
    401 
    402     def __init__(self, cr, httpdir, filename):
    403         """Open a given test page in the given httpdir.
    404 
    405         @param cr: chrome.Chrome() object
    406         @param httpdir: the directory to use for SetHTTPServerDirectories
    407         @param filename: path to the file to open, relative to httpdir
    408 
    409         """
    410         cr.browser.platform.SetHTTPServerDirectories(httpdir)
    411         self._tab = cr.browser.tabs[0]
    412         self._tab.Navigate(cr.browser.platform.http_server.UrlOf(
    413                 os.path.join(httpdir, filename)))
    414         self.wait_for_page_ready()
    415 
    416 
    417     def reload_page(self):
    418         """Reloads test page."""
    419         self._tab.Navigate(self._tab.url)
    420         self.wait_for_page_ready()
    421 
    422 
    423     def wait_for_page_ready(self):
    424         """Wait for a variable pageReady on the test page to be true.
    425 
    426         Presuposes that a pageReady variable exists on this page.
    427 
    428         @raises error.TestError if page is not ready after timeout.
    429 
    430         """
    431         self._tab.WaitForDocumentReadyStateToBeComplete()
    432         utils.poll_for_condition(
    433                 lambda: self._tab.EvaluateJavaScript('pageReady'),
    434                 exception=error.TestError('Test page is not ready!'))
    435 
    436 
    437     def expand_page(self):
    438         """Expand the page to be very large, to allow scrolling."""
    439         page_width = self._DEFAULT_SCROLL * 5
    440         cmd = 'document.body.style.%s = "%dpx"' % ('%s', page_width)
    441         self._tab.ExecuteJavaScript(cmd % 'width')
    442         self._tab.ExecuteJavaScript(cmd % 'height')
    443 
    444 
    445     def set_scroll_position(self, value, scroll_vertical=True):
    446         """Set scroll position to given value.
    447 
    448         @param value: integer value in pixels.
    449         @param scroll_vertical: True for vertical scroll,
    450                                 False for horizontal Scroll.
    451 
    452         """
    453         cmd = 'window.scrollTo(%d, %d);'
    454         if scroll_vertical:
    455             self._tab.ExecuteJavaScript(cmd % (0, value))
    456         else:
    457             self._tab.ExecuteJavaScript(cmd % (value, 0))
    458 
    459 
    460     def set_default_scroll_position(self, scroll_vertical=True):
    461         """Set scroll position of page to default.
    462 
    463         @param scroll_vertical: True for vertical scroll,
    464                                 False for horizontal Scroll.
    465         @raise: TestError if page is not set to default scroll position
    466 
    467         """
    468         total_tries = 2
    469         for i in xrange(total_tries):
    470             try:
    471                 self.set_scroll_position(self._DEFAULT_SCROLL, scroll_vertical)
    472                 self.wait_for_default_scroll_position(scroll_vertical)
    473             except error.TestError as e:
    474                 if i == total_tries - 1:
    475                    pos = self.get_scroll_position(scroll_vertical)
    476                    logging.error('SCROLL POSITION: %s', pos)
    477                    raise e
    478                 else:
    479                    self.expand_page()
    480             else:
    481                  break
    482 
    483 
    484     def get_scroll_position(self, scroll_vertical=True):
    485         """Return current scroll position of page.
    486 
    487         @param scroll_vertical: True for vertical scroll,
    488                                 False for horizontal Scroll.
    489 
    490         """
    491         if scroll_vertical:
    492             return int(self._tab.EvaluateJavaScript('window.scrollY'))
    493         else:
    494             return int(self._tab.EvaluateJavaScript('window.scrollX'))
    495 
    496 
    497     def wait_for_default_scroll_position(self, scroll_vertical=True):
    498         """Wait for page to be at the default scroll position.
    499 
    500         @param scroll_vertical: True for vertical scroll,
    501                                 False for horizontal scroll.
    502 
    503         @raise: TestError if page either does not move or does not stop moving.
    504 
    505         """
    506         utils.poll_for_condition(
    507                 lambda: self.get_scroll_position(
    508                         scroll_vertical) == self._DEFAULT_SCROLL,
    509                 exception=error.TestError('Page not set to default scroll!'))
    510 
    511 
    512     def wait_for_scroll_position_to_settle(self, scroll_vertical=True):
    513         """Wait for page to move and then stop moving.
    514 
    515         @param scroll_vertical: True for Vertical scroll and
    516                                 False for horizontal scroll.
    517 
    518         @raise: TestError if page either does not move or does not stop moving.
    519 
    520         """
    521         # Wait until page starts moving.
    522         utils.poll_for_condition(
    523                 lambda: self.get_scroll_position(
    524                         scroll_vertical) != self._DEFAULT_SCROLL,
    525                 exception=error.TestError('No scrolling occurred!'), timeout=30)
    526 
    527         # Wait until page has stopped moving.
    528         self._previous = self._DEFAULT_SCROLL
    529         def _movement_stopped():
    530             current = self.get_scroll_position()
    531             result = current == self._previous
    532             self._previous = current
    533             return result
    534 
    535         utils.poll_for_condition(
    536                 lambda: _movement_stopped(), sleep_interval=1,
    537                 exception=error.TestError('Page did not stop moving!'),
    538                 timeout=30)
    539 
    540 
    541     def get_page_zoom(self):
    542         """Return window.innerWidth for this page."""
    543         return float(self._tab.EvaluateJavaScript(
    544                             'window.visualViewport.scale'))
    545 
    546 
    547 class EventsPage(TestPage):
    548     """Functions to monitor input events on the DUT, as seen by a webpage.
    549 
    550     A subclass of TestPage which uses and interacts with a specific page.
    551 
    552     """
    553     def __init__(self, cr, httpdir):
    554         """Open the website and save the tab in self._tab.
    555 
    556         @param cr: chrome.Chrome() object
    557         @param httpdir: the directory to use for SetHTTPServerDirectories
    558 
    559         """
    560         filename = 'touch_events_test_page.html'
    561         current_dir = os.path.dirname(os.path.realpath(__file__))
    562         shutil.copyfile(os.path.join(current_dir, filename),
    563                         os.path.join(httpdir, filename))
    564 
    565         super(EventsPage, self).__init__(cr, httpdir, filename)
    566 
    567 
    568     def clear_previous_events(self):
    569         """Wipe the test page back to its original state."""
    570         self._tab.ExecuteJavaScript('pageReady = false')
    571         self._tab.ExecuteJavaScript('clearPreviousEvents()')
    572         self.wait_for_page_ready()
    573 
    574 
    575     def get_events_log(self):
    576         """Return the event log from the test page."""
    577         return self._tab.EvaluateJavaScript('eventLog')
    578 
    579 
    580     def log_events(self):
    581         """Put the test page's event log into logging.info."""
    582         logging.info('EVENTS LOG:')
    583         logging.info(self.get_events_log())
    584 
    585 
    586     def get_time_of_last_event(self):
    587         """Return the timestamp of the last seen event (if any)."""
    588         return self._tab.EvaluateJavaScript('timeOfLastEvent')
    589 
    590 
    591     def get_event_count(self):
    592         """Return the number of events that the test page has seen."""
    593         return self._tab.EvaluateJavaScript('eventCount')
    594 
    595 
    596     def get_scroll_delta(self, is_vertical):
    597         """Return the net scrolling the test page has seen.
    598 
    599         @param is_vertical: True for vertical scrolling; False for horizontal.
    600 
    601         """
    602         axis = 'y' if is_vertical else 'x'
    603         return self._tab.EvaluateJavaScript('netScrollDelta.%s' % axis)
    604 
    605 
    606     def get_click_count(self):
    607         """Return the number of clicks the test page has seen."""
    608         return self._tab.EvaluateJavaScript('clickCount')
    609 
    610 
    611     def wait_for_events_to_complete(self, delay_secs=1, timeout=60):
    612         """Wait until test page stops seeing events for delay_secs seconds.
    613 
    614         @param delay_secs: the polling frequency in seconds.
    615         @param timeout: the number of seconds to wait for events to complete.
    616         @raises: error.TestError if no events occurred.
    617         @raises: error.TestError if events did not stop after timeout seconds.
    618 
    619         """
    620         self._tmp_previous_event_count = -1
    621         def _events_stopped_coming():
    622             most_recent_event_count = self.get_event_count()
    623             delta = most_recent_event_count - self._tmp_previous_event_count
    624             self._tmp_previous_event_count = most_recent_event_count
    625             return most_recent_event_count != 0 and delta == 0
    626 
    627         try:
    628             utils.poll_for_condition(
    629                     _events_stopped_coming, exception=error.TestError(),
    630                     sleep_interval=delay_secs, timeout=timeout)
    631         except error.TestError:
    632             if self._tmp_previous_event_count == 0:
    633                 raise error.TestError('No touch event was seen!')
    634             else:
    635                 self.log_events()
    636                 raise error.TestError('Touch events did not stop!')
    637 
    638 
    639     def set_prevent_defaults(self, value):
    640         """Set whether to allow default event actions to go through.
    641 
    642         E.g. if this is True, a two finger horizontal scroll will not actually
    643         produce history navigation on the browser.
    644 
    645         @param value: True for prevent defaults; False to allow them.
    646 
    647         """
    648         js_value = str(value).lower()
    649         self._tab.ExecuteJavaScript('preventDefaults = %s;' % js_value)
    650