1 __all__ = ['create_subprocess_exec', 'create_subprocess_shell'] 2 3 import subprocess 4 5 from . import events 6 from . import protocols 7 from . import streams 8 from . import tasks 9 from .coroutines import coroutine 10 from .log import logger 11 12 13 PIPE = subprocess.PIPE 14 STDOUT = subprocess.STDOUT 15 DEVNULL = subprocess.DEVNULL 16 17 18 class SubprocessStreamProtocol(streams.FlowControlMixin, 19 protocols.SubprocessProtocol): 20 """Like StreamReaderProtocol, but for a subprocess.""" 21 22 def __init__(self, limit, loop): 23 super().__init__(loop=loop) 24 self._limit = limit 25 self.stdin = self.stdout = self.stderr = None 26 self._transport = None 27 self._process_exited = False 28 self._pipe_fds = [] 29 30 def __repr__(self): 31 info = [self.__class__.__name__] 32 if self.stdin is not None: 33 info.append('stdin=%r' % self.stdin) 34 if self.stdout is not None: 35 info.append('stdout=%r' % self.stdout) 36 if self.stderr is not None: 37 info.append('stderr=%r' % self.stderr) 38 return '<%s>' % ' '.join(info) 39 40 def connection_made(self, transport): 41 self._transport = transport 42 43 stdout_transport = transport.get_pipe_transport(1) 44 if stdout_transport is not None: 45 self.stdout = streams.StreamReader(limit=self._limit, 46 loop=self._loop) 47 self.stdout.set_transport(stdout_transport) 48 self._pipe_fds.append(1) 49 50 stderr_transport = transport.get_pipe_transport(2) 51 if stderr_transport is not None: 52 self.stderr = streams.StreamReader(limit=self._limit, 53 loop=self._loop) 54 self.stderr.set_transport(stderr_transport) 55 self._pipe_fds.append(2) 56 57 stdin_transport = transport.get_pipe_transport(0) 58 if stdin_transport is not None: 59 self.stdin = streams.StreamWriter(stdin_transport, 60 protocol=self, 61 reader=None, 62 loop=self._loop) 63 64 def pipe_data_received(self, fd, data): 65 if fd == 1: 66 reader = self.stdout 67 elif fd == 2: 68 reader = self.stderr 69 else: 70 reader = None 71 if reader is not None: 72 reader.feed_data(data) 73 74 def pipe_connection_lost(self, fd, exc): 75 if fd == 0: 76 pipe = self.stdin 77 if pipe is not None: 78 pipe.close() 79 self.connection_lost(exc) 80 return 81 if fd == 1: 82 reader = self.stdout 83 elif fd == 2: 84 reader = self.stderr 85 else: 86 reader = None 87 if reader != None: 88 if exc is None: 89 reader.feed_eof() 90 else: 91 reader.set_exception(exc) 92 93 if fd in self._pipe_fds: 94 self._pipe_fds.remove(fd) 95 self._maybe_close_transport() 96 97 def process_exited(self): 98 self._process_exited = True 99 self._maybe_close_transport() 100 101 def _maybe_close_transport(self): 102 if len(self._pipe_fds) == 0 and self._process_exited: 103 self._transport.close() 104 self._transport = None 105 106 107 class Process: 108 def __init__(self, transport, protocol, loop): 109 self._transport = transport 110 self._protocol = protocol 111 self._loop = loop 112 self.stdin = protocol.stdin 113 self.stdout = protocol.stdout 114 self.stderr = protocol.stderr 115 self.pid = transport.get_pid() 116 117 def __repr__(self): 118 return '<%s %s>' % (self.__class__.__name__, self.pid) 119 120 @property 121 def returncode(self): 122 return self._transport.get_returncode() 123 124 @coroutine 125 def wait(self): 126 """Wait until the process exit and return the process return code. 127 128 This method is a coroutine.""" 129 return (yield from self._transport._wait()) 130 131 def send_signal(self, signal): 132 self._transport.send_signal(signal) 133 134 def terminate(self): 135 self._transport.terminate() 136 137 def kill(self): 138 self._transport.kill() 139 140 @coroutine 141 def _feed_stdin(self, input): 142 debug = self._loop.get_debug() 143 self.stdin.write(input) 144 if debug: 145 logger.debug('%r communicate: feed stdin (%s bytes)', 146 self, len(input)) 147 try: 148 yield from self.stdin.drain() 149 except (BrokenPipeError, ConnectionResetError) as exc: 150 # communicate() ignores BrokenPipeError and ConnectionResetError 151 if debug: 152 logger.debug('%r communicate: stdin got %r', self, exc) 153 154 if debug: 155 logger.debug('%r communicate: close stdin', self) 156 self.stdin.close() 157 158 @coroutine 159 def _noop(self): 160 return None 161 162 @coroutine 163 def _read_stream(self, fd): 164 transport = self._transport.get_pipe_transport(fd) 165 if fd == 2: 166 stream = self.stderr 167 else: 168 assert fd == 1 169 stream = self.stdout 170 if self._loop.get_debug(): 171 name = 'stdout' if fd == 1 else 'stderr' 172 logger.debug('%r communicate: read %s', self, name) 173 output = yield from stream.read() 174 if self._loop.get_debug(): 175 name = 'stdout' if fd == 1 else 'stderr' 176 logger.debug('%r communicate: close %s', self, name) 177 transport.close() 178 return output 179 180 @coroutine 181 def communicate(self, input=None): 182 if input is not None: 183 stdin = self._feed_stdin(input) 184 else: 185 stdin = self._noop() 186 if self.stdout is not None: 187 stdout = self._read_stream(1) 188 else: 189 stdout = self._noop() 190 if self.stderr is not None: 191 stderr = self._read_stream(2) 192 else: 193 stderr = self._noop() 194 stdin, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr, 195 loop=self._loop) 196 yield from self.wait() 197 return (stdout, stderr) 198 199 200 @coroutine 201 def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, 202 loop=None, limit=streams._DEFAULT_LIMIT, **kwds): 203 if loop is None: 204 loop = events.get_event_loop() 205 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, 206 loop=loop) 207 transport, protocol = yield from loop.subprocess_shell( 208 protocol_factory, 209 cmd, stdin=stdin, stdout=stdout, 210 stderr=stderr, **kwds) 211 return Process(transport, protocol, loop) 212 213 @coroutine 214 def create_subprocess_exec(program, *args, stdin=None, stdout=None, 215 stderr=None, loop=None, 216 limit=streams._DEFAULT_LIMIT, **kwds): 217 if loop is None: 218 loop = events.get_event_loop() 219 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, 220 loop=loop) 221 transport, protocol = yield from loop.subprocess_exec( 222 protocol_factory, 223 program, *args, 224 stdin=stdin, stdout=stdout, 225 stderr=stderr, **kwds) 226 return Process(transport, protocol, loop) 227