Home | History | Annotate | Download | only in scripts
      1 #!/usr/bin/python
      2 
      3 # Copyright (C) 2014 The Android Open Source Project
      4 #
      5 # Licensed under the Apache License, Version 2.0 (the "License");
      6 # you may not use this file except in compliance with the License.
      7 # You may obtain a copy of the License at
      8 #
      9 #       http://www.apache.org/licenses/LICENSE-2.0
     10 #
     11 # Unless required by applicable law or agreed to in writing, software
     12 # distributed under the License is distributed on an "AS IS" BASIS,
     13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14 # See the License for the specific language governing permissions and
     15 # limitations under the License.
     16 
     17 import logging
     18 import os.path
     19 import select
     20 import sys
     21 import time
     22 import collections
     23 import socket
     24 import gflags as flags  # http://code.google.com/p/python-gflags/
     25 import pkgutil
     26 import threading
     27 import Queue
     28 import traceback
     29 import math
     30 import bisect
     31 from bisect import bisect_left
     32 
     33 """
     34 scipy, numpy and matplotlib are python packages that can be installed
     35 from: http://www.scipy.org/
     36 
     37 """
     38 import scipy
     39 import matplotlib.pyplot as plt
     40 
     41 # let this script know about the power monitor implementations
     42 sys.path = [os.path.basename(__file__)] + sys.path
     43 available_monitors = [
     44     name
     45     for _, name, _ in pkgutil.iter_modules(
     46         [os.path.join(os.path.dirname(__file__), "power_monitors")])
     47     if not name.startswith("_")]
     48 
     49 APK = os.path.join(os.path.dirname(__file__), "..", "CtsVerifier.apk")
     50 
     51 FLAGS = flags.FLAGS
     52 
     53 # DELAY_SCREEN_OFF is the number of seconds to wait for baseline state
     54 DELAY_SCREEN_OFF = 20.0
     55 
     56 # whether to log data collected to a file for each sensor run:
     57 LOG_DATA_TO_FILE = True
     58 
     59 logging.getLogger().setLevel(logging.ERROR)
     60 
     61 
     62 def do_import(name):
     63     """import a module by name dynamically"""
     64     mod = __import__(name)
     65     components = name.split(".")
     66     for comp in components[1:]:
     67         mod = getattr(mod, comp)
     68     return mod
     69 
     70 class PowerTestException(Exception):
     71     """
     72     Definition of specialized Exception class for CTS power tests
     73     """
     74     def __init__(self, message):
     75         self._error_message = message
     76     def __str__(self):
     77         return self._error_message
     78 
     79 class PowerTest:
     80     """Class to run a suite of power tests. This has methods for obtaining
     81     measurements from the power monitor (through the driver) and then
     82     processing it to determine baseline and AP suspend state and
     83     measure ampere draw of various sensors.
     84     Ctrl+C causes a keyboard interrupt exception which terminates the test."""
     85 
     86     # Thresholds for max allowed power usage per sensor tested
     87     # TODO: Accel, Mag and Gyro have no maximum power specified in the CDD;
     88     # the following numbers are bogus and will be replaced soon by what
     89     # the device reports (from Sensor.getPower())
     90     MAX_ACCEL_AMPS = 0.08  # Amps
     91     MAX_MAG_AMPS = 0.08  # Amps
     92     MAX_GYRO_AMPS = 0.08  # Amps
     93     MAX_SIGMO_AMPS = 0.08  # Amps
     94 
     95     # TODO: The following numbers for step counter, etc must be replaced by
     96     # the numbers specified in CDD for low-power sensors. The expected current
     97     # draw must be computed from the specified power and the voltage used to
     98     # power the device (specified from a config file).
     99     MAX_STEP_COUNTER_AMPS = 0.08  # Amps
    100     MAX_STEP_DETECTOR_AMPS = 0.08  # Amps
    101     # The variable EXPECTED_AMPS_VARIATION_HALF_RANGE denotes the expected
    102     # variation of  the ampere measurements
    103     # around the mean value at baseline state. i.e. we expect most of the
    104     # ampere measurements at baseline state to vary around the mean by
    105     # between +/- of the number below
    106     EXPECTED_AMPS_VARIATION_HALF_RANGE = 0.0005
    107     # The variable THRESHOLD_BASELINE_SAMPLES_FRACTION denotes the minimum fraction of samples that must
    108     # be in the range of variation defined by EXPECTED_AMPS_VARIATION_HALF_RANGE
    109     # around the mean baseline for us to decide that the phone has settled into
    110     # its baseline state
    111     THRESHOLD_BASELINE_SAMPLES_FRACTION = 0.86
    112     # The variable MAX_PERCENTILE_AP_SCREEN_OFF_AMPS denotes the maximum ampere
    113     # draw that the device can consume when it has gone to suspend state with
    114     # one or more sensors registered and batching samples (screen and AP are
    115     # off in this case)
    116     MAX_PERCENTILE_AP_SCREEN_OFF_AMPS = 0.030  # Amps
    117     # The variable PERCENTILE_MAX_AP_SCREEN_OFF denotes the fraction of ampere
    118     # measurements that must be below the specified maximum amperes
    119     # MAX_PERCENTILE_AP_SCREEN_OFF_AMPS for us to decide that the phone has
    120     # reached suspend state.
    121     PERCENTILE_MAX_AP_SCREEN_OFF = 0.95
    122     DOMAIN_NAME = "/android/cts/powertest"
    123     # SAMPLE_COUNT_NOMINAL denotes the typical number of measurements of amperes
    124     # to collect from the power monitor
    125     SAMPLE_COUNT_NOMINAL = 1000
    126     # RATE_NOMINAL denotes the nominal frequency at which ampere measurements
    127     # are taken from the monsoon power monitor
    128     RATE_NOMINAL = 100
    129     ENABLE_PLOTTING = False
    130 
    131     REQUEST_EXTERNAL_STORAGE = "EXTERNAL STORAGE?"
    132     REQUEST_EXIT = "EXIT"
    133     REQUEST_RAISE = "RAISE %s %s"
    134     REQUEST_USER_RESPONSE = "USER RESPONSE %s"
    135     REQUEST_SET_TEST_RESULT = "SET TEST RESULT %s %s %s"
    136     REQUEST_SENSOR_SWITCH = "SENSOR %s %s"
    137     REQUEST_SENSOR_AVAILABILITY = "SENSOR? %s"
    138     REQUEST_SCREEN_OFF = "SCREEN OFF"
    139     REQUEST_SHOW_MESSAGE = "MESSAGE %s"
    140 
    141     NEGATIVE_AMPERE_ERROR_MESSAGE = (
    142         "Negative ampere draw measured, possibly due to power "
    143         "supply from USB cable. Check the setup of device and power "
    144         "monitor to make sure that the device is not connected "
    145         "to machine via USB directly. The device should be "
    146         "connected to the USB slot in the power monitor. It is okay "
    147         "to change the wiring when the test is in progress.")
    148 
    149 
    150     def __init__(self, max_baseline_amps):
    151         """
    152         Args:
    153             max_baseline_amps: The maximum value of baseline amperes
    154                     that we expect the device to consume at baseline state.
    155                     This can be different between models of phones.
    156         """
    157         power_monitors = do_import("power_monitors.%s" % FLAGS.power_monitor)
    158         testid = time.strftime("%d_%m_%Y__%H__%M_%S")
    159         self._power_monitor = power_monitors.Power_Monitor(log_file_id = testid)
    160         self._tcp_connect_port = 0  # any available port
    161         print ("Establishing connection to device...")
    162         self.setUsbEnabled(True)
    163         status = self._power_monitor.GetStatus()
    164         self._native_hz = status["sampleRate"] * 1000
    165         # the following describes power test being run (i.e on what sensor
    166         # and what type of test. This is used for logging.
    167         self._current_test = "None"
    168         self._external_storage = self.executeOnDevice(PowerTest.REQUEST_EXTERNAL_STORAGE)
    169         self._max_baseline_amps = max_baseline_amps
    170 
    171     def __del__(self):
    172         self.finalize()
    173 
    174     def finalize(self):
    175         """To be called upon termination of host connection to device"""
    176         if self._tcp_connect_port > 0:
    177             # tell device side to exit connection loop, and remove the forwarding
    178             # connection
    179             self.executeOnDevice(PowerTest.REQUEST_EXIT, reportErrors = False)
    180             self.executeLocal("adb forward --remove tcp:%d" % self._tcp_connect_port)
    181         self._tcp_connect_port = 0
    182         if self._power_monitor:
    183             self._power_monitor.Close()
    184             self._power_monitor = None
    185 
    186     def _send(self, msg, report_errors = True):
    187         """Connect to the device, send the given command, and then disconnect"""
    188         if self._tcp_connect_port == 0:
    189             # on first attempt to send a command, connect to device via any open port number,
    190             # forwarding that port to a local socket on the device via adb
    191             logging.debug("Seeking port for communication...")
    192             # discover an open port
    193             dummysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    194             dummysocket.bind(("localhost", 0))
    195             (_, self._tcp_connect_port) = dummysocket.getsockname()
    196             dummysocket.close()
    197             assert(self._tcp_connect_port > 0)
    198 
    199             status = self.executeLocal("adb forward tcp:%d localabstract:%s" %
    200                                        (self._tcp_connect_port, PowerTest.DOMAIN_NAME))
    201             # If the status !=0, then the host machine is unable to
    202             # forward requests to client over adb. Ending the test and logging error message
    203             # to the console on the host.
    204             self.endTestIfLostConnection(
    205                 status != 0,
    206                 "Unable to forward requests to client over adb")
    207             logging.info("Forwarding requests over local port %d",
    208                          self._tcp_connect_port)
    209 
    210         link = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    211 
    212         try:
    213             logging.debug("Connecting to device...")
    214             link.connect(("localhost", self._tcp_connect_port))
    215             logging.debug("Connected.")
    216         except socket.error as serr:
    217             print "Socket connection error: ", serr
    218             print "Finalizing and exiting the test"
    219             self.endTestIfLostConnection(
    220                 report_errors,
    221                 "Unable to communicate with device: connection refused")
    222         except:
    223             print "Non socket-related exception at this block in _send(); re-raising now."
    224             raise
    225         logging.debug("Sending '%s'", msg)
    226         link.sendall(msg)
    227         logging.debug("Getting response...")
    228         response = link.recv(4096)
    229         logging.debug("Got response '%s'", response)
    230         link.close()
    231         return response
    232 
    233     def queryDevice(self, query):
    234         """Post a yes/no query to the device, return True upon successful query, False otherwise"""
    235         logging.info("Querying device with '%s'", query)
    236         return self._send(query) == "OK"
    237 
    238     # TODO: abstract device communication (and string commands) into its own class
    239     def executeOnDevice(self, cmd, reportErrors = True):
    240         """Execute a (string) command on the remote device"""
    241         return self._send(cmd, reportErrors)
    242 
    243     def executeLocal(self, cmd, check_status = True):
    244         """execute a shell command locally (on the host)"""
    245         from subprocess import call
    246         status = call(cmd.split(" "))
    247         if status != 0 and check_status:
    248             logging.error("Failed to execute \"%s\"", cmd)
    249         else:
    250             logging.debug("Executed \"%s\"", cmd)
    251         return status
    252 
    253     def reportErrorRaiseExceptionIf(self, condition, msg):
    254         """Report an error condition to the device if condition is True.
    255         Will raise an exception on the device if condition is True.
    256         Args:
    257             condition: If true, this reports error
    258             msg: Message related to exception
    259         Raises:
    260             A PowerTestException encapsulating the message provided in msg
    261         """
    262         if condition:
    263             try:
    264                 logging.error("Exiting on error: %s" % msg)
    265                 self.executeOnDevice(PowerTest.REQUEST_RAISE % (self._current_test, msg),
    266                                      reportErrors = True)
    267             except:
    268                 logging.error("Unable to communicate with device to report "
    269                               "error: %s" % msg)
    270                 self.finalize()
    271                 sys.exit(msg)
    272             raise PowerTestException(msg)
    273 
    274     def endTestIfLostConnection(self, lost_connection, error_message):
    275         """
    276         This function ends the test if lost_connection was true,
    277         which indicates that the connection to the device was lost.
    278         Args:
    279             lost_connection: boolean variable, if True it indicates that
    280                 connection to device was lost and the test must be terminated.
    281             error_message: String to print to the host console before exiting the test
    282                 (if lost_connection is True)
    283         Returns:
    284             None.
    285         """
    286         if lost_connection:
    287             logging.error(error_message)
    288             self.finalize()
    289             sys.exit(error_message)
    290 
    291     def setUsbEnabled(self, enabled, verbose = True):
    292         if enabled:
    293             val = 1
    294         else:
    295             val = 0
    296         self._power_monitor.SetUsbPassthrough(val)
    297         tries = 0
    298 
    299         # Sometimes command won't go through first time, particularly if immediately after a data
    300         # collection, so allow for retries
    301         # TODO: Move this retry mechanism to the power monitor driver.
    302         status = self._power_monitor.GetStatus()
    303         while status is None and tries < 5:
    304             tries += 1
    305             time.sleep(2.0)
    306             logging.error("Retrying get status call...")
    307             self._power_monitor.StopDataCollection()
    308             self._power_monitor.SetUsbPassthrough(val)
    309             status = self._power_monitor.GetStatus()
    310 
    311         if enabled:
    312             if verbose:
    313                 print("...USB enabled, waiting for device")
    314             self.executeLocal("adb wait-for-device")
    315             if verbose:
    316                 print("...device online")
    317         else:
    318             if verbose:
    319                 logging.info("...USB disabled")
    320         # re-establish port forwarding
    321         if enabled and self._tcp_connect_port > 0:
    322             status = self.executeLocal("adb forward tcp:%d localabstract:%s" %
    323                                        (self._tcp_connect_port, PowerTest.DOMAIN_NAME))
    324             self.reportErrorRaiseExceptionIf(status != 0, msg = "Unable to forward requests to client over adb")
    325 
    326     def computeBaselineState(self, measurements):
    327         """
    328         Args:
    329             measurements: List of floats containing ampere draw measurements
    330                 taken from the monsoon power monitor.
    331                 Must be atleast 100 measurements long
    332         Returns:
    333             A tuple (isBaseline, mean_current) where isBaseline is a
    334             boolean that is True only if the baseline state for the phone is
    335             detected. mean_current is an estimate of the average baseline
    336             current for the device, which is valid only if baseline state is
    337             detected (if not, it is set to -1).
    338         """
    339 
    340         # Looks at the measurements to see if it is in baseline state
    341         if len(measurements) < 100:
    342             print(
    343                 "Need at least 100 measurements to determine if baseline state has"
    344                 " been reached")
    345             return (False, -1)
    346 
    347         # Assumption: At baseline state, the power profile is Gaussian distributed
    348         # with low-variance around the mean current draw.
    349         # Ideally we should find the mode from a histogram bin to find an estimated mean.
    350         # Assuming here that the median is very close to this value; later we check that the
    351         # variance of the samples is low enough to validate baseline.
    352         sorted_measurements = sorted(measurements)
    353         number_measurements = len(measurements)
    354         if not number_measurements % 2:
    355             median_measurement = (sorted_measurements[(number_measurements - 1) / 2] +
    356                                   sorted_measurements[(number_measurements + 1) / 2]) / 2
    357         else:
    358             median_measurement = sorted_measurements[number_measurements / 2]
    359 
    360         # Assume that at baseline state, a large fraction of power measurements
    361         # are within +/- EXPECTED_AMPS_VARIATION_HALF_RANGE milliAmperes of
    362         # the average baseline current. Find all such measurements in the
    363         # sorted measurement vector.
    364         left_index = (
    365             bisect_left(
    366                 sorted_measurements,
    367                 median_measurement -
    368                 PowerTest.EXPECTED_AMPS_VARIATION_HALF_RANGE))
    369         right_index = (
    370             bisect_left(
    371                 sorted_measurements,
    372                 median_measurement +
    373                 PowerTest.EXPECTED_AMPS_VARIATION_HALF_RANGE))
    374 
    375         average_baseline_amps = scipy.mean(
    376             sorted_measurements[left_index: (right_index - 1)])
    377 
    378         detected_baseline = True
    379         # We enforce that a fraction of more than 'THRESHOLD_BASELINE_SAMPLES_FRACTION'
    380         # of samples must be within +/- EXPECTED_AMPS_VARIATION_HALF_RANGE
    381         # milliAmperes of the mean baseline current, which we have estimated as
    382         # the median.
    383         if ((right_index - left_index) < PowerTest.THRESHOLD_BASELINE_SAMPLES_FRACTION * len(
    384                 measurements)):
    385             detected_baseline = False
    386 
    387         # We check for the maximum limit of the expected baseline
    388         if median_measurement > self._max_baseline_amps:
    389             detected_baseline = False
    390         if average_baseline_amps < 0:
    391             print PowerTest.NEGATIVE_AMPERE_ERROR_MESSAGE
    392             detected_baseline = False
    393 
    394         print("%s baseline state" % ("Could detect" if detected_baseline else "Could NOT detect"))
    395         print(
    396             "median amps = %f, avg amps = %f, fraction of good samples = %f" %
    397             (median_measurement, average_baseline_amps,
    398              float(right_index - left_index) / len(measurements)))
    399         if PowerTest.ENABLE_PLOTTING:
    400             plt.plot(measurements)
    401             plt.show()
    402             print("To continue test, please close the plot window manually.")
    403         return (detected_baseline, average_baseline_amps)
    404 
    405     def isApInSuspendState(self, measurements_amps, nominal_max_amps, test_percentile):
    406         """
    407         This function detects AP suspend and display off state of phone
    408         after a sensor has been registered.
    409 
    410         Because the power profile can be very different between sensors and
    411         even across builds, it is difficult to specify a tight threshold for
    412         mean current draw or mandate that the power measurements must have low
    413         variance. We use a criteria that allows for a certain fraction of
    414         peaks in power spectrum and checks that test_percentile fraction of
    415         measurements must be below the specified value nominal_max_amps
    416         Args:
    417             measurements_amps: amperes draw measurements from power monitor
    418             test_percentile: the fraction of measurements we require to be below
    419                              a specified amps value
    420             nominal_max_amps: the specified value of the max current draw
    421         Returns:
    422             returns a boolean which is True if and only if the AP suspend and
    423             display off state is detected
    424         """
    425         count_good = len([m for m in measurements_amps if m < nominal_max_amps])
    426         count_negative = len([m for m in measurements_amps if m < 0])
    427         if count_negative > 0:
    428             print PowerTest.NEGATIVE_AMPERE_ERROR_MESSAGE
    429             return False;
    430         return count_good > test_percentile * len(measurements_amps)
    431 
    432     def getBaselineState(self):
    433         """This function first disables all sensors, then collects measurements
    434         through the power monitor and continuously evaluates if baseline state
    435         is reached. Once baseline state is detected, it returns a tuple with
    436         status information. If baseline is not detected in a preset maximum
    437         number of trials, it returns as well.
    438 
    439         Returns:
    440             Returns a tuple (isBaseline, mean_current) where isBaseline is a
    441             boolean that is True only if the baseline state for the phone is
    442             detected. mean_current is an estimate of the average baseline current
    443             for the device, which is valid only if baseline state is detected
    444             (if not, it is set to -1)
    445         """
    446         self.setPowerOn("ALL", False)
    447         self.setUsbEnabled(False)
    448         print("Waiting %d seconds for baseline state" % DELAY_SCREEN_OFF)
    449         time.sleep(DELAY_SCREEN_OFF)
    450 
    451         MEASUREMENT_DURATION_SECONDS_BASELINE_DETECTION = 5  # seconds
    452         NUMBER_MEASUREMENTS_BASELINE_DETECTION = (
    453             PowerTest.RATE_NOMINAL *
    454             MEASUREMENT_DURATION_SECONDS_BASELINE_DETECTION)
    455         NUMBER_MEASUREMENTS_BASELINE_VERIFICATION = (
    456             NUMBER_MEASUREMENTS_BASELINE_DETECTION * 5)
    457         MAX_TRIALS = 50
    458 
    459         collected_baseline_measurements = False
    460 
    461         for tries in xrange(MAX_TRIALS):
    462             print("Trial number %d of %d..." % (tries, MAX_TRIALS))
    463             measurements = self.collectMeasurements(
    464                 NUMBER_MEASUREMENTS_BASELINE_DETECTION, PowerTest.RATE_NOMINAL,
    465                 verbose = False)
    466             if self.computeBaselineState(measurements)[0] is True:
    467                 collected_baseline_measurements = True
    468                 break
    469 
    470         if collected_baseline_measurements:
    471             print("Verifying baseline state over a longer interval "
    472                   "in order to double check baseline state")
    473             measurements = self.collectMeasurements(
    474                 NUMBER_MEASUREMENTS_BASELINE_VERIFICATION, PowerTest.RATE_NOMINAL,
    475                 verbose = False)
    476             self.reportErrorRaiseExceptionIf(
    477                 not measurements, "No background measurements could be taken")
    478             retval = self.computeBaselineState(measurements)
    479             if retval[0]:
    480                 print("Verified baseline.")
    481                 if measurements and LOG_DATA_TO_FILE:
    482                     with open("/tmp/cts-power-tests-background-data.log", "w") as f:
    483                         for m in measurements:
    484                             f.write("%.4f\n" % m)
    485             return retval
    486         else:
    487             return (False, -1)
    488 
    489     def waitForApSuspendMode(self):
    490         """This function repeatedly collects measurements until AP suspend and display off
    491         mode is detected. After a maximum number of trials, if this state is not reached, it
    492         raises an error.
    493         Returns:
    494             boolean which is True if device was detected to be in suspend state
    495         Raises:
    496             Power monitor-related exception
    497         """
    498         print("waitForApSuspendMode(): Sleeping for %d seconds" % DELAY_SCREEN_OFF)
    499         time.sleep(DELAY_SCREEN_OFF)
    500 
    501         NUMBER_MEASUREMENTS = 200
    502         # Maximum trials for which to collect measurements to get to Ap suspend
    503         # state
    504         MAX_TRIALS = 50
    505 
    506         got_to_suspend_state = False
    507         for count in xrange(MAX_TRIALS):
    508             print ("waitForApSuspendMode(): Trial %d of %d" % (count, MAX_TRIALS))
    509             measurements = self.collectMeasurements(NUMBER_MEASUREMENTS,
    510                                                     PowerTest.RATE_NOMINAL,
    511                                                     verbose = False)
    512             if self.isApInSuspendState(
    513                     measurements, PowerTest.MAX_PERCENTILE_AP_SCREEN_OFF_AMPS,
    514                     PowerTest.PERCENTILE_MAX_AP_SCREEN_OFF):
    515                 got_to_suspend_state = True
    516                 break
    517         self.reportErrorRaiseExceptionIf(
    518             got_to_suspend_state is False,
    519             msg = "Unable to determine application processor suspend mode status.")
    520         print("Got to AP suspend state")
    521         return got_to_suspend_state
    522 
    523     def collectMeasurements(self, measurementCount, rate, verbose = True):
    524         """Args:
    525             measurementCount: Number of measurements to collect from the power
    526                               monitor
    527             rate: The integer frequency in Hertz at which to collect measurements from
    528                   the power monitor
    529         Returns:
    530             A list containing measurements from the power monitor; that has the
    531             requested count of the number of measurements at the specified rate
    532         """
    533         assert (measurementCount > 0)
    534         decimate_by = self._native_hz / rate or 1
    535 
    536         self._power_monitor.StartDataCollection()
    537         sub_measurements = []
    538         measurements = []
    539         tries = 0
    540         if verbose: print("")
    541         try:
    542             while len(measurements) < measurementCount and tries < 5:
    543                 if tries:
    544                     self._power_monitor.StopDataCollection()
    545                     self._power_monitor.StartDataCollection()
    546                     time.sleep(1.0)
    547                 tries += 1
    548                 additional = self._power_monitor.CollectData()
    549                 if additional is not None:
    550                     tries = 0
    551                     sub_measurements.extend(additional)
    552                     while len(sub_measurements) >= decimate_by:
    553                         sub_avg = sum(sub_measurements[0:decimate_by]) / decimate_by
    554                         measurements.append(sub_avg)
    555                         sub_measurements = sub_measurements[decimate_by:]
    556                         if verbose:
    557                             # "\33[1A\33[2K" is a special Linux console control
    558                             # sequence for moving to the previous line, and
    559                             # erasing it; and reprinting new text on that
    560                             # erased line.
    561                             sys.stdout.write("\33[1A\33[2K")
    562                             print ("MEASURED[%d]: %f" % (len(measurements), measurements[-1]))
    563         finally:
    564             self._power_monitor.StopDataCollection()
    565 
    566         self.reportErrorRaiseExceptionIf(measurementCount > len(measurements),
    567                            "Unable to collect all requested measurements")
    568         return measurements
    569 
    570     def requestUserAcknowledgment(self, msg):
    571         """Post message to user on screen and wait for acknowledgment"""
    572         response = self.executeOnDevice(PowerTest.REQUEST_USER_RESPONSE % msg)
    573         self.reportErrorRaiseExceptionIf(
    574             response != "OK", "Unable to request user acknowledgment")
    575 
    576     def setTestResult(self, test_name, test_result, test_message):
    577         """
    578         Reports the result of a test to the device
    579         Args:
    580             test_name: name of the test
    581             test_result: Boolean result of the test (True means Pass)
    582             test_message: Relevant message
    583         """
    584         print ("Test %s : %s" % (test_name, test_result))
    585 
    586         response = (
    587             self.executeOnDevice(
    588                 PowerTest.REQUEST_SET_TEST_RESULT %
    589                 (test_name, test_result, test_message)))
    590         self.reportErrorRaiseExceptionIf(
    591             response != "OK", "Unable to send test status to Verifier")
    592 
    593     def setPowerOn(self, sensor, powered_on):
    594         response = self.executeOnDevice(PowerTest.REQUEST_SENSOR_SWITCH %
    595             (("ON" if powered_on else "OFF"), sensor))
    596         self.reportErrorRaiseExceptionIf(
    597             response == "ERR", "Unable to set sensor %s state" % sensor)
    598         logging.info("Set %s %s", sensor, ("ON" if powered_on else "OFF"))
    599         return response
    600 
    601     def runSensorPowerTest(
    602             self, sensor, max_amperes_allowed, baseline_amps, user_request = None):
    603         """
    604         Runs power test for a specific sensor; i.e. measures the amperes draw
    605         of the phone using monsoon, with the specified sensor mregistered
    606         and the phone in suspend state; and verifies that the incremental
    607         consumed amperes is within expected bounds.
    608         Args:
    609             sensor: The specified sensor for which to run the power test
    610             max_amperes_allowed: Maximum ampere draw of the device with the
    611                     sensor registered and device in suspend state
    612             baseline_amps: The power draw of the device when it is in baseline
    613                     state (no sensors registered, display off, AP asleep)
    614         """
    615         self._current_test = ("%s_Power_Test_While_%s" % (
    616             sensor, ("Under_Motion" if user_request is not None else "Still")))
    617         try:
    618             print ("\n\n---------------------------------")
    619             if user_request is not None:
    620                 print ("Running power test on %s under motion." % sensor)
    621             else:
    622                 print ("Running power test on %s while device is still." % sensor)
    623             print ("---------------------------------")
    624             response = self.executeOnDevice(
    625                 PowerTest.REQUEST_SENSOR_AVAILABILITY % sensor)
    626             if response == "UNAVAILABLE":
    627                 self.setTestResult(
    628                     self._current_test, test_result = "SKIPPED",
    629                     test_message = "Sensor %s not available on this platform" % sensor)
    630             self.setPowerOn("ALL", False)
    631             if response == "UNAVAILABLE":
    632                 self.setTestResult(
    633                     self._current_test, test_result = "SKIPPED",
    634                     test_message = "Sensor %s not available on this device" % sensor)
    635                 return
    636             self.reportErrorRaiseExceptionIf(response != "OK", "Unable to set all sensor off")
    637             self.executeOnDevice(PowerTest.REQUEST_SCREEN_OFF)
    638             self.setUsbEnabled(False)
    639             self.setUsbEnabled(True)
    640             self.setPowerOn(sensor, True)
    641             if user_request is not None:
    642                 print("===========================================\n" +
    643                       "==> Please follow the instructions presented on the device\n" +
    644                       "===========================================")
    645                 self.requestUserAcknowledgment(user_request)
    646             self.executeOnDevice(PowerTest.REQUEST_SCREEN_OFF)
    647             self.setUsbEnabled(False)
    648             self.reportErrorRaiseExceptionIf(
    649                 response != "OK", "Unable to set sensor %s ON" % sensor)
    650 
    651             self.waitForApSuspendMode()
    652             print ("Collecting sensor %s measurements" % sensor)
    653             measurements = self.collectMeasurements(PowerTest.SAMPLE_COUNT_NOMINAL,
    654                                                     PowerTest.RATE_NOMINAL)
    655 
    656             if measurements and LOG_DATA_TO_FILE:
    657                 with open("/tmp/cts-power-tests-%s-%s-sensor-data.log" % (sensor,
    658                     ("Under_Motion" if user_request is not None else "Still")), "w") as f:
    659                     for m in measurements:
    660                         f.write("%.4f\n" % m)
    661                     self.setUsbEnabled(True, verbose = False)
    662                     print("Saving raw data files to device...")
    663                     self.executeLocal("adb shell mkdir -p %s" % self._external_storage, False)
    664                     self.executeLocal("adb push %s %s/." % (f.name, self._external_storage))
    665                     self.setUsbEnabled(False, verbose = False)
    666             self.reportErrorRaiseExceptionIf(
    667                 not measurements, "No measurements could be taken for %s" % sensor)
    668             avg = sum(measurements) / len(measurements)
    669             squared = [(m - avg) * (m - avg) for m in measurements]
    670 
    671             stddev = math.sqrt(sum(squared) / len(squared))
    672             current_diff = avg - baseline_amps
    673             self.setUsbEnabled(True)
    674             max_power = max(measurements) - avg
    675             if current_diff <= max_amperes_allowed:
    676                 # TODO: fail the test of background > current
    677                 message = (
    678                               "Draw is within limits. Sensor delta:%f mAmp   Baseline:%f "
    679                               "mAmp   Sensor: %f mAmp  Stddev : %f mAmp  Peak: %f mAmp") % (
    680                               current_diff * 1000.0, baseline_amps * 1000.0, avg * 1000.0,
    681                               stddev * 1000.0, max_power * 1000.0)
    682             else:
    683                 message = (
    684                               "Draw is too high. Current:%f Background:%f   Measured: %f "
    685                               "Stddev: %f  Peak: %f") % (
    686                               current_diff * 1000.0, baseline_amps * 1000.0, avg * 1000.0,
    687                               stddev * 1000.0, max_power * 1000.0)
    688             self.setTestResult(
    689                 self._current_test,
    690                 ("PASS" if (current_diff <= max_amperes_allowed) else "FAIL"),
    691                 message)
    692             print("Result: " + message)
    693         except:
    694             traceback.print_exc()
    695             self.setTestResult(self._current_test, test_result = "FAIL",
    696                                test_message = "Exception occurred during run of test.")
    697             raise
    698 
    699     @staticmethod
    700     def runTests(max_baseline_amps):
    701         testrunner = None
    702         try:
    703             GENERIC_MOTION_REQUEST = ("\n===> Please press Next and when the "
    704                 "screen is off, keep the device under motion with only tiny, "
    705                 "slow movements until the screen turns on again.\nPlease "
    706                 "refrain from interacting with the screen or pressing any side "
    707                 "buttons while measurements are taken.")
    708             USER_STEPS_REQUEST = ("\n===> Please press Next and when the "
    709                 "screen is off, then move the device to simulate step motion "
    710                 "until the screen turns on again.\nPlease refrain from "
    711                 "interacting with the screen or pressing any side buttons "
    712                 "while measurements are taken.")
    713             testrunner = PowerTest(max_baseline_amps)
    714             testrunner.executeOnDevice(
    715                 PowerTest.REQUEST_SHOW_MESSAGE % "Connected.  Running tests...")
    716             is_baseline_success, baseline_amps = testrunner.getBaselineState()
    717 
    718             if is_baseline_success:
    719                 testrunner.setUsbEnabled(True)
    720                 # TODO: Enable testing a single sensor
    721                 testrunner.runSensorPowerTest(
    722                     "SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_AMPS, baseline_amps,
    723                     user_request = GENERIC_MOTION_REQUEST)
    724                 testrunner.runSensorPowerTest(
    725                     "STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_AMPS, baseline_amps,
    726                     user_request = USER_STEPS_REQUEST)
    727                 testrunner.runSensorPowerTest(
    728                     "STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_AMPS, baseline_amps,
    729                     user_request = USER_STEPS_REQUEST)
    730                 testrunner.runSensorPowerTest(
    731                     "ACCELEROMETER", PowerTest.MAX_ACCEL_AMPS, baseline_amps,
    732                     user_request = GENERIC_MOTION_REQUEST)
    733                 testrunner.runSensorPowerTest(
    734                     "MAGNETIC_FIELD", PowerTest.MAX_MAG_AMPS, baseline_amps,
    735                     user_request = GENERIC_MOTION_REQUEST)
    736                 testrunner.runSensorPowerTest(
    737                     "GYROSCOPE", PowerTest.MAX_GYRO_AMPS, baseline_amps,
    738                     user_request = GENERIC_MOTION_REQUEST)
    739                 testrunner.runSensorPowerTest(
    740                     "ACCELEROMETER", PowerTest.MAX_ACCEL_AMPS, baseline_amps,
    741                     user_request = None)
    742                 testrunner.runSensorPowerTest(
    743                     "MAGNETIC_FIELD", PowerTest.MAX_MAG_AMPS, baseline_amps,
    744                     user_request = None)
    745                 testrunner.runSensorPowerTest(
    746                     "GYROSCOPE", PowerTest.MAX_GYRO_AMPS, baseline_amps,
    747                     user_request = None)
    748                 testrunner.runSensorPowerTest(
    749                     "SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_AMPS, baseline_amps,
    750                     user_request = None)
    751                 testrunner.runSensorPowerTest(
    752                     "STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_AMPS, baseline_amps,
    753                     user_request = None)
    754                 testrunner.runSensorPowerTest(
    755                     "STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_AMPS, baseline_amps,
    756                     user_request = None)
    757             else:
    758                 print("Could not get to baseline state. This is either because "
    759                       "in several trials, the monitor could not measure a set "
    760                       "of power measurements that had the specified low "
    761                       "variance or the mean measurements were below the "
    762                       "expected value. None of the sensor power measurement "
    763                       " tests were performed due to not being able to detect "
    764                       "baseline state. Please re-run the power tests.")
    765         except KeyboardInterrupt:
    766             print "Keyboard interrupt from user."
    767             raise
    768         except:
    769             import traceback
    770             traceback.print_exc()
    771         finally:
    772             logging.info("TESTS COMPLETE")
    773             if testrunner:
    774                 try:
    775                     testrunner.finalize()
    776                 except socket.error:
    777                     sys.exit(
    778                         "===================================================\n"
    779                         "Unable to connect to device under test. Make sure \n"
    780                         "the device is connected via the usb pass-through, \n"
    781                         "the CtsVerifier app is running the SensorPowerTest on \n"
    782                         "the device, and USB pass-through is enabled.\n"
    783                         "===================================================")
    784 
    785 def main(argv):
    786     """ Simple command-line interface for a power test application."""
    787     useful_flags = ["voltage", "status", "usbpassthrough",
    788                     "samples", "current", "log", "power_monitor"]
    789     if not [f for f in useful_flags if FLAGS.get(f, None) is not None]:
    790         print __doc__.strip()
    791         print FLAGS.MainModuleHelp()
    792         return
    793 
    794     if FLAGS.avg and FLAGS.avg < 0:
    795         logging.error("--avg must be greater than 0")
    796         return
    797 
    798     if FLAGS.voltage is not None:
    799         if FLAGS.voltage > 5.5:
    800             print("!!WARNING: Voltage higher than typical values!!!")
    801         try:
    802             response = raw_input(
    803                 "Voltage of %.3f requested.  Confirm this is correct (Y/N)" %
    804                 FLAGS.voltage)
    805             if response.upper() != "Y":
    806                 sys.exit("Aborting")
    807         except:
    808             sys.exit("Aborting.")
    809 
    810     if not FLAGS.power_monitor:
    811         sys.exit(
    812             "You must specify a '--power_monitor' option to specify which power "
    813             "monitor type " +
    814             "you are using.\nOne of:\n  \n  ".join(available_monitors))
    815     power_monitors = do_import("power_monitors.%s" % FLAGS.power_monitor)
    816     try:
    817         mon = power_monitors.Power_Monitor(device = FLAGS.device)
    818     except:
    819         import traceback
    820 
    821         traceback.print_exc()
    822         sys.exit("No power monitors found")
    823 
    824     if FLAGS.voltage is not None:
    825 
    826         if FLAGS.ramp is not None:
    827             mon.RampVoltage(mon.start_voltage, FLAGS.voltage)
    828         else:
    829             mon.SetVoltage(FLAGS.voltage)
    830 
    831     if FLAGS.current is not None:
    832         mon.SetMaxCurrent(FLAGS.current)
    833 
    834     if FLAGS.status:
    835         items = sorted(mon.GetStatus().items())
    836         print "\n".join(["%s: %s" % item for item in items])
    837 
    838     if FLAGS.usbpassthrough:
    839         if FLAGS.usbpassthrough == "off":
    840             mon.SetUsbPassthrough(0)
    841         elif FLAGS.usbpassthrough == "on":
    842             mon.SetUsbPassthrough(1)
    843         elif FLAGS.usbpassthrough == "auto":
    844             mon.SetUsbPassthrough(2)
    845         else:
    846             mon.Close()
    847             sys.exit("bad pass-through flag: %s" % FLAGS.usbpassthrough)
    848 
    849     if FLAGS.samples:
    850         # Make sure state is normal
    851         mon.StopDataCollection()
    852         status = mon.GetStatus()
    853         native_hz = status["sampleRate"] * 1000
    854 
    855         # Collect and average samples as specified
    856         mon.StartDataCollection()
    857 
    858         # In case FLAGS.hz doesn't divide native_hz exactly, use this invariant:
    859         # 'offset' = (consumed samples) * FLAGS.hz - (emitted samples) * native_hz
    860         # This is the error accumulator in a variation of Bresenham's algorithm.
    861         emitted = offset = 0
    862         collected = []
    863         history_deque = collections.deque()  # past n samples for rolling average
    864 
    865         # TODO: Complicated lines of code below. Refactoring needed
    866         try:
    867             last_flush = time.time()
    868             while emitted < FLAGS.samples or FLAGS.samples == -1:
    869                 # The number of raw samples to consume before emitting the next output
    870                 need = (native_hz - offset + FLAGS.hz - 1) / FLAGS.hz
    871                 if need > len(collected):  # still need more input samples
    872                     samples = mon.CollectData()
    873                     if not samples: break
    874                     collected.extend(samples)
    875                 else:
    876                     # Have enough data, generate output samples.
    877                     # Adjust for consuming 'need' input samples.
    878                     offset += need * FLAGS.hz
    879                     while offset >= native_hz:  # maybe multiple, if FLAGS.hz > native_hz
    880                         this_sample = sum(collected[:need]) / need
    881 
    882                         if FLAGS.timestamp: print int(time.time()),
    883 
    884                         if FLAGS.avg:
    885                             history_deque.appendleft(this_sample)
    886                             if len(history_deque) > FLAGS.avg: history_deque.pop()
    887                             print "%f %f" % (this_sample,
    888                                              sum(history_deque) / len(history_deque))
    889                         else:
    890                             print "%f" % this_sample
    891                         sys.stdout.flush()
    892 
    893                         offset -= native_hz
    894                         emitted += 1  # adjust for emitting 1 output sample
    895                     collected = collected[need:]
    896                     now = time.time()
    897                     if now - last_flush >= 0.99:  # flush every second
    898                         sys.stdout.flush()
    899                         last_flush = now
    900         except KeyboardInterrupt:
    901             print("interrupted")
    902             return 1
    903         finally:
    904             mon.Close()
    905         return 0
    906 
    907     if FLAGS.run:
    908         if not FLAGS.power_monitor:
    909             sys.exit(
    910                 "When running power tests, you must specify which type of power "
    911                 "monitor to use" +
    912                 " with '--power_monitor <type of power monitor>'")
    913         try:
    914             PowerTest.runTests(FLAGS.max_baseline_amps)
    915         except KeyboardInterrupt:
    916             print "Keyboard interrupt from user"
    917 
    918 if __name__ == "__main__":
    919     flags.DEFINE_boolean("status", None, "Print power meter status")
    920     flags.DEFINE_integer("avg", None,
    921                          "Also report average over last n data points")
    922     flags.DEFINE_float("voltage", None, "Set output voltage (0 for off)")
    923     flags.DEFINE_float("current", None, "Set max output current")
    924     flags.DEFINE_string("usbpassthrough", None, "USB control (on, off, auto)")
    925     flags.DEFINE_integer("samples", None, "Collect and print this many samples")
    926     flags.DEFINE_integer("hz", 5000, "Print this many samples/sec")
    927     flags.DEFINE_string("device", None,
    928                         "Path to the device in /dev/... (ex:/dev/ttyACM1)")
    929     flags.DEFINE_boolean("timestamp", None,
    930                          "Also print integer (seconds) timestamp on each line")
    931     flags.DEFINE_boolean("ramp", True, "Gradually increase voltage")
    932     flags.DEFINE_boolean("log", False, "Log progress to a file or not")
    933     flags.DEFINE_boolean("run", False, "Run the test suite for power")
    934     flags.DEFINE_string("power_monitor", None, "Type of power monitor to use")
    935     flags.DEFINE_float("max_baseline_amps", 0.005,
    936                        "Set maximum baseline current for device being tested")
    937     sys.exit(main(FLAGS(sys.argv)))
    938