Home | History | Annotate | Download | only in tools
      1 # Copyright 2016 The Android Open Source Project
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #      http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 
     15 import os
     16 import os.path
     17 import re
     18 import subprocess
     19 import sys
     20 import tempfile
     21 import time
     22 
     23 import its.device
     24 import numpy
     25 
     26 SCENE_NAME = 'sensor_fusion'
     27 SKIP_RET_CODE = 101
     28 TEST_NAME = 'test_sensor_fusion'
     29 TEST_DIR = os.path.join(os.getcwd(), 'tests', SCENE_NAME)
     30 W, H = 640, 480
     31 
     32 # For finding best correlation shifts from test output logs.
     33 SHIFT_RE = re.compile('^Best correlation of [0-9.]+ at shift of [-0-9.]+ms$')
     34 # For finding lines that indicate socket issues in failed test runs.
     35 SOCKET_FAIL_RE = re.compile(
     36         r'.*((socket\.(error|timeout))|(Problem with socket)).*')
     37 
     38 FPS = 30
     39 TEST_LENGTH = 7  # seconds
     40 
     41 
     42 def main():
     43     """Run test_sensor_fusion NUM_RUNS times.
     44 
     45     Save intermediate files and produce a summary/report of the results.
     46 
     47     Script should be run from the top-level CameraITS directory.
     48 
     49     Command line arguments:
     50         camera:      Camera(s) to be tested. Use comma to separate multiple
     51                      camera Ids. Ex: 'camera=0,1' or 'camera=1'
     52         device:      Device id for adb
     53         fps:         FPS to capture with during the test
     54         img_size:    Comma-separated dimensions of captured images (defaults to
     55                      640x480). Ex: 'img_size=<width>,<height>'
     56         num_runs:    Number of times to repeat the test
     57         rotator:     String for rotator id in for vid:pid:ch
     58         test_length: How long the test should run for (in seconds)
     59         tmp_dir:     Location of temp directory for output files
     60     """
     61 
     62     camera_id = '0'
     63     fps = str(FPS)
     64     img_size = '%s,%s' % (W, H)
     65     num_runs = 1
     66     rotator_ids = 'default'
     67     test_length = str(TEST_LENGTH)
     68     tmp_dir = None
     69     for s in sys.argv[1:]:
     70         if s[:7] == 'camera=' and len(s) > 7:
     71             camera_id = s[7:]
     72         if s[:4] == 'fps=' and len(s) > 4:
     73             fps = s[4:]
     74         elif s[:9] == 'img_size=' and len(s) > 9:
     75             img_size = s[9:]
     76         elif s[:9] == 'num_runs=' and len(s) > 9:
     77             num_runs = int(s[9:])
     78         elif s[:8] == 'rotator=' and len(s) > 8:
     79             rotator_ids = s[8:]
     80         elif s[:12] == 'test_length=' and len(s) > 12:
     81             test_length = s[12:]
     82         elif s[:8] == 'tmp_dir=' and len(s) > 8:
     83             tmp_dir = s[8:]
     84 
     85     # Make output directories to hold the generated files.
     86     tmpdir = tempfile.mkdtemp(dir=tmp_dir)
     87     print 'Saving output files to:', tmpdir, '\n'
     88 
     89     device_id = its.device.get_device_id()
     90     device_id_arg = 'device=' + device_id
     91     print 'Testing device ' + device_id
     92 
     93     # ensure camera_id is valid
     94     avail_camera_ids = find_avail_camera_ids()
     95     if camera_id not in avail_camera_ids:
     96         print 'Need to specify valid camera_id in ', avail_camera_ids
     97         sys.exit()
     98 
     99     camera_id_arg = 'camera=' + camera_id
    100     if rotator_ids:
    101         rotator_id_arg = 'rotator=' + rotator_ids
    102     print 'Preparing to run sensor_fusion on camera', camera_id
    103 
    104     img_size_arg = 'img_size=' + img_size
    105     print 'Image dimensions are ' + 'x'.join(img_size.split(','))
    106 
    107     fps_arg = 'fps=' + fps
    108     test_length_arg = 'test_length=' + test_length
    109     print 'Capturing at %sfps' % fps
    110 
    111     os.mkdir(os.path.join(tmpdir, camera_id))
    112 
    113     # Run test "num_runs" times, capturing stdout and stderr.
    114     num_pass = 0
    115     num_fail = 0
    116     num_skip = 0
    117     num_socket_fails = 0
    118     num_non_socket_fails = 0
    119     shift_list = []
    120     for i in range(num_runs):
    121         os.mkdir(os.path.join(tmpdir, camera_id, SCENE_NAME+'_'+str(i)))
    122         cmd = 'python tools/rotation_rig.py rotator=%s' % rotator_ids
    123         subprocess.Popen(cmd.split())
    124         cmd = ['python', os.path.join(TEST_DIR, TEST_NAME+'.py'),
    125                device_id_arg, camera_id_arg, rotator_id_arg, img_size_arg,
    126                fps_arg, test_length_arg]
    127         outdir = os.path.join(tmpdir, camera_id, SCENE_NAME+'_'+str(i))
    128         outpath = os.path.join(outdir, TEST_NAME+'_stdout.txt')
    129         errpath = os.path.join(outdir, TEST_NAME+'_stderr.txt')
    130         t0 = time.time()
    131         with open(outpath, 'w') as fout, open(errpath, 'w') as ferr:
    132             retcode = subprocess.call(
    133                     cmd, stderr=ferr, stdout=fout, cwd=outdir)
    134         t1 = time.time()
    135 
    136         if retcode == 0:
    137             retstr = 'PASS '
    138             time_shift = find_time_shift(outpath)
    139             shift_list.append(time_shift)
    140             num_pass += 1
    141         elif retcode == SKIP_RET_CODE:
    142             retstr = 'SKIP '
    143             num_skip += 1
    144         else:
    145             retstr = 'FAIL '
    146             time_shift = find_time_shift(outpath)
    147             if time_shift is None:
    148                 if is_socket_fail(errpath):
    149                     num_socket_fails += 1
    150                 else:
    151                     num_non_socket_fails += 1
    152             else:
    153                 shift_list.append(time_shift)
    154             num_fail += 1
    155         msg = '%s %s/%s [%.1fs]' % (retstr, SCENE_NAME, TEST_NAME, t1-t0)
    156         print msg
    157 
    158     if num_pass == 1:
    159         print 'Best shift is %sms' % shift_list[0]
    160     elif num_pass > 1:
    161         shift_arr = numpy.array(shift_list)
    162         mean, std = numpy.mean(shift_arr), numpy.std(shift_arr)
    163         print 'Best shift mean is %sms with std. dev. of %sms' % (mean, std)
    164 
    165     pass_percentage = 100*float(num_pass+num_skip)/num_runs
    166     print '%d / %d tests passed (%.1f%%)' % (num_pass+num_skip,
    167                                              num_runs,
    168                                              pass_percentage)
    169 
    170     if num_socket_fails != 0:
    171         print '%s failure(s) due to socket issues' % num_socket_fails
    172     if num_non_socket_fails != 0:
    173         print '%s non-socket failure(s)' % num_non_socket_fails
    174 
    175 
    176 def is_socket_fail(err_file_path):
    177     """Search through a test run's stderr log for any mention of socket issues.
    178 
    179     Args:
    180         err_file_path: File path for stderr logs to search through
    181 
    182     Returns:
    183         True if the test run failed and it was due to socket issues. Otherwise,
    184         False.
    185     """
    186     return find_matching_line(err_file_path, SOCKET_FAIL_RE) is not None
    187 
    188 
    189 def find_time_shift(out_file_path):
    190     """Search through a test run's stdout log for the best time shift.
    191 
    192     Args:
    193         out_file_path: File path for stdout logs to search through
    194 
    195     Returns:
    196         The best time shift, if one is found. Otherwise, returns None.
    197     """
    198     line = find_matching_line(out_file_path, SHIFT_RE)
    199     if line is None:
    200         return None
    201     else:
    202         words = line.split(' ')
    203         # Get last word and strip off 'ms\n' before converting to a float.
    204         return float(words[-1][:-3])
    205 
    206 
    207 def find_matching_line(file_path, regex):
    208     """Search each line in the file at 'file_path' for a line matching 'regex'.
    209 
    210     Args:
    211         file_path: File path for file being searched
    212         regex:     Regex used to match against lines
    213 
    214     Returns:
    215         The first matching line. If none exists, returns None.
    216     """
    217     with open(file_path) as f:
    218         for line in f:
    219             if regex.match(line):
    220                 return line
    221     return None
    222 
    223 def find_avail_camera_ids():
    224     """Find the available camera IDs.
    225 
    226     Returns:
    227         list of available cameras
    228     """
    229     with its.device.ItsSession() as cam:
    230         avail_camera_ids = cam.get_camera_ids()
    231     return avail_camera_ids
    232 
    233 
    234 if __name__ == '__main__':
    235     main()
    236 
    237