Home | History | Annotate | Download | only in desktopui_SonicExtension
      1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 
      6 import json
      7 import os
      8 import time
      9 
     10 import selenium
     11 
     12 from autotest_lib.client.bin import utils
     13 from extension_pages import e2e_test_utils
     14 from extension_pages import options
     15 
     16 
     17 class TestUtils(object):
     18     """Contains all the helper functions for Chrome mirroring automation."""
     19 
     20     short_wait_secs = 3
     21     step_timeout_secs = 60
     22     cpu_fields = ['user', 'nice', 'system', 'idle', 'iowait', 'irq',
     23                   'softirq', 'steal', 'guest', 'guest_nice']
     24     cpu_idle_fields = ['idle', 'iowait']
     25 
     26     def __init__(self):
     27         """Constructor"""
     28 
     29 
     30     def set_mirroring_options(self, driver, extension_id, settings):
     31         """Apply all the settings given by the user to the option page.
     32 
     33         @param driver: The chromedriver instance of the test
     34         @param extension_id: The id of the Cast extension
     35         @param settings: The settings and information about the test
     36         """
     37         options_page = options.OptionsPage(driver, extension_id)
     38         options_page.open_hidden_options_menu()
     39         time.sleep(self.short_wait_secs)
     40         for key in settings.keys():
     41             options_page.set_value(key, settings[key])
     42 
     43 
     44     def start_v2_mirroring_test_utils(
     45             self, driver, extension_id, receiver_ip, url, fullscreen,
     46             udp_proxy_server=None, network_profile=None):
     47         """Use test util page to start mirroring session on specific device.
     48 
     49         @param driver: The chromedriver instance
     50         @param extension_id: The id of the Cast extension
     51         @param receiver_ip: The ip of the Eureka dongle to launch the activity
     52         @param url: The URL to navigate to
     53         @param fullscreen: click the fullscreen button or not
     54         @param udp_proxy_server: the address of udp proxy server,
     55             it should be a http address, http://<ip>:<port>
     56         @param network_profile: the network profile,
     57             it should be one of wifi, bad and evil.
     58         @return True if the function finishes
     59         """
     60         e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
     61                 driver, extension_id)
     62         time.sleep(self.short_wait_secs)
     63         tab_handles = driver.window_handles
     64         e2e_test_utils_page.receiver_ip_or_name_v2_text_box().set_value(
     65                 receiver_ip)
     66         e2e_test_utils_page.url_to_open_v2_text_box().set_value(url)
     67         if udp_proxy_server:
     68             e2e_test_utils_page.udp_proxy_server_text_box().set_value(
     69                     udp_proxy_server)
     70         if network_profile:
     71             e2e_test_utils_page.network_profile_text_box().set_value(
     72                     network_profile)
     73         time.sleep(self.short_wait_secs)
     74         e2e_test_utils_page.open_then_mirror_v2_button().click()
     75         time.sleep(self.short_wait_secs)
     76         all_handles = driver.window_handles
     77         video_handle = [x for x in all_handles if x not in tab_handles].pop()
     78         driver.switch_to_window(video_handle)
     79         self.navigate_to_test_url(driver, url, fullscreen)
     80         return True
     81 
     82 
     83     def stop_v2_mirroring_test_utils(self, driver, extension_id):
     84       """Use test util page to stop a mirroring session on a specific device.
     85 
     86       @param driver: The chromedriver instance
     87       @param extension_id: The id of the Cast extension
     88       @param activity_id: The id of the mirroring activity
     89       """
     90       e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(driver,
     91                                                             extension_id)
     92       e2e_test_utils_page.go_to_page()
     93       time.sleep(self.short_wait_secs)
     94       e2e_test_utils_page.stop_v2_mirroring_button().click()
     95 
     96 
     97     def start_v2_mirroring_sdk(self, driver, device_ip, url, extension_id):
     98         """Use SDK to start a mirroring session on a specific device.
     99 
    100         @param driver: The chromedriver instance
    101         @param device_ip: The IP of the Eureka device
    102         @param url: The URL to navigate to
    103         @param extension_id: The id of the Cast extension
    104         @return True if the function finishes
    105         @raise RuntimeError for timeouts
    106         """
    107         self.set_auto_testing_ip(driver, extension_id, device_ip)
    108         self.nagviate(driver, url, False)
    109         time.sleep(self.short_wait_secs)
    110         driver.execute_script('loadScript()')
    111         self._wait_for_result(
    112                 lambda: driver.execute_script('return isSuccessful'),
    113                 'Timeout when initiating mirroring... ...')
    114         driver.execute_script('startV2Mirroring()')
    115         self._wait_for_result(
    116                 lambda: driver.execute_script('return isSuccessful'),
    117                 'Timeout when triggering mirroring... ...')
    118         return True
    119 
    120 
    121     def stop_v2_mirroring_sdk(self, driver, activity_id=None):
    122         """Use SDK to stop the mirroring activity in Chrome.
    123 
    124         @param driver: The chromedriver instance
    125         @param activity_id: The id of the mirroring activity
    126         @raise RuntimeError for timeouts
    127         """
    128         driver.execute_script('stopV2Mirroring()')
    129         self._wait_for_result(
    130                 lambda: driver.execute_script('return isSuccessful'),
    131                 self.step_timeout_secs)
    132 
    133 
    134     def set_auto_testing_ip(self, driver, extension_id, device_ip):
    135         """Set the auto testing IP on the extension page.
    136 
    137         @param driver: The chromedriver instance
    138         @param extension_id: The id of the Cast extension
    139         @param device_ip: The IP of the device to test against
    140         """
    141         e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
    142                 driver, extension_id)
    143         e2e_test_utils_page.execute_script(
    144                 'localStorage["AutoTestingIp"] = "%s";' % device_ip)
    145 
    146 
    147     def upload_v2_mirroring_logs(self, driver, extension_id):
    148         """Upload v2 mirroring logs for the latest mirroring session.
    149 
    150         @param driver: The chromedriver instance of the browser
    151         @param extension_id: The extension ID of the Cast extension
    152         @return The report id in crash staging server.
    153         @raises RuntimeError if an error occurred during uploading
    154         """
    155         e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
    156                 driver, extension_id)
    157         e2e_test_utils_page.go_to_page()
    158         time.sleep(self.short_wait_secs)
    159         e2e_test_utils_page.upload_v2_mirroring_logs_button().click()
    160         report_id = self._wait_for_result(
    161             e2e_test_utils_page.v2_mirroring_logs_scroll_box().get_value,
    162             'Failed to get v2 mirroring logs')
    163         if 'Failed to upload logs' in report_id:
    164           raise RuntimeError('Failed to get v2 mirroring logs')
    165         return report_id
    166 
    167 
    168     def get_chrome_version(self, driver):
    169         """Return the Chrome version that is being used for running test.
    170 
    171         @param driver: The chromedriver instance
    172         @return The Chrome version
    173         """
    174         get_chrome_version_js = 'return window.navigator.appVersion;'
    175         app_version = driver.execute_script(get_chrome_version_js)
    176         for item in app_version.split():
    177            if 'Chrome/' in item:
    178               return item.split('/')[1]
    179         return None
    180 
    181 
    182     def get_chrome_revision(self, driver):
    183         """Return Chrome revision number that is being used for running test.
    184 
    185         @param driver: The chromedriver instance
    186         @return The Chrome revision number
    187         """
    188         get_chrome_revision_js = ('return document.getElementById("version").'
    189                                   'getElementsByTagName("span")[2].innerHTML;')
    190         driver.get('chrome://version')
    191         return driver.execute_script(get_chrome_revision_js)
    192 
    193 
    194     def get_extension_id_from_flag(self, extra_flags):
    195         """Gets the extension ID based on the whitelisted extension id flag.
    196 
    197         @param extra_flags: A string which contains all the extra chrome flags
    198         @return The ID of the extension. Return None if nothing is found.
    199         """
    200         extra_flags_list = extra_flags.split()
    201         for flag in extra_flags_list:
    202             if 'whitelisted-extension-id=' in flag:
    203                 return flag.split('=')[1]
    204         return None
    205 
    206 
    207     def navigate_to_test_url(self, driver, url, fullscreen):
    208         """Navigate to a given URL. Click fullscreen button if needed.
    209 
    210         @param driver: The chromedriver instance
    211         @param url: The URL of the site to navigate to
    212         @param fullscreen: True and the video will play in full screen mode.
    213                            Otherwise, set to False
    214         """
    215         driver.get(url)
    216         driver.refresh()
    217         if fullscreen:
    218           self.request_full_screen(driver)
    219 
    220 
    221     def request_full_screen(self, driver):
    222         """Request full screen.
    223 
    224         @param driver: The chromedriver instance
    225         """
    226         try:
    227             time.sleep(self.short_wait_secs)
    228             driver.find_element_by_id('fsbutton').click()
    229         except selenium.common.exceptions.NoSuchElementException as error_message:
    230             print 'Full screen button is not found. ' + str(error_message)
    231 
    232 
    233     def set_focus_tab(self, driver, tab_handle):
    234       """Set the focus on a tab.
    235 
    236       @param driver: The chromedriver instance
    237       @param tab_handle: The chrome driver handle of the tab
    238       """
    239       driver.switch_to_window(tab_handle)
    240       driver.get_screenshot_as_base64()
    241 
    242 
    243     def block_setup_dialog(self, driver, extension_id):
    244         """Tab cast through the extension.
    245 
    246         @param driver: A chromedriver instance that has the extension loaded.
    247         @param extension_id: Id of the extension to use.
    248         """
    249         e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
    250                 driver, extension_id)
    251         e2e_test_utils_page.go_to_page()
    252         time.sleep(self.short_wait_secs)
    253         driver.execute_script(
    254             'localStorage["blockChromekeySetupAutoLaunchOnInstall"] = "true"')
    255 
    256 
    257     def close_popup_tabs(self, driver):
    258         """Close any popup windows the extension might open by default.
    259 
    260         Since we're going to handle the extension ourselves all we need is
    261         the main browser window with a single tab. The safest way to handle
    262         the popup however, is to close the currently active tab, so we don't
    263         mess with chromedrivers ui debugger.
    264 
    265         @param driver: Chromedriver instance.
    266         @raises Exception If you close the tab associated with
    267             the ui debugger.
    268         """
    269         # TODO: There are several, albeit hacky ways, to handle this popup
    270         # that might need to change with different versions of the extension
    271         # until the core issue is resolved. See crbug.com/338399.
    272         current_tab_handle = driver.current_window_handle
    273         for handle in driver.window_handles:
    274             if current_tab_handle != handle:
    275                 try:
    276                     time.sleep(self.short_wait_secs)
    277                     driver.switch_to_window(handle)
    278                     driver.close()
    279                 except:
    280                     pass
    281         driver.switch_to_window(current_tab_handle)
    282 
    283 
    284     def output_dict_to_file(self, dictionary, file_name,
    285                             path=None, sort_keys=False):
    286         """Output a dictionary into a file.
    287 
    288         @param dictionary: The dictionary to be output as JSON
    289         @param file_name: The name of the file that is being output
    290         @param path: The path of the file. The default is None
    291         @param sort_keys: Sort dictionary by keys when output. False by default
    292         """
    293         if path is None:
    294             path = os.path.abspath(os.path.dirname(__file__))
    295         # if json file exists, read the existing one and append to it
    296         json_file = os.path.join(path, file_name)
    297         if os.path.isfile(json_file) and dictionary:
    298             with open(json_file, 'r') as existing_json_data:
    299                 json_data = json.load(existing_json_data)
    300             dictionary = dict(json_data.items() + dictionary.items())
    301         output_json = json.dumps(dictionary, sort_keys=sort_keys)
    302         with open(json_file, 'w') as file_handler:
    303             file_handler.write(output_json)
    304 
    305 
    306     def compute_cpu_utilization(self, cpu_dict):
    307         """Generate the upper/lower bound and the average CPU consumption.
    308 
    309         @param cpu_dict: The dictionary that contains CPU usage every sec.
    310         @returns A dict that contains upper/lower bound and average cpu usage.
    311         """
    312         cpu_bound = {}
    313         cpu_usage = sorted(cpu_dict.values())
    314         cpu_bound['lower_bound'] = (
    315                 '%.2f' % cpu_usage[int(len(cpu_usage) * 10.0 / 100.0)])
    316         cpu_bound['upper_bound'] = (
    317                 '%.2f' % cpu_usage[int(len(cpu_usage) * 90.0 / 100.0)])
    318         cpu_bound['average'] = '%.2f' % (sum(cpu_usage) / float(len(cpu_usage)))
    319         return cpu_bound
    320 
    321 
    322     def cpu_usage_interval(self, duration, interval=1):
    323         """Get the CPU usage over a period of time based on interval.
    324 
    325         @param duration: The duration of getting the CPU usage.
    326         @param interval: The interval to check the CPU usage. Default is 1 sec.
    327         @return A dict that contains CPU usage over different time intervals.
    328         """
    329         current_time = 0
    330         cpu_usage = {}
    331         while current_time < duration:
    332             pre_times = self._get_system_times()
    333             time.sleep(interval)
    334             post_times = self._get_system_times()
    335             cpu_usage[current_time] = self._get_avg_cpu_usage(
    336                     pre_times, post_times)
    337             current_time += interval
    338         return cpu_usage
    339 
    340 
    341     def _get_avg_cpu_usage(self, pre_times, post_times):
    342         """Calculate the average CPU usage of two different periods of time.
    343 
    344         @param pre_times: The CPU usage information of the start point.
    345         @param post_times: The CPU usage information of the end point.
    346         @return Average CPU usage over a time period.
    347         """
    348         diff_times = {}
    349         for field in self.cpu_fields:
    350             diff_times[field] = post_times[field] - pre_times[field]
    351 
    352         idle_time = sum(diff_times[field] for field in self.cpu_idle_fields)
    353         total_time = sum(diff_times[field] for field in self.cpu_fields)
    354         return float(total_time - idle_time) / total_time * 100.0
    355 
    356 
    357     def _get_system_times(self):
    358         """Get the CPU information from the system times.
    359 
    360         @return An list with CPU usage of different processes.
    361         """
    362         proc_stat = utils.read_file('/proc/stat')
    363         for line in proc_stat.split('\n'):
    364             if line.startswith('cpu '):
    365                 times = line[4:].strip().split(' ')
    366                 times = [int(jiffies) for jiffies in times]
    367                 return dict(zip(self.cpu_fields, times))
    368 
    369 
    370     def _wait_for_result(self, get_result, error_message):
    371         """Wait for the result.
    372 
    373         @param get_result: the function to get result.
    374         @param error_message: the error message in the exception
    375             if it is failed to get result.
    376         @return The result.
    377         @raises RuntimeError if it is failed to get result within
    378             self.step_timeout_secs.
    379         """
    380         start = time.time()
    381         while (((time.time() - start) < self.step_timeout_secs)
    382                and not get_result()):
    383             time.sleep(self.step_timeout_secs/10.0)
    384         if not get_result():
    385             raise RuntimeError(error_message)
    386         return get_result()
    387