#! python
# Python Serial Port Extension for Win32, Linux, BSD, Jython
# serial driver for win32
# see __init__.py
#
# (C) 2001-2011 Chris Liechti <cliechti (at] gmx.net>
# this is distributed under a free software license, see license.txt
#
# Initial patch to use ctypes by Giovanni Bajo <rasky (at] develer.com>

import ctypes
import time
from serial import win32

from serial.serialutil import *


def device(portnum):
    """Turn a port number into a device name.

    The number is zero based: ``device(0)`` returns ``'COM1'``.
    """
    return 'COM%d' % (portnum + 1)  # numbers are transformed to a string


class Win32Serial(SerialBase):
    """Serial port implementation for Win32 based on ctypes."""

    BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
                 9600, 19200, 38400, 57600, 115200)

    def __init__(self, *args, **kwargs):
        """Initialize the Win32 specific state, then the base class.

        All arguments are passed through unchanged to SerialBase.__init__.
        """
        # The instance attributes are created before the base initializer
        # runs (presumably because it may already open the port -- confirm
        # against SerialBase).
        self.hComPort = None
        self._overlappedRead = None
        self._overlappedWrite = None
        # Timeouts found on the port when it was opened; restored by _close().
        self._orgTimeouts = None
        self._rtsToggle = False

        self._rtsState = win32.RTS_CONTROL_ENABLE
        self._dtrState = win32.DTR_CONTROL_ENABLE

        SerialBase.__init__(self, *args, **kwargs)

    def open(self):
        """Open port with current settings.

        Raises SerialException if no port is configured, if the port is
        already open or if the port cannot be opened. Any exception raised
        while configuring the freshly opened port is re-raised after the
        port has been closed again.
        """
        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        if self._isOpen:
            raise SerialException("Port is already open.")
        # the "\\.\COMx" format is required for devices other than COM1-COM8
        # not all versions of windows seem to support this properly
        # so that the first few ports are used with the DOS device name
        port = self.portstr
        try:
            if port.upper().startswith('COM') and int(port[3:]) > 8:
                port = '\\\\.\\' + port
        except ValueError:
            # for like COMnotanumber
            pass
        self.hComPort = win32.CreateFile(port,
               win32.GENERIC_READ | win32.GENERIC_WRITE,
               0,  # exclusive access
               None,  # no security
               win32.OPEN_EXISTING,
               win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED,
               0)
        if self.hComPort == win32.INVALID_HANDLE_VALUE:
            self.hComPort = None    # 'cause __del__ is called anyway
            raise SerialException("could not open port %r: %r" % (self.portstr, ctypes.WinError()))

        try:
            self._overlappedRead = win32.OVERLAPPED()
            self._overlappedRead.hEvent = win32.CreateEvent(None, 1, 0, None)
            self._overlappedWrite = win32.OVERLAPPED()
            #~ self._overlappedWrite.hEvent = win32.CreateEvent(None, 1, 0, None)
            self._overlappedWrite.hEvent = win32.CreateEvent(None, 0, 0, None)

            # Setup a 4k buffer
            win32.SetupComm(self.hComPort, 4096, 4096)

            # Save original timeout values:
            self._orgTimeouts = win32.COMMTIMEOUTS()
            win32.GetCommTimeouts(self.hComPort, ctypes.byref(self._orgTimeouts))

            self._reconfigurePort()

            # Clear buffers:
            # Remove anything that was there
            win32.PurgeComm(self.hComPort,
                    win32.PURGE_TXCLEAR | win32.PURGE_TXABORT |
                    win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)
        except:
            # Bare except on purpose: whatever went wrong, release the port
            # and re-raise the original exception unchanged.
            try:
                self._close()
            except Exception:
                # ignore any exception when closing the port
                # also to keep original exception that happened when setting up
                pass
            self.hComPort = None
            raise
        else:
            self._isOpen = True

    def _reconfigurePort(self):
        """Set communication parameters on opened port.

        Applies the timeouts, baud rate, byte size, parity, stop bits and
        flow control settings stored on the instance to the port handle.

        Raises SerialException if there is no valid port handle and
        ValueError if a setting is unsupported or rejected by SetCommState.
        """
        if not self.hComPort:
            raise SerialException("Can only operate on a valid port handle")

        # Set Windows timeout values
        # timeouts is a tuple with the following items:
        # (ReadIntervalTimeout,ReadTotalTimeoutMultiplier,
        #  ReadTotalTimeoutConstant,WriteTotalTimeoutMultiplier,
        #  WriteTotalTimeoutConstant)
        if self._timeout is None:
            # blocking read: no timeouts at all
            timeouts = (0, 0, 0, 0, 0)
        elif self._timeout == 0:
            # non-blocking read: MAXDWORD interval with zero total timeouts
            timeouts = (win32.MAXDWORD, 0, 0, 0, 0)
        else:
            # timeout is given in seconds, Windows expects milliseconds
            timeouts = (0, 0, int(self._timeout*1000), 0, 0)
        if self._timeout != 0 and self._interCharTimeout is not None:
            timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:]

        if self._writeTimeout is None:
            pass
        elif self._writeTimeout == 0:
            timeouts = timeouts[:-2] + (0, win32.MAXDWORD)
        else:
            timeouts = timeouts[:-2] + (0, int(self._writeTimeout*1000))
        win32.SetCommTimeouts(self.hComPort, ctypes.byref(win32.COMMTIMEOUTS(*timeouts)))

        win32.SetCommMask(self.hComPort, win32.EV_ERR)

        # Setup the connection info.
        # Get state and modify it:
        comDCB = win32.DCB()
        win32.GetCommState(self.hComPort, ctypes.byref(comDCB))
        comDCB.BaudRate = self._baudrate

        if self._bytesize == FIVEBITS:
            comDCB.ByteSize = 5
        elif self._bytesize == SIXBITS:
            comDCB.ByteSize = 6
        elif self._bytesize == SEVENBITS:
            comDCB.ByteSize = 7
        elif self._bytesize == EIGHTBITS:
            comDCB.ByteSize = 8
        else:
            raise ValueError("Unsupported number of data bits: %r" % self._bytesize)

        if self._parity == PARITY_NONE:
            comDCB.Parity = win32.NOPARITY
            comDCB.fParity = 0  # Disable Parity Check
        elif self._parity == PARITY_EVEN:
            comDCB.Parity = win32.EVENPARITY
            comDCB.fParity = 1  # Enable Parity Check
        elif self._parity == PARITY_ODD:
            comDCB.Parity = win32.ODDPARITY
            comDCB.fParity = 1  # Enable Parity Check
        elif self._parity == PARITY_MARK:
            comDCB.Parity = win32.MARKPARITY
            comDCB.fParity = 1  # Enable Parity Check
        elif self._parity == PARITY_SPACE:
            comDCB.Parity = win32.SPACEPARITY
            comDCB.fParity = 1  # Enable Parity Check
        else:
            raise ValueError("Unsupported parity mode: %r" % self._parity)

        if self._stopbits == STOPBITS_ONE:
            comDCB.StopBits = win32.ONESTOPBIT
        elif self._stopbits == STOPBITS_ONE_POINT_FIVE:
            comDCB.StopBits = win32.ONE5STOPBITS
        elif self._stopbits == STOPBITS_TWO:
            comDCB.StopBits = win32.TWOSTOPBITS
        else:
            raise ValueError("Unsupported number of stop bits: %r" % self._stopbits)

        comDCB.fBinary = 1  # Enable Binary Transmission
        # Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE)
        # RTS: hardware handshake has priority over toggle mode, which has
        # priority over the manually selected level.
        if self._rtscts:
            comDCB.fRtsControl = win32.RTS_CONTROL_HANDSHAKE
        elif self._rtsToggle:
            comDCB.fRtsControl = win32.RTS_CONTROL_TOGGLE
        else:
            comDCB.fRtsControl = self._rtsState
        if self._dsrdtr:
            comDCB.fDtrControl = win32.DTR_CONTROL_HANDSHAKE
        else:
            comDCB.fDtrControl = self._dtrState

        # CTS output flow control is switched off while RTS toggle is active
        if self._rtsToggle:
            comDCB.fOutxCtsFlow = 0
        else:
            comDCB.fOutxCtsFlow = self._rtscts
        comDCB.fOutxDsrFlow = self._dsrdtr
        comDCB.fOutX = self._xonxoff
        comDCB.fInX = self._xonxoff
        comDCB.fNull = 0
        comDCB.fErrorChar = 0
        comDCB.fAbortOnError = 0
        comDCB.XonChar = XON
        comDCB.XoffChar = XOFF

        if not win32.SetCommState(self.hComPort, ctypes.byref(comDCB)):
            raise ValueError("Cannot configure port, some setting was wrong. Original message: %r" % ctypes.WinError())

    #~ def __del__(self):
        #~ self.close()

    def _close(self):
        """Internal close port helper.

        Restores the timeouts saved by open() (if any were saved), closes
        the port handle and the event handles of both OVERLAPPED structures
        and resets the related attributes to None. Does nothing when there
        is no port handle.
        """
        if self.hComPort:
            # Restore original timeout values. They are missing when setup
            # failed before they were saved; the handles below still have
            # to be released in that case.
            if self._orgTimeouts is not None:
                win32.SetCommTimeouts(self.hComPort, self._orgTimeouts)
            # Close COM-Port:
            win32.CloseHandle(self.hComPort)
            if self._overlappedRead is not None:
                win32.CloseHandle(self._overlappedRead.hEvent)
                self._overlappedRead = None
            if self._overlappedWrite is not None:
                win32.CloseHandle(self._overlappedWrite.hEvent)
                self._overlappedWrite = None
            self.hComPort = None

    def close(self):
        """Close port. Does nothing if the port is not open."""
        if self._isOpen:
            self._close()
            self._isOpen = False

    def makeDeviceName(self, port):
        """Return the device name for a port number, see device()."""
        return device(port)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -

    def inWaiting(self):
        """Return the number of characters currently in the input buffer.

        Raises SerialException if the call to ClearCommError fails.
        """
        flags = win32.DWORD()
        comstat = win32.COMSTAT()
        if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.byref(comstat)):
            raise SerialException('call to ClearCommError failed')
        return comstat.cbInQue

    def read(self, size=1):
        """Read size bytes from the serial port.

        If a timeout is set it may return less characters as requested.
        With no timeout it will block until the requested number of bytes
        is read. With a timeout of 0 only the bytes that are already in the
        input queue are returned. A size of 0 or less returns an empty
        bytes object.

        Raises portNotOpenError if the port is not open and SerialException
        if ClearCommError or ReadFile fail.
        """
        if not self.hComPort: raise portNotOpenError
        if size > 0:
            win32.ResetEvent(self._overlappedRead.hEvent)
            flags = win32.DWORD()
            comstat = win32.COMSTAT()
            if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.byref(comstat)):
                raise SerialException('call to ClearCommError failed')
            if self.timeout == 0:
                # non-blocking: never ask for more than what is queued
                n = min(comstat.cbInQue, size)
            else:
                n = size
            if n > 0:
                buf = ctypes.create_string_buffer(n)
                rc = win32.DWORD()
                err = win32.ReadFile(self.hComPort, buf, n, ctypes.byref(rc), ctypes.byref(self._overlappedRead))
                if not err and win32.GetLastError() != win32.ERROR_IO_PENDING:
                    raise SerialException("ReadFile failed (%r)" % ctypes.WinError())
                # Wait for completion and fetch the byte count from the
                # overlapped result; the count passed to ReadFile is not
                # reliable for overlapped operations.
                win32.GetOverlappedResult(self.hComPort, ctypes.byref(self._overlappedRead), ctypes.byref(rc), True)
                data = buf.raw[:rc.value]
            else:
                data = bytes()
        else:
            data = bytes()
        return bytes(data)

    def write(self, data):
        """Output the given string over the serial port.

        Returns the number of bytes reported as written by the driver (0
        for empty data). Unless the write timeout is 0 the call waits for
        the write to complete.

        Raises portNotOpenError if the port is not open, SerialException if
        WriteFile fails and writeTimeoutError if fewer bytes than requested
        were written.
        """
        if not self.hComPort: raise portNotOpenError
        #~ if not isinstance(data, (bytes, bytearray)):
            #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
        # convert data (needed in case of memoryview instance: Py 3.1 io lib), ctypes doesn't like memoryview
        data = to_bytes(data)
        if data:
            #~ win32event.ResetEvent(self._overlappedWrite.hEvent)
            n = win32.DWORD()
            err = win32.WriteFile(self.hComPort, data, len(data), ctypes.byref(n), self._overlappedWrite)
            if not err and win32.GetLastError() != win32.ERROR_IO_PENDING:
                raise SerialException("WriteFile failed (%r)" % ctypes.WinError())
            if self._writeTimeout != 0:  # if blocking (None) or w/ write timeout (>0)
                # Wait for the write to complete.
                #~ win32.WaitForSingleObject(self._overlappedWrite.hEvent, win32.INFINITE)
                err = win32.GetOverlappedResult(self.hComPort, self._overlappedWrite, ctypes.byref(n), True)
                if n.value != len(data):
                    raise writeTimeoutError
            return n.value
        else:
            return 0

    def flush(self):
        """Flush of file like objects. In this case, wait until all data
        is written (polls the output queue every 50 ms)."""
        while self.outWaiting():
            time.sleep(0.05)
        # XXX could also use WaitCommEvent with mask EV_TXEMPTY, but it would
        # require overlapped IO and its also only possible to set a single mask
        # on the port---

    def flushInput(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self.hComPort: raise portNotOpenError
        win32.PurgeComm(self.hComPort, win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)

    def flushOutput(self):
        """Clear output buffer, aborting the current output and
        discarding all that is in the buffer."""
        if not self.hComPort: raise portNotOpenError
        win32.PurgeComm(self.hComPort, win32.PURGE_TXCLEAR | win32.PURGE_TXABORT)

    def sendBreak(self, duration=0.25):
        """Send break condition. Timed, returns to idle state after given
        duration (in seconds)."""
        if not self.hComPort: raise portNotOpenError
        win32.SetCommBreak(self.hComPort)
        time.sleep(duration)
        win32.ClearCommBreak(self.hComPort)

    def setBreak(self, level=1):
        """Set break: Controls TXD. When active, no transmitting is possible."""
        if not self.hComPort: raise portNotOpenError
        if level:
            win32.SetCommBreak(self.hComPort)
        else:
            win32.ClearCommBreak(self.hComPort)

    def setRTS(self, level=1):
        """Set terminal status line: Request To Send.

        The level is remembered even when the port is closed so that it is
        applied by the next reconfiguration.
        """
        # remember level for reconfigure
        if level:
            self._rtsState = win32.RTS_CONTROL_ENABLE
        else:
            self._rtsState = win32.RTS_CONTROL_DISABLE
        # also apply now if port is open
        if self.hComPort:
            if level:
                win32.EscapeCommFunction(self.hComPort, win32.SETRTS)
            else:
                win32.EscapeCommFunction(self.hComPort, win32.CLRRTS)

    def setDTR(self, level=1):
        """Set terminal status line: Data Terminal Ready.

        The level is remembered even when the port is closed so that it is
        applied by the next reconfiguration.
        """
        # remember level for reconfigure
        if level:
            self._dtrState = win32.DTR_CONTROL_ENABLE
        else:
            self._dtrState = win32.DTR_CONTROL_DISABLE
        # also apply now if port is open
        if self.hComPort:
            if level:
                win32.EscapeCommFunction(self.hComPort, win32.SETDTR)
            else:
                win32.EscapeCommFunction(self.hComPort, win32.CLRDTR)

    def _GetCommModemStatus(self):
        """Return the modem status bits reported by GetCommModemStatus."""
        stat = win32.DWORD()
        win32.GetCommModemStatus(self.hComPort, ctypes.byref(stat))
        return stat.value

    def getCTS(self):
        """Read terminal status line: Clear To Send"""
        if not self.hComPort: raise portNotOpenError
        return (win32.MS_CTS_ON & self._GetCommModemStatus()) != 0

    def getDSR(self):
        """Read terminal status line: Data Set Ready"""
        if not self.hComPort: raise portNotOpenError
        return (win32.MS_DSR_ON & self._GetCommModemStatus()) != 0

    def getRI(self):
        """Read terminal status line: Ring Indicator"""
        if not self.hComPort: raise portNotOpenError
        return (win32.MS_RING_ON & self._GetCommModemStatus()) != 0

    def getCD(self):
        """Read terminal status line: Carrier Detect"""
        if not self.hComPort: raise portNotOpenError
        return (win32.MS_RLSD_ON & self._GetCommModemStatus()) != 0

    # - - platform specific - - - -

    def setBufferSize(self, rx_size=4096, tx_size=None):
        """\
        Recommend a buffer size to the driver (device driver can ignore this
        value). Must be called before the port is opened.

        tx_size defaults to rx_size when it is not given.
        """
        # NOTE(review): SetupComm is called with the current port handle,
        # which is None until open() has run -- confirm whether this is
        # really meant to be called before the port is opened.
        if tx_size is None: tx_size = rx_size
        win32.SetupComm(self.hComPort, rx_size, tx_size)

    def setXON(self, level=True):
        """\
        Manually control flow - when software flow control is enabled.
        This will send XON (true) and XOFF (false) to the other device.
        WARNING: this function is not portable to different platforms!
        """
        if not self.hComPort: raise portNotOpenError
        if level:
            win32.EscapeCommFunction(self.hComPort, win32.SETXON)
        else:
            win32.EscapeCommFunction(self.hComPort, win32.SETXOFF)

    def outWaiting(self):
        """Return how many characters are in the outgoing buffer.

        Raises SerialException if the call to ClearCommError fails.
        """
        flags = win32.DWORD()
        comstat = win32.COMSTAT()
        if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.byref(comstat)):
            raise SerialException('call to ClearCommError failed')
        return comstat.cbOutQue

    # functions useful for RS-485 adapters
    def setRtsToggle(self, rtsToggle):
        """Change RTS toggle control setting.

        The port is reconfigured immediately when it is open.
        """
        self._rtsToggle = rtsToggle
        if self._isOpen: self._reconfigurePort()

    def getRtsToggle(self):
        """Get the current RTS toggle control setting."""
        return self._rtsToggle

    rtsToggle = property(getRtsToggle, setRtsToggle, doc="RTS toggle control setting")


# assemble Serial class with the platform specific implementation and the base
# for file-like behavior. for Python 2.6 and newer, that provide the new I/O
# library, derive from io.RawIOBase
try:
    import io
except ImportError:
    # classic version with our own file-like emulation
    class Serial(Win32Serial, FileLike):
        pass
else:
    # io library present
    class Serial(Win32Serial, io.RawIOBase):
        pass


# Test function only!!
if __name__ == '__main__':
    import sys

    s = Serial(0)
    sys.stdout.write("%s\n" % s)

    s = Serial()
    sys.stdout.write("%s\n" % s)

    s.baudrate = 19200
    # NOTE(review): no "databits" attribute is used anywhere in this file
    # (the driver reads _bytesize) -- presumably "bytesize" was meant; confirm.
    s.databits = 7
    s.close()
    s.port = 0
    s.open()
    sys.stdout.write("%s\n" % s)
