# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Synchronized standard IO line buffer fed by one reader thread per pipe."""

import threading
import time

try:
  import Queue
except ImportError:  # Python 3 renamed the module to `queue`.
  import queue as Queue


# A line ends at a newline whether the pipe yields text or bytes.
_NEWLINES = ('\n', b'\n')

# Number of end-of-stream markers readline() collects (one is queued per
# handled pipe) before it reports that the process has finished.
_EXPECTED_PIPES = 2

# Seconds to wait between polls once a pipe has hit EOF but the process is
# still running, so the reader thread does not spin at 100% CPU.
_EOF_POLL_INTERVAL = 0.01


class StdioBuffer(object):
  """Merges the output of several pipes into one queue of complete lines.

  Each pipe is drained by its own thread (see handle_pipe), which pushes
  (system_pipe, line) tuples onto self.queue. A (system_pipe, None) tuple
  marks the end of that pipe. readline() is the consumer side.

  Attributes:
    queue: thread-safe queue of (system_pipe, line-or-None) tuples.
    completed: number of end-of-pipe markers consumed by readline() so far.
    shard: object whose poll() returns None while the process is running.
  """

  def __init__(self, shard):
    self.queue = Queue.Queue()
    self.completed = 0
    self.shard = shard

  def _pipe_handler(self, system_pipe, program_pipe):
    """Collects output from program_pipe and queues it line by line.

    Characters are accumulated until a newline is seen, at which point the
    line (newline included) is queued as a (system_pipe, line) tuple. When
    the pipe is at EOF and self.shard.poll() reports that the process has
    exited, any trailing partial line is queued, followed by a
    (system_pipe, None) end-of-pipe marker, and the method returns.

    Args:
      system_pipe: opaque tag queued alongside every line of this pipe.
      program_pipe: file-like object; read(1) must return an empty value at
          EOF.
    """
    pending = []
    while True:
      char = program_pipe.read(1)
      if char:
        pending.append(char)
        if char not in _NEWLINES:
          continue
        finished = False
      elif self.shard.poll() is None:
        # EOF on the pipe but the process is still alive: no more data can
        # arrive, so wait for the exit instead of re-reading in a tight loop.
        time.sleep(_EOF_POLL_INTERVAL)
        continue
      else:
        finished = True

      if pending:
        # pending[0][:0] is an empty str or bytes matching the pipe's type.
        self.queue.put((system_pipe, pending[0][:0].join(pending)))
        pending = []
      if finished:
        self.queue.put((system_pipe, None))
        return

  def handle_pipe(self, system_pipe, program_pipe):
    """Starts a thread that feeds program_pipe into the queue.

    Args:
      system_pipe: tag reported by readline() for lines from this pipe.
      program_pipe: file-like object to read from.

    Returns:
      The started threading.Thread, so the caller can join() it.
    """
    t = threading.Thread(target=self._pipe_handler,
                         args=[system_pipe, program_pipe])
    t.start()
    return t

  def readline(self):
    """Returns the next (system_pipe, line) tuple, blocking until one exists.

    Returns (None, None) once the end-of-pipe markers of both pipes have been
    consumed. Further calls after that keep returning (None, None) instead of
    blocking forever on a queue nobody writes to any more.
    """
    while self.completed < _EXPECTED_PIPES:
      (pipe, line) = self.queue.get(True)
      if line:
        return (pipe, line)
      self.completed += 1
    return (None, None)