Home | History | Annotate | Download | only in i2c
      1 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 '''
      6 USB to I2C controller.
      7 '''
      8 
      9 import glob
     10 import logging
     11 import os
     12 import re
     13 import serial
     14 import time
     15 
     16 from autotest_lib.client.cros import tty
     17 
# Least significant bit of the 8-bit I2C address byte. It is OR-ed onto the
# 7-bit slave address shifted left by one: WRITE_BIT is used when sending data
# to a slave, READ_BIT when reading data from it.
WRITE_BIT = 0
READ_BIT = 1
     21 
     22 
     23 def create_i2c_controller(chipset_config):
     24     '''Factory method for I2CController.
     25 
     26     This function is a factory method to create an I2CController instance.
     27 
     28     @param chipset_config: Chipset configuration.
     29     @return An I2CController if succeeded.
     30     @throws AssertionError if a valid instance cannot be created.
     31     '''
     32     if chipset_config.split(':')[0] == 'SC18IM700':
     33         usb_uart_driver = chipset_config.split(':')[1]
     34         # Try to find a tty terminal that driver name matches usb_uart_driver.
     35         tty_path = tty.find_tty_by_driver(usb_uart_driver)
     36         if tty_path:
     37             return _I2CControllerSC18IM700(tty_path)
     38 
     39     assert False, "Unsupported configuration: %s" % chipset_config
     40 
     41 
     42 class I2CController(object):
     43     '''
     44     The base class of I2C controller.
     45     '''
     46 
     47     # Constants indicate I2C bus status.
     48     I2C_OK = 1
     49     I2C_NACK_ON_ADDRESS = 2
     50     I2C_NACK_ON_DATA = 3
     51     I2C_TIME_OUT = 4
     52 
     53     def send_and_check_status(self, slave_addr, int_array):
     54         '''Sends data to I2C slave device and checks the bus status.
     55 
     56         @param slave_addr: The address of slave in 7bits format.
     57         @param int_array: The data to send in integer array.
     58         @param status_check: Whether to check I2C bus status.
     59 
     60         @return An integer indicates I2C bus status.
     61         '''
     62         self.send(slave_addr, int_array)
     63         return self.read_bus_status()
     64 
     65     def read_bus_status(self):
     66         '''Returns the I2C bus status.'''
     67         raise NotImplementedError
     68 
     69     def send(self, slave_addr, int_array):
     70         '''Sends data to I2C slave device.
     71 
     72         Caller should call read_bus_status() explicitly to confirm whether the
     73         data sent successfully.
     74 
     75         @param slave_addr: The address of slave in 7bits format.
     76         @param int_array: The data to send in integer array.
     77         '''
     78         raise NotImplementedError
     79 
     80     def read(self, slave_addr, bytes_to_read):
     81         '''Reads data from I2C slave device.
     82 
     83         @param slave_addr: The address of slave in 7bits format.
     84         @param bytes_to_read: The number of bytes to read from device.
     85         @return An array of data.
     86         '''
     87         raise NotImplementedError
     88 
     89 
     90 class _I2CControllerSC18IM700(I2CController):
     91     '''
     92     Implementation of I2C Controller for NXP SC18IM700.
     93     '''
     94     SEC_WAIT_I2C = 0.1
     95 
     96     # Constants from official datasheet.
     97     # http://www.nxp.com/documents/data_sheet/SC18IM700.pdf
     98     I2C_STATUS = {0b11110000: I2CController.I2C_OK,
     99                   0b11110001: I2CController.I2C_NACK_ON_ADDRESS,
    100                   0b11110011: I2CController.I2C_NACK_ON_DATA,
    101                   0b11111000: I2CController.I2C_TIME_OUT}
    102 
    103     def __init__(self, device_path):
    104         '''Connects to NXP via serial port.
    105 
    106         @param device_path: The device path of serial port.
    107         '''
    108         self.logger = logging.getLogger('SC18IM700')
    109         self.logger.info('Setup serial device... [%s]', device_path)
    110         self.device_path = device_path
    111         self.serial = serial.Serial(port=self.device_path,
    112                                     baudrate=9600,
    113                                     bytesize=serial.EIGHTBITS,
    114                                     parity=serial.PARITY_NONE,
    115                                     stopbits=serial.STOPBITS_ONE,
    116                                     xonxoff=False,
    117                                     rtscts=True,
    118                                     interCharTimeout=1)
    119         self.logger.info('pySerial [%s] configuration : %s',
    120                          serial.VERSION, self.serial.__repr__())
    121         # Clean the buffer.
    122         self.serial.flush()
    123 
    124     def _write(self, data):
    125         '''Converts data to bytearray and writes to the serial port.'''
    126         self.serial.write(bytearray(data))
    127         self.serial.flush()
    128 
    129     def _read(self):
    130         '''Reads data from serial port(Non-Blocking).'''
    131         ret = self.serial.read(self.serial.inWaiting())
    132         self.logger.info('Hex and binary dump of datas - ')
    133         for char in ret:
    134             self.logger.info('  %x - %s', ord(char), bin(ord(char)))
    135         return ret
    136 
    137     @staticmethod
    138     def _convert_to_8bits_addr(slave_addr_7bits, lsb):
    139         '''Converts slave_addr from 7 bits to 8 bits with given LSB.'''
    140         assert (slave_addr_7bits >> 7) == 0, "Address must not exceed 7 bits."
    141         assert (lsb & ~0x01) == 0, "lsb must not exceed one bit."
    142         return (slave_addr_7bits << 1) | lsb
    143 
    144     def read_bus_status(self):
    145         cmd = [ord('R'), 0x0A, ord('P')]
    146         self._write(cmd)
    147         time.sleep(self.SEC_WAIT_I2C)
    148         ret = self._read()
    149         if (len(ret) == 1) and (ord(ret[0]) in self.I2C_STATUS):
    150             return self.I2C_STATUS[ord(ret[0])]
    151         raise IOError("I2C_STATUS_READ_FAILED")
    152 
    153     def send(self, slave_addr, int_array):
    154         cmd = ([ord('S'),
    155                 self._convert_to_8bits_addr(slave_addr, WRITE_BIT),
    156                 len(int_array)] +
    157                int_array + [ord('P')])
    158         self._write(cmd)
    159 
    160     def read(self, slave_addr, bytes_to_read):
    161         cmd = ([ord('S'),
    162                 self._convert_to_8bits_addr(slave_addr, READ_BIT),
    163                 bytes_to_read,
    164                 ord('P')])
    165         self._write(cmd)
    166         time.sleep(self.SEC_WAIT_I2C)
    167         return self._read()
    168 
    169     def write_gpio(self, data):
    170         self._write([ord('O'), data, ord('P')])
    171 
    172     def read_gpio(self):
    173         self._write([ord('I'), ord('P')])
    174         time.sleep(self.SEC_WAIT_I2C)
    175         return self._read()
    176 
    177     def write_register(self, regs, datas):
    178         assert len(regs) == len(datas)
    179         cmd = [ord('W')]
    180         for i in range(len(regs)):
    181             cmd.append(regs[i])
    182             cmd.append(datas[i])
    183         cmd.append(ord('P'))
    184         self._write(cmd)
    185 
    186     def read_register(self, regs):
    187         cmd = [ord('R')] + regs + [ord('P')]
    188         self._write(cmd)
    189         time.sleep(self.SEC_WAIT_I2C)
    190         return self._read()
    191