Home | History | Annotate | Download | only in urlhandler
      1 #! python
      2 #
      3 # Python Serial Port Extension for Win32, Linux, BSD, Jython
      4 # see __init__.py
      5 #
      6 # This module implements a loop back connection receiving itself what it sent.
      7 #
      8 # The purpose of this module is.. well... You can run the unit tests with it.
      9 # and it was so easy to implement ;-)
     10 #
     11 # (C) 2001-2011 Chris Liechti <cliechti (at] gmx.net>
     12 # this is distributed under a free software license, see license.txt
     13 #
     14 # URL format:    loop://[option[/option...]]
     15 # options:
     16 # - "logging=<level>" enable diagnostic messages; level is one of debug, info, warning, error
     17 
     18 from serial.serialutil import *
     19 import threading
     20 import time
     21 import logging
     22 
     23 # map log level names to constants. used in fromURL()
     24 LOGGER_LEVELS = {
     25     'debug': logging.DEBUG,
     26     'info': logging.INFO,
     27     'warning': logging.WARNING,
     28     'error': logging.ERROR,
     29     }
     30 
     31 
     32 class LoopbackSerial(SerialBase):
     33     """Serial port implementation that simulates a loop back connection in plain software."""
     34 
     35     BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
     36                  9600, 19200, 38400, 57600, 115200)
     37 
     38     def open(self):
     39         """Open port with current settings. This may throw a SerialException
     40            if the port cannot be opened."""
     41         if self._isOpen:
     42             raise SerialException("Port is already open.")
     43         self.logger = None
     44         self.buffer_lock = threading.Lock()
     45         self.loop_buffer = bytearray()
     46         self.cts = False
     47         self.dsr = False
     48 
     49         if self._port is None:
     50             raise SerialException("Port must be configured before it can be used.")
     51         # not that there is anything to open, but the function applies the
     52         # options found in the URL
     53         self.fromURL(self.port)
     54 
     55         # not that there anything to configure...
     56         self._reconfigurePort()
     57         # all things set up get, now a clean start
     58         self._isOpen = True
     59         if not self._rtscts:
     60             self.setRTS(True)
     61             self.setDTR(True)
     62         self.flushInput()
     63         self.flushOutput()
     64 
     65     def _reconfigurePort(self):
     66         """Set communication parameters on opened port. for the loop://
     67         protocol all settings are ignored!"""
     68         # not that's it of any real use, but it helps in the unit tests
     69         if not isinstance(self._baudrate, (int, long)) or not 0 < self._baudrate < 2**32:
     70             raise ValueError("invalid baudrate: %r" % (self._baudrate))
     71         if self.logger:
     72             self.logger.info('_reconfigurePort()')
     73 
     74     def close(self):
     75         """Close port"""
     76         if self._isOpen:
     77             self._isOpen = False
     78             # in case of quick reconnects, give the server some time
     79             time.sleep(0.3)
     80 
     81     def makeDeviceName(self, port):
     82         raise SerialException("there is no sensible way to turn numbers into URLs")
     83 
     84     def fromURL(self, url):
     85         """extract host and port from an URL string"""
     86         if url.lower().startswith("loop://"): url = url[7:]
     87         try:
     88             # process options now, directly altering self
     89             for option in url.split('/'):
     90                 if '=' in option:
     91                     option, value = option.split('=', 1)
     92                 else:
     93                     value = None
     94                 if not option:
     95                     pass
     96                 elif option == 'logging':
     97                     logging.basicConfig()   # XXX is that good to call it here?
     98                     self.logger = logging.getLogger('pySerial.loop')
     99                     self.logger.setLevel(LOGGER_LEVELS[value])
    100                     self.logger.debug('enabled logging')
    101                 else:
    102                     raise ValueError('unknown option: %r' % (option,))
    103         except ValueError, e:
    104             raise SerialException('expected a string in the form "[loop://][option[/option...]]": %s' % e)
    105 
    106     #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
    107 
    108     def inWaiting(self):
    109         """Return the number of characters currently in the input buffer."""
    110         if not self._isOpen: raise portNotOpenError
    111         if self.logger:
    112             # attention the logged value can differ from return value in
    113             # threaded environments...
    114             self.logger.debug('inWaiting() -> %d' % (len(self.loop_buffer),))
    115         return len(self.loop_buffer)
    116 
    117     def read(self, size=1):
    118         """Read size bytes from the serial port. If a timeout is set it may
    119         return less characters as requested. With no timeout it will block
    120         until the requested number of bytes is read."""
    121         if not self._isOpen: raise portNotOpenError
    122         if self._timeout is not None:
    123             timeout = time.time() + self._timeout
    124         else:
    125             timeout = None
    126         data = bytearray()
    127         while size > 0:
    128             self.buffer_lock.acquire()
    129             try:
    130                 block = to_bytes(self.loop_buffer[:size])
    131                 del self.loop_buffer[:size]
    132             finally:
    133                 self.buffer_lock.release()
    134             data += block
    135             size -= len(block)
    136             # check for timeout now, after data has been read.
    137             # useful for timeout = 0 (non blocking) read
    138             if timeout and time.time() > timeout:
    139                 break
    140         return bytes(data)
    141 
    142     def write(self, data):
    143         """Output the given string over the serial port. Can block if the
    144         connection is blocked. May raise SerialException if the connection is
    145         closed."""
    146         if not self._isOpen: raise portNotOpenError
    147         # ensure we're working with bytes
    148         data = to_bytes(data)
    149         # calculate aprox time that would be used to send the data
    150         time_used_to_send = 10.0*len(data) / self._baudrate
    151         # when a write timeout is configured check if we would be successful
    152         # (not sending anything, not even the part that would have time)
    153         if self._writeTimeout is not None and time_used_to_send > self._writeTimeout:
    154             time.sleep(self._writeTimeout) # must wait so that unit test succeeds
    155             raise writeTimeoutError
    156         self.buffer_lock.acquire()
    157         try:
    158             self.loop_buffer += data
    159         finally:
    160             self.buffer_lock.release()
    161         return len(data)
    162 
    163     def flushInput(self):
    164         """Clear input buffer, discarding all that is in the buffer."""
    165         if not self._isOpen: raise portNotOpenError
    166         if self.logger:
    167             self.logger.info('flushInput()')
    168         self.buffer_lock.acquire()
    169         try:
    170             del self.loop_buffer[:]
    171         finally:
    172             self.buffer_lock.release()
    173 
    174     def flushOutput(self):
    175         """Clear output buffer, aborting the current output and
    176         discarding all that is in the buffer."""
    177         if not self._isOpen: raise portNotOpenError
    178         if self.logger:
    179             self.logger.info('flushOutput()')
    180 
    181     def sendBreak(self, duration=0.25):
    182         """Send break condition. Timed, returns to idle state after given
    183         duration."""
    184         if not self._isOpen: raise portNotOpenError
    185 
    186     def setBreak(self, level=True):
    187         """Set break: Controls TXD. When active, to transmitting is
    188         possible."""
    189         if not self._isOpen: raise portNotOpenError
    190         if self.logger:
    191             self.logger.info('setBreak(%r)' % (level,))
    192 
    193     def setRTS(self, level=True):
    194         """Set terminal status line: Request To Send"""
    195         if not self._isOpen: raise portNotOpenError
    196         if self.logger:
    197             self.logger.info('setRTS(%r) -> state of CTS' % (level,))
    198         self.cts = level
    199 
    200     def setDTR(self, level=True):
    201         """Set terminal status line: Data Terminal Ready"""
    202         if not self._isOpen: raise portNotOpenError
    203         if self.logger:
    204             self.logger.info('setDTR(%r) -> state of DSR' % (level,))
    205         self.dsr = level
    206 
    207     def getCTS(self):
    208         """Read terminal status line: Clear To Send"""
    209         if not self._isOpen: raise portNotOpenError
    210         if self.logger:
    211             self.logger.info('getCTS() -> state of RTS (%r)' % (self.cts,))
    212         return self.cts
    213 
    214     def getDSR(self):
    215         """Read terminal status line: Data Set Ready"""
    216         if not self._isOpen: raise portNotOpenError
    217         if self.logger:
    218             self.logger.info('getDSR() -> state of DTR (%r)' % (self.dsr,))
    219         return self.dsr
    220 
    221     def getRI(self):
    222         """Read terminal status line: Ring Indicator"""
    223         if not self._isOpen: raise portNotOpenError
    224         if self.logger:
    225             self.logger.info('returning dummy for getRI()')
    226         return False
    227 
    228     def getCD(self):
    229         """Read terminal status line: Carrier Detect"""
    230         if not self._isOpen: raise portNotOpenError
    231         if self.logger:
    232             self.logger.info('returning dummy for getCD()')
    233         return True
    234 
    235     # - - - platform specific - - -
    236     # None so far
    237 
    238 
    239 # assemble Serial class with the platform specific implementation and the base
    240 # for file-like behavior. for Python 2.6 and newer, that provide the new I/O
    241 # library, derive from io.RawIOBase
    242 try:
    243     import io
    244 except ImportError:
    245     # classic version with our own file-like emulation
    246     class Serial(LoopbackSerial, FileLike):
    247         pass
    248 else:
    249     # io library present
    250     class Serial(LoopbackSerial, io.RawIOBase):
    251         pass
    252 
    253 
    254 # simple client test
    255 if __name__ == '__main__':
    256     import sys
    257     s = Serial('loop://')
    258     sys.stdout.write('%s\n' % s)
    259 
    260     sys.stdout.write("write...\n")
    261     s.write("hello\n")
    262     s.flush()
    263     sys.stdout.write("read: %s\n" % s.read(5))
    264 
    265     s.close()
    266