Home | History | Annotate | Download | only in serial
      1 #! python
      2 # Python Serial Port Extension for Win32, Linux, BSD, Jython and .NET/Mono
      3 # serial driver for .NET/Mono (IronPython), .NET >= 2
      4 # see __init__.py
      5 #
      6 # (C) 2008 Chris Liechti <cliechti (at] gmx.net>
      7 # this is distributed under a free software license, see license.txt
      8 
      9 import clr
     10 import System
     11 import System.IO.Ports
     12 from serial.serialutil import *
     13 
     14 
     15 def device(portnum):
     16     """Turn a port number into a device name"""
     17     return System.IO.Ports.SerialPort.GetPortNames()[portnum]
     18 
     19 
     20 # must invoke function with byte array, make a helper to convert strings
     21 # to byte arrays
     22 sab = System.Array[System.Byte]
     23 def as_byte_array(string):
     24     return sab([ord(x) for x in string])  # XXX will require adaption when run with a 3.x compatible IronPython
     25 
     26 class IronSerial(SerialBase):
     27     """Serial port implementation for .NET/Mono."""
     28 
     29     BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
     30                 9600, 19200, 38400, 57600, 115200)
     31 
     32     def open(self):
     33         """Open port with current settings. This may throw a SerialException
     34            if the port cannot be opened."""
     35         if self._port is None:
     36             raise SerialException("Port must be configured before it can be used.")
     37         if self._isOpen:
     38             raise SerialException("Port is already open.")
     39         try:
     40             self._port_handle = System.IO.Ports.SerialPort(self.portstr)
     41         except Exception, msg:
     42             self._port_handle = None
     43             raise SerialException("could not open port %s: %s" % (self.portstr, msg))
     44 
     45         self._reconfigurePort()
     46         self._port_handle.Open()
     47         self._isOpen = True
     48         if not self._rtscts:
     49             self.setRTS(True)
     50             self.setDTR(True)
     51         self.flushInput()
     52         self.flushOutput()
     53 
     54     def _reconfigurePort(self):
     55         """Set communication parameters on opened port."""
     56         if not self._port_handle:
     57             raise SerialException("Can only operate on a valid port handle")
     58 
     59         #~ self._port_handle.ReceivedBytesThreshold = 1
     60 
     61         if self._timeout is None:
     62             self._port_handle.ReadTimeout = System.IO.Ports.SerialPort.InfiniteTimeout
     63         else:
     64             self._port_handle.ReadTimeout = int(self._timeout*1000)
     65 
     66         # if self._timeout != 0 and self._interCharTimeout is not None:
     67             # timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:]
     68 
     69         if self._writeTimeout is None:
     70             self._port_handle.WriteTimeout = System.IO.Ports.SerialPort.InfiniteTimeout
     71         else:
     72             self._port_handle.WriteTimeout = int(self._writeTimeout*1000)
     73 
     74 
     75         # Setup the connection info.
     76         try:
     77             self._port_handle.BaudRate = self._baudrate
     78         except IOError, e:
     79             # catch errors from illegal baudrate settings
     80             raise ValueError(str(e))
     81 
     82         if self._bytesize == FIVEBITS:
     83             self._port_handle.DataBits     = 5
     84         elif self._bytesize == SIXBITS:
     85             self._port_handle.DataBits     = 6
     86         elif self._bytesize == SEVENBITS:
     87             self._port_handle.DataBits     = 7
     88         elif self._bytesize == EIGHTBITS:
     89             self._port_handle.DataBits     = 8
     90         else:
     91             raise ValueError("Unsupported number of data bits: %r" % self._bytesize)
     92 
     93         if self._parity == PARITY_NONE:
     94             self._port_handle.Parity       = getattr(System.IO.Ports.Parity, 'None') # reserved keyword in Py3k
     95         elif self._parity == PARITY_EVEN:
     96             self._port_handle.Parity       = System.IO.Ports.Parity.Even
     97         elif self._parity == PARITY_ODD:
     98             self._port_handle.Parity       = System.IO.Ports.Parity.Odd
     99         elif self._parity == PARITY_MARK:
    100             self._port_handle.Parity       = System.IO.Ports.Parity.Mark
    101         elif self._parity == PARITY_SPACE:
    102             self._port_handle.Parity       = System.IO.Ports.Parity.Space
    103         else:
    104             raise ValueError("Unsupported parity mode: %r" % self._parity)
    105 
    106         if self._stopbits == STOPBITS_ONE:
    107             self._port_handle.StopBits     = System.IO.Ports.StopBits.One
    108         elif self._stopbits == STOPBITS_ONE_POINT_FIVE:
    109             self._port_handle.StopBits     = System.IO.Ports.StopBits.OnePointFive
    110         elif self._stopbits == STOPBITS_TWO:
    111             self._port_handle.StopBits     = System.IO.Ports.StopBits.Two
    112         else:
    113             raise ValueError("Unsupported number of stop bits: %r" % self._stopbits)
    114 
    115         if self._rtscts and self._xonxoff:
    116             self._port_handle.Handshake  = System.IO.Ports.Handshake.RequestToSendXOnXOff
    117         elif self._rtscts:
    118             self._port_handle.Handshake  = System.IO.Ports.Handshake.RequestToSend
    119         elif self._xonxoff:
    120             self._port_handle.Handshake  = System.IO.Ports.Handshake.XOnXOff
    121         else:
    122             self._port_handle.Handshake  = getattr(System.IO.Ports.Handshake, 'None')   # reserved keyword in Py3k
    123 
    124     #~ def __del__(self):
    125         #~ self.close()
    126 
    127     def close(self):
    128         """Close port"""
    129         if self._isOpen:
    130             if self._port_handle:
    131                 try:
    132                     self._port_handle.Close()
    133                 except System.IO.Ports.InvalidOperationException:
    134                     # ignore errors. can happen for unplugged USB serial devices
    135                     pass
    136                 self._port_handle = None
    137             self._isOpen = False
    138 
    139     def makeDeviceName(self, port):
    140         try:
    141             return device(port)
    142         except TypeError, e:
    143             raise SerialException(str(e))
    144 
    145     #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
    146 
    147     def inWaiting(self):
    148         """Return the number of characters currently in the input buffer."""
    149         if not self._port_handle: raise portNotOpenError
    150         return self._port_handle.BytesToRead
    151 
    152     def read(self, size=1):
    153         """Read size bytes from the serial port. If a timeout is set it may
    154            return less characters as requested. With no timeout it will block
    155            until the requested number of bytes is read."""
    156         if not self._port_handle: raise portNotOpenError
    157         # must use single byte reads as this is the only way to read
    158         # without applying encodings
    159         data = bytearray()
    160         while size:
    161             try:
    162                 data.append(self._port_handle.ReadByte())
    163             except System.TimeoutException, e:
    164                 break
    165             else:
    166                 size -= 1
    167         return bytes(data)
    168 
    169     def write(self, data):
    170         """Output the given string over the serial port."""
    171         if not self._port_handle: raise portNotOpenError
    172         if not isinstance(data, (bytes, bytearray)):
    173             raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
    174         try:
    175             # must call overloaded method with byte array argument
    176             # as this is the only one not applying encodings
    177             self._port_handle.Write(as_byte_array(data), 0, len(data))
    178         except System.TimeoutException, e:
    179             raise writeTimeoutError
    180         return len(data)
    181 
    182     def flushInput(self):
    183         """Clear input buffer, discarding all that is in the buffer."""
    184         if not self._port_handle: raise portNotOpenError
    185         self._port_handle.DiscardInBuffer()
    186 
    187     def flushOutput(self):
    188         """Clear output buffer, aborting the current output and
    189         discarding all that is in the buffer."""
    190         if not self._port_handle: raise portNotOpenError
    191         self._port_handle.DiscardOutBuffer()
    192 
    193     def sendBreak(self, duration=0.25):
    194         """Send break condition. Timed, returns to idle state after given duration."""
    195         if not self._port_handle: raise portNotOpenError
    196         import time
    197         self._port_handle.BreakState = True
    198         time.sleep(duration)
    199         self._port_handle.BreakState = False
    200 
    201     def setBreak(self, level=True):
    202         """Set break: Controls TXD. When active, to transmitting is possible."""
    203         if not self._port_handle: raise portNotOpenError
    204         self._port_handle.BreakState = bool(level)
    205 
    206     def setRTS(self, level=True):
    207         """Set terminal status line: Request To Send"""
    208         if not self._port_handle: raise portNotOpenError
    209         self._port_handle.RtsEnable = bool(level)
    210 
    211     def setDTR(self, level=True):
    212         """Set terminal status line: Data Terminal Ready"""
    213         if not self._port_handle: raise portNotOpenError
    214         self._port_handle.DtrEnable = bool(level)
    215 
    216     def getCTS(self):
    217         """Read terminal status line: Clear To Send"""
    218         if not self._port_handle: raise portNotOpenError
    219         return self._port_handle.CtsHolding
    220 
    221     def getDSR(self):
    222         """Read terminal status line: Data Set Ready"""
    223         if not self._port_handle: raise portNotOpenError
    224         return self._port_handle.DsrHolding
    225 
    226     def getRI(self):
    227         """Read terminal status line: Ring Indicator"""
    228         if not self._port_handle: raise portNotOpenError
    229         #~ return self._port_handle.XXX
    230         return False #XXX an error would be better
    231 
    232     def getCD(self):
    233         """Read terminal status line: Carrier Detect"""
    234         if not self._port_handle: raise portNotOpenError
    235         return self._port_handle.CDHolding
    236 
    237     # - - platform specific - - - -
    238     # none
    239 
    240 
    241 # assemble Serial class with the platform specific implementation and the base
    242 # for file-like behavior. for Python 2.6 and newer, that provide the new I/O
    243 # library, derive from io.RawIOBase
    244 try:
    245     import io
    246 except ImportError:
    247     # classic version with our own file-like emulation
    248     class Serial(IronSerial, FileLike):
    249         pass
    250 else:
    251     # io library present
    252     class Serial(IronSerial, io.RawIOBase):
    253         pass
    254 
    255 
    256 # Nur Testfunktion!!
    257 if __name__ == '__main__':
    258     import sys
    259 
    260     s = Serial(0)
    261     sys.stdio.write('%s\n' % s)
    262 
    263     s = Serial()
    264     sys.stdio.write('%s\n' % s)
    265 
    266 
    267     s.baudrate = 19200
    268     s.databits = 7
    269     s.close()
    270     s.port = 0
    271     s.open()
    272     sys.stdio.write('%s\n' % s)
    273 
    274