Home | History | Annotate | Download | only in enterprise_CFM_PTZStress
      1 # Copyright 2017 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import time
      7 import re
      8 
      9 from autotest_lib.client.common_lib import error
     10 from autotest_lib.client.common_lib.cros.cfm.usb import cfm_usb_devices
     11 from autotest_lib.client.common_lib.cros.cfm.usb import usb_device_collector
     12 from autotest_lib.server.cros.cfm import cfm_base_test
     13 
     14 class enterprise_CFM_PTZStress(cfm_base_test.CfmBaseTest):
     15     """
     16     Executes the following tests on CFM devices:
     17 
     18        1. Enroll the device and join a meeting.
     19        2. During meeting PTZ the camera according to the control file.
     20     Verify the following functionalities:
     21 
     22        1. Camera is enumerated.
     23        2. Verify PTZ signal is sent to the camera.
     24     """
     25     version = 1
     26 
     27     def check_camera_enumeration(self, camera_name):
     28         """
     29         Checks if there is any camera connected to the DUT.
     30             If so, return the USB bus number the camera is on.
     31         @param camera_name the camera's name under test
     32         @returns The USB bus number the camera is on, if there is only
     33             1 camera connected, false otherwise.
     34         """
     35         collector = usb_device_collector.UsbDeviceCollector(self._host)
     36         camera = collector.get_devices_by_spec(camera_name)
     37         if len(camera) == 1:
     38             bus_number = camera[0].bus
     39             logging.info('Camera enumerated: {} on bus {}'.
     40                 format(camera_name,bus_number))
     41             return bus_number
     42         raise error.TestFail('Camera failed to enumerate')
     43 
     44 
     45     def dump_usbmon_traffic(self, bus, usb_trace_path):
     46         """
     47         Start usbmon with specified bus and dump the traffic to file
     48         @param bus bus number the camera is on
     49         @param usb_trace_path the USB traces file path
     50         """
     51         cmd = ('cat /sys/kernel/debug/usb/usbmon/{}u > {} &'.
     52             format(bus, usb_trace_path))
     53         try:
     54             self._host.run(cmd, ignore_status = True)
     55         except Exception as e:
     56             logging.info('Fail to run cmd {}. Error: {}'.
     57                 format(cmd, str(e)))
     58         logging.info('Usbmon traffic dumped to {}'.format(usb_trace_path))
     59 
     60 
     61     def check_usbmon_traffic(self, usb_trace_path):
     62         """
     63         Check traces
     64         @param usb_trace_path the USB traces file path
     65         """
     66         cmd = ('cat {} & '.format(usb_trace_path))
     67         try:
     68             traces = self._host.run_output(cmd, ignore_status = True)
     69             if re.search('C Ii', traces) and re.search('S Ii', traces):
     70                 logging.info('PTZ signal verified')
     71             else:
     72                 raise error.TestFail('PTZ signal did not go through')
     73         except Exception as e:
     74             logging.info('Fail to run cmd {}. Error: {}'.format(cmd, str(e)))
     75 
     76 
     77     def clean_usb_traces_file(self, usb_trace_path):
     78         """
     79         Clean traces file
     80         @param usb_trace_path the USB traces file path
     81         """
     82         cmd = ('rm {}'.format(usb_trace_path))
     83         try:
     84             self._host.run(cmd, ignore_status = True)
     85         except Exception as e:
     86             raise error.TestFail('Fail to run cmd {}. Error: {}'.format(cmd, str(e)))
     87         logging.info('Cleaned up traces in {}'.format(usb_trace_path))
     88 
     89 
     90     def run_once(self, host, test_config, ptz_motion_sequence):
     91         """Runs the test."""
     92         self.cfm_facade.wait_for_meetings_telemetry_commands()
     93         for loop_no in xrange(1, test_config['repeat'] + 1):
     94             logging.info('Test Loop : {}'.format(loop_no))
     95             bus = self.check_camera_enumeration(test_config['camera'])
     96             self.cfm_facade.start_meeting_session()
     97             self.dump_usbmon_traffic(bus, test_config['usb_trace_path'])
     98             for motion in ptz_motion_sequence:
     99                 self.cfm_facade.move_camera(motion)
    100                 time.sleep(test_config['motion_duration'])
    101             self.check_usbmon_traffic(test_config['usb_trace_path'])
    102             self.cfm_facade.end_meeting_session()
    103             self.clean_usb_traces_file(test_config['usb_trace_path'])
    104