Home | History | Annotate | Download | only in audio_CrasStress
      1 # Copyright 2016 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import os
      7 import random
      8 import re
      9 import subprocess
     10 import time
     11 
     12 from autotest_lib.client.bin import test
     13 from autotest_lib.client.common_lib import error
     14 from autotest_lib.client.cros.audio import audio_helper
     15 
     16 _STREAM_TYPE_INPUT = 0
     17 _STREAM_TYPE_OUTPUT = 1
     18 
     19 class audio_CrasStress(test.test):
     20     """Checks if output buffer will drift to super high level."""
     21     version = 2
     22     _MAX_STREAMS = 3
     23     _LOOP_COUNT = 300
     24     _INPUT_BUFFER_LEVEL = '.*?READ_AUDIO.*?hw_level.*?(\d+).*?'
     25     _OUTPUT_BUFFER_LEVEL = '.*?FILL_AUDIO.*?hw_level.*?(\d+).*?'
     26     _CHECK_PERIOD_TIME_SECS = 1 # Check buffer level every second.
     27 
     28     """
     29     We only run 1024 and 512 block size streams in this test. So buffer level
     30     of input device should stay between 0 and 1024. Buffer level of output
     31     device should between 1024 to 2048. Sometimes it will be a little more.
     32     Therefore, we set input buffer criteria to 2 * 1024 and output buffer
     33     criteria to 3 * 1024.
     34     """
     35     _RATES = ['48000', '44100']
     36     _BLOCK_SIZES = ['512', '1024']
     37     _INPUT_BUFFER_DRIFT_CRITERIA = 2 * 1024
     38     _OUTPUT_BUFFER_DRIFT_CRITERIA = 3 * 1024
     39 
     40     def _new_stream(self, stream_type):
     41         """Runs new stream by cras_test_client."""
     42         if stream_type == _STREAM_TYPE_INPUT:
     43             cmd = ['cras_test_client', '--capture_file', '/dev/null']
     44         else:
     45             cmd = ['cras_test_client', '--playback_file', '/dev/zero']
     46 
     47         cmd += ['--rate', self._RATES[random.randint(0, 1)],
     48                 '--block_size', self._BLOCK_SIZES[random.randint(0, 1)]]
     49 
     50         return subprocess.Popen(cmd)
     51 
     52     def _dump_audio(self):
     53         log_file = os.path.join(self.resultsdir, "audio_diagnostics.txt")
     54         with open(log_file, 'w') as f:
     55             f.write(audio_helper.get_audio_diagnostics())
     56 
     57     def _check_buffer_level(self, stream_type):
     58 
     59         buffer_level = self._get_buffer_level(stream_type)
     60 
     61         if stream_type == _STREAM_TYPE_INPUT:
     62             logging.debug("Max input buffer level: %d", buffer_level)
     63             if buffer_level > self._INPUT_BUFFER_DRIFT_CRITERIA:
     64                 self._dump_audio()
     65                 raise error.TestFail('Input buffer level %d drift too high' %
     66                                      buffer_level)
     67 
     68         if stream_type == _STREAM_TYPE_OUTPUT:
     69             logging.debug("Max output buffer level: %d", buffer_level)
     70             if buffer_level > self._OUTPUT_BUFFER_DRIFT_CRITERIA:
     71                 self._dump_audio()
     72                 raise error.TestFail('Output buffer level %d drift too high' %
     73                                      buffer_level)
     74 
     75     def cleanup(self):
     76         """Clean up all streams."""
     77         while len(self._streams) > 0:
     78             self._streams[0].kill()
     79             self._streams.remove(self._streams[0])
     80 
     81     def run_once(self, input_stream=True, output_stream=True):
     82         """
     83         Repeatedly add output streams of random configurations and
     84         remove them to verify if output buffer level would drift.
     85 
     86         @params input_stream: If true, run input stream in the test.
     87         @params output_stream: If true, run output stream in the test.
     88         """
     89 
     90         if not input_stream and not output_stream:
     91             raise error.TestError('Not supported mode.')
     92 
     93         self._streams = []
     94 
     95         loop_count = 0
     96         past_time = time.time()
     97         while loop_count < self._LOOP_COUNT:
     98 
     99             # 1 for adding stream, 0 for removing stream.
    100             add = random.randint(0, 1)
    101             if not self._streams:
    102                 add = 1
    103             elif len(self._streams) == self._MAX_STREAMS:
    104                 add = 0
    105 
    106             if add == 1:
    107                 # 0 for input stream, 1 for output stream.
    108                 stream_type = random.randint(0, 1)
    109                 if not input_stream:
    110                     stream_type = _STREAM_TYPE_OUTPUT
    111                 elif not output_stream:
    112                     stream_type = _STREAM_TYPE_INPUT
    113 
    114                 self._streams.append(self._new_stream(stream_type))
    115             else:
    116                 self._streams[0].kill()
    117                 self._streams.remove(self._streams[0])
    118                 time.sleep(0.1)
    119 
    120             now = time.time()
    121 
    122             # Check buffer level.
    123             if now - past_time > self._CHECK_PERIOD_TIME_SECS:
    124                 past_time = now
    125                 if input_stream:
    126                     self._check_buffer_level(_STREAM_TYPE_INPUT)
    127                 if output_stream:
    128                     self._check_buffer_level(_STREAM_TYPE_OUTPUT)
    129 
    130             loop_count += 1
    131 
    132     def _get_buffer_level(self, stream_type):
    133         """Gets a rough number about current buffer level.
    134 
    135         @returns: The current buffer level.
    136 
    137         """
    138         if stream_type == _STREAM_TYPE_INPUT:
    139             match_str = self._INPUT_BUFFER_LEVEL
    140         else:
    141             match_str = self._OUTPUT_BUFFER_LEVEL
    142 
    143         proc = subprocess.Popen(['cras_test_client', '--dump_a'],
    144                                 stdout=subprocess.PIPE)
    145         output, err = proc.communicate()
    146         buffer_level = 0
    147         for line in output.split('\n'):
    148             search = re.match(match_str, line)
    149             if search:
    150                 tmp = int(search.group(1))
    151                 if tmp > buffer_level:
    152                     buffer_level = tmp
    153         return buffer_level
    154