# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import error, logging, os, serial, shutil, threading, time

_power_play_data_file = '/tmp/power_play_data'

class PowerPlay(object):
    """Class to record serial over USB data from Power Play (go/powerplay).

    It detects if powerplay is connected to the DUT over USB and opens the
    serial port to start receiving powerplay data. It also opens a text file to
    save this data after some formatting.
    """

    version = 1

    def __init__(self, test_obj, record_interval=0):
        """Initialize PowerPlay.

        @param test_obj: test object. Its resultsdir attribute and
                output_perf_value() method are used when recording stops.
        @param record_interval: Power play data recording interval in seconds.
        """
        self.test = test_obj
        self.ser = None
        # Opened by the recording thread; kept as None until then so that
        # stop_recording_power_play_data() can tell whether there is a file
        # to close.
        self.text_file = None
        self.recording_interval = record_interval
        self.momentary_curr_list = list()
        self.record_thread = None

    def extract_current(self, pp_data):
        """Extract momentary current value from each line of powerplay data.

        Only lines whose first character is a digit are treated as data rows.
        The second comma separated field of such a row is converted to float
        and appended to the running list. Empty or malformed rows are skipped
        with a logged warning instead of raising.

        @param pp_data: Single line of powerplay data with eight comma separated
                values.
        @return list containing momentary current values.
        """
        if not pp_data or not pp_data[0].isdigit():
            return self.momentary_curr_list

        # A valid row needs at least two commas: the value of interest sits
        # between the first and the second one.
        fields = pp_data.split(',', 2)
        if len(fields) < 3:
            logging.warning('Skipping malformed powerplay line: %r', pp_data)
            return self.momentary_curr_list

        try:
            self.momentary_curr_list.append(float(fields[1]))
        except ValueError:
            logging.warning('Skipping powerplay line with non-numeric '
                            'current: %r', pp_data)
        return self.momentary_curr_list

    def start_recording_power_play_data(self):
        """Starts a new thread to record power play data."""
        self.record_thread = threading.Thread(target=self.start_record_thread)
        # Daemon thread: it must not keep the process alive once the test ends.
        self.record_thread.daemon = True
        self.record_thread.start()

    def start_record_thread(self):
        """Start recording power play data.

        Get a list of connected USB devices and try to establish a serial
        connection. Once the connection is established, open a text file and
        start reading serial data and write it to the text file after some
        formatting.

        @raises error.TestError: if opening the serial device fails.
        """
        devices = [x for x in os.listdir('/dev/') if x.startswith('ttyUSB')]

        for device in devices:
            device_link = '/dev/' + device
            try:
                if self.ser is None:
                    logging.info('Trying ... %s', device_link)
                    self.ser = serial.Serial(device_link, 115200)
                    logging.info('Successfully connected to %s', device_link)
                    break
            except serial.SerialException as e:
                raise error.TestError('Failed to connect to %s becuase of %s' %
                                      (device_link, str(e)))

        self.text_file = open(_power_play_data_file, 'w')

        if self.ser is not None:
            title_row = ('time,powerplay_timestamp,momentary_current (A),'
                         'momentary_charge (AH),average_current (A),'
                         'total_standby_time,total_wake_time,num_wakes,'
                         'is_awake?\n')
            self.text_file.write(title_row)
            start_time = time.time()
            # NOTE(review): the readline() in the loop condition consumes a
            # line that is never recorded; the recorded line is the one read
            # inside the body. This may be deliberate (dropping a possibly
            # partial line after flushInput()) - confirm before changing.
            while self.ser.readline():
                # Seconds elapsed since recording started, 3 decimal places.
                current_timestamp = '{:.3f}'.format(time.time() - start_time)
                pp_data = (self.ser.readline().replace('\00', '').
                           replace(' ', ',').replace('\r', ''))
                # Skip '#' lines and short lines, and stop writing once
                # stop_recording_power_play_data() has closed the file.
                if (not pp_data.startswith('#') and (len(pp_data) > 30) and
                        not self.text_file.closed):
                    self.text_file.write(current_timestamp + ',' + pp_data)
                    self.momentary_curr_list = self.extract_current(pp_data)
                time.sleep(self.recording_interval)
                # Drop whatever was buffered while sleeping.
                self.ser.flushInput()
        else:
            self.text_file.write('No data from powerplay. Check connection.')

    def stop_recording_power_play_data(self):
        """Stop recording power play data.

        Close the text file and copy it to the test log results directory. Also
        report current data to the performance dashboard.
        """
        # text_file stays None if the recording thread never got as far as
        # opening it (never started, or the serial connection failed).
        if self.text_file is not None and not self.text_file.closed:
            self.text_file.close()
        shutil.copy(_power_play_data_file, self.test.resultsdir)
        self.test.output_perf_value(description='momentary_current_draw',
                                    value=self.momentary_curr_list,
                                    units='Amps', higher_is_better=False)