Home | History | Annotate | Download | only in common_lib
      1 # Copyright 2015 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import error, logging, os, serial, shutil, threading, time
      6 
# Scratch file that PowerPlay opens for writing while recording; it is copied
# into the test's results directory when recording is stopped.
_power_play_data_file = '/tmp/power_play_data'
      8 
      9 class PowerPlay(object):
     10     """Class to record serial over USB data from Power Play (go/powerplay).
     11 
     12     It detects if powerplay is connected to the DUT over USB and opens the
     13     serial port to start receiving powerplay data. It also opens a text file to
     14     save this data after some formatting.
     15     """
     16 
     17     version = 1
     18 
     19     def __init__(self, test_obj, record_interval=0):
     20         """Initialize PowerPlay.
     21 
     22         @param test_obj: test object.
     23         @param record_interval: Power play data recording interval in seconds.
     24         """
     25         self.test = test_obj
     26         self.ser = None
     27         self.recording_interval = record_interval
     28         self.momentary_curr_list = list()
     29         self.record_thread = None
     30 
     31     def extract_current(self, pp_data):
     32         """Extract momentary current value from each line of powerplay data.
     33 
     34         @param pp_data: Single line of powerplay data with eight comma separated
     35                         values.
     36         @return list containing momentary current values.
     37         """
     38         if pp_data[0].isdigit():
     39             self.momentary_curr_list.append(float(pp_data[pp_data.index(',')+1:]
     40                     [:pp_data[pp_data.index(',')+1:].index(',')]))
     41         return self.momentary_curr_list
     42 
     43     def start_recording_power_play_data(self):
     44         """Starts a new thread to record power play data."""
     45         self.record_thread = threading.Thread(target=self.start_record_thread)
     46         self.record_thread.daemon = True
     47         self.record_thread.start()
     48 
     49     def start_record_thread(self):
     50         """Start recording power play data.
     51 
     52         Get a list of connected USB devices and try to establish a serial
     53         connection. Once the connection is established, open a text file and
     54         start reading serial data and write it to the text file after some
     55         formatting.
     56         """
     57         devices = [x for x in os.listdir('/dev/') if x.startswith('ttyUSB')]
     58 
     59         for device in devices:
     60             device_link = '/dev/' + device
     61             try:
     62                 if self.ser == None:
     63                     logging.info('Trying ... %s', device_link)
     64                     self.ser = serial.Serial(device_link, 115200)
     65                     logging.info('Successfully connected to %s', device_link)
     66                     break
     67             except serial.SerialException, e:
     68                 raise error.TestError('Failed to connect to %s becuase of %s' %
     69                                      (device_link, str(e)))
     70 
     71         self.text_file = open(_power_play_data_file, 'w')
     72 
     73         if self.ser != None:
     74             title_row = ('time,powerplay_timestamp,momentary_current (A),' +
     75                     'momentary_charge (AH),average_current (A),' +
     76                     'total_standby_time,total_wake_time,num_wakes,is_awake?\n')
     77             self.text_file.write(title_row)
     78             start_time = time.time()
     79             while self.ser.readline():
     80                 current_timestamp = (('{:>10.3f}'.
     81                         format(time.time() - start_time)).replace(' ', ''))
     82                 pp_data = (self.ser.readline().replace('\00', '').
     83                         replace(' ', ',').replace('\r', ''))
     84                 if (not pp_data.startswith('#') and (len(pp_data) > 30) and
     85                         not self.text_file.closed):
     86                     self.text_file.write(current_timestamp + ',' + pp_data)
     87                     self.momentary_curr_list = self.extract_current(pp_data)
     88                 time.sleep(self.recording_interval)
     89                 self.ser.flushInput()
     90         else:
     91             self.text_file.write('No data from powerplay. Check connection.')
     92 
     93     def stop_recording_power_play_data(self):
     94         """Stop recording power play data.
     95 
     96         Close the text file and copy it to the test log results directory. Also
     97         report current data to the performance dashboard.
     98         """
     99         if not self.text_file.closed:
    100             self.text_file.close()
    101         shutil.copy(_power_play_data_file, self.test.resultsdir)
    102         self.test.output_perf_value(description='momentary_current_draw',
    103                                value=self.momentary_curr_list,
    104                                units='Amps', higher_is_better=False)
    105