1 # Copyright 2016 The Chromium OS Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 import logging 6 import random 7 import re 8 import subprocess 9 import time 10 11 from autotest_lib.client.bin import test 12 from autotest_lib.client.common_lib import error 13 14 15 class audio_CrasOutputStress(test.test): 16 """Checks if output buffer will drift to super high level.""" 17 version = 1 18 _MAX_OUTPUT_STREAMS = 3 19 _LOOP_COUNT = 300 20 _OUTPUT_BUFFER_LEVEL = '.*?SET_DEV_WAKE.*?hw_level.*?(\d+).*?' 21 _BUFFER_DRIFT_CRITERIA = 4096 22 23 def run_once(self): 24 """ 25 Repeatedly add output streams of random configurations and 26 remove them to verify if output buffer level would drift. 27 """ 28 self._output_streams = [] 29 self._rates = ['48000', '44100'] 30 self._block_sizes = ['512', '1024'] 31 32 loop_count = 0 33 while loop_count < self._LOOP_COUNT: 34 if len(self._output_streams) < self._MAX_OUTPUT_STREAMS: 35 cmd = ['cras_test_client', '--playback_file', '/dev/zero', 36 '--rate', self._rates[random.randint(0, 1)], 37 '--block_size', self._block_sizes[random.randint(0, 1)]] 38 proc = subprocess.Popen(cmd) 39 self._output_streams.append(proc) 40 time.sleep(0.01) 41 else: 42 self._output_streams[0].kill() 43 self._output_streams.remove(self._output_streams[0]) 44 time.sleep(0.1) 45 loop_count += 1 46 47 # Get the buffer level. 48 buffer_level = self._get_buffer_level() 49 50 # Clean up all streams. 51 while len(self._output_streams) > 0: 52 self._output_streams[0].kill() 53 self._output_streams.remove(self._output_streams[0]) 54 55 if buffer_level > self._BUFFER_DRIFT_CRITERIA: 56 raise error.TestFail('Buffer level %d drift too high', buffer_level) 57 58 def _get_buffer_level(self): 59 """Gets a rough number about current buffer level. 60 61 @returns: The current buffer level. 62 63 """ 64 proc = subprocess.Popen(['cras_test_client', '--dump_a'], stdout=subprocess.PIPE) 65 output, err = proc.communicate() 66 buffer_level = 0 67 for line in output.split('\n'): 68 search = re.match(self._OUTPUT_BUFFER_LEVEL, line) 69 if search: 70 tmp = int(search.group(1)) 71 if tmp > buffer_level: 72 buffer_level = tmp 73 return buffer_level 74 75