# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import time
import re

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros.cfm.usb import cfm_usb_devices
from autotest_lib.client.common_lib.cros.cfm.usb import usb_device_collector
from autotest_lib.server.cros.cfm import cfm_base_test


class enterprise_CFM_PTZStress(cfm_base_test.CfmBaseTest):
    """
    Executes the following tests on CFM devices:

    1. Enroll the device and join a meeting.
    2. During the meeting, PTZ the camera according to the control file.
    Verify the following functionalities:

    1. Camera is enumerated.
    2. Verify PTZ signal is sent to the camera.
    """
    version = 1

    def check_camera_enumeration(self, camera_name):
        """
        Checks that exactly one camera matching the spec is connected to
        the DUT and reports which USB bus it is on.

        @param camera_name the camera's name (device spec) under test
        @returns The USB bus number the camera is on.
        @raises error.TestFail if the number of matching cameras is not
                exactly one.
        """
        collector = usb_device_collector.UsbDeviceCollector(self._host)
        cameras = collector.get_devices_by_spec(camera_name)
        if len(cameras) != 1:
            raise error.TestFail('Camera failed to enumerate')
        bus_number = cameras[0].bus
        logging.info('Camera enumerated: {} on bus {}'.
                     format(camera_name, bus_number))
        return bus_number

    def dump_usbmon_traffic(self, bus, usb_trace_path):
        """
        Starts usbmon capture on the given bus in the background and
        redirects the traffic to a file on the DUT.

        Failing to start the capture is only logged (best effort); it is
        not treated as a test failure here.

        @param bus bus number the camera is on
        @param usb_trace_path the USB traces file path
        """
        # NOTE(review): the backgrounded `cat` is never stopped by this
        # test, so each loop iteration appears to leave one more reader
        # running on the DUT - confirm whether cleanup happens elsewhere.
        cmd = ('cat /sys/kernel/debug/usb/usbmon/{}u > {} &'.
               format(bus, usb_trace_path))
        try:
            self._host.run(cmd, ignore_status=True)
        except Exception as e:
            logging.info('Fail to run cmd {}. Error: {}'.
                         format(cmd, str(e)))
        else:
            # Only claim success when the command was actually issued.
            logging.info('Usbmon traffic dumped to {}'.format(usb_trace_path))

    def check_usbmon_traffic(self, usb_trace_path):
        """
        Reads the captured usbmon traces and verifies that they contain
        both an 'S Ii' and a 'C Ii' record.

        @param usb_trace_path the USB traces file path
        @raises error.TestFail if the expected records are not both
                present in the traces.
        """
        # Read the file in the foreground so the output is complete when
        # run_output() returns.
        cmd = 'cat {}'.format(usb_trace_path)
        try:
            traces = self._host.run_output(cmd, ignore_status=True)
        except Exception as e:
            # Failure to read the trace stays best effort, as before.
            logging.info('Fail to run cmd {}. Error: {}'.format(cmd, str(e)))
            return

        # The verification must live outside the try block above;
        # otherwise the TestFail would be swallowed by the generic
        # exception handler and the test could never fail on this check.
        if re.search('C Ii', traces) and re.search('S Ii', traces):
            logging.info('PTZ signal verified')
            return
        raise error.TestFail('PTZ signal did not go through')

    def clean_usb_traces_file(self, usb_trace_path):
        """
        Removes the traces file from the DUT.

        @param usb_trace_path the USB traces file path
        @raises error.TestFail if running the remove command raises.
        """
        cmd = 'rm {}'.format(usb_trace_path)
        try:
            self._host.run(cmd, ignore_status=True)
        except Exception as e:
            raise error.TestFail('Fail to run cmd {}. Error: {}'.
                                 format(cmd, str(e)))
        logging.info('Cleaned up traces in {}'.format(usb_trace_path))

    def run_once(self, host, test_config, ptz_motion_sequence):
        """
        Runs the test.

        For each of test_config['repeat'] loops: checks camera
        enumeration, starts a meeting, captures usbmon traffic while
        replaying the PTZ motion sequence, verifies the captured traces,
        ends the meeting and removes the trace file.

        @param host the host object passed in by the control file (not
                used directly; the methods use self._host)
        @param test_config dict read with the keys 'repeat', 'camera',
                'usb_trace_path' and 'motion_duration'
        @param ptz_motion_sequence iterable of motions passed one by one
                to cfm_facade.move_camera()
        """
        trace_path = test_config['usb_trace_path']
        self.cfm_facade.wait_for_meetings_telemetry_commands()
        # range() instead of xrange() so the loop works on Python 2 and 3.
        for loop_no in range(1, test_config['repeat'] + 1):
            logging.info('Test Loop : {}'.format(loop_no))
            bus = self.check_camera_enumeration(test_config['camera'])
            self.cfm_facade.start_meeting_session()
            self.dump_usbmon_traffic(bus, trace_path)
            for motion in ptz_motion_sequence:
                self.cfm_facade.move_camera(motion)
                # Give the camera time to carry out each motion.
                time.sleep(test_config['motion_duration'])
            self.check_usbmon_traffic(trace_path)
            self.cfm_facade.end_meeting_session()
            self.clean_usb_traces_file(trace_path)