# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Tester feedback request multiplexer."""

from multiprocessing import reduction
import Queue
import collections
import multiprocessing
import os
import sys

import common
from autotest_lib.client.common_lib.feedback import tester_feedback_client

import input_handlers
import request
import sequenced_request


# A queued request: the request object (None for a null request), the reduced
# (picklable) sending end of the reply pipe, the query number the request
# belongs to, and whether it keeps an atomic sequence open.
ReqTuple = collections.namedtuple(
        'ReqTuple', ('obj', 'reduced_reply_pipe', 'query_num', 'atomic'))


class FeedbackRequestMultiplexer(object):
    """A feedback request multiplexer.

    Requests are submitted through process_request() and travel over a
    multiprocessing queue to a separate handler process, which executes them
    one at a time and sends each result back over a per-request pipe.
    """

    class RequestProcessingTerminated(Exception):
        """Used internally to signal processor termination."""


    def __init__(self):
        self._request_queue = multiprocessing.Queue()
        # Requests pulled off the queue but not yet processed.
        self._pending = []
        self._request_handling_process = None
        self._running = False
        # Query number of the atomic sequence in progress, or None.
        self._atomic_seq = None


    def _dequeue_request(self, block=False):
        """Moves one request from the queue to the pending list.

        @param block: Whether to wait for a request to become available.

        @return True if a request was dequeued, False if the queue was empty.

        @raise RequestProcessingTerminated: The termination marker (None) was
                dequeued.
        """
        try:
            req_tuple = self._request_queue.get(block=block)
        except Queue.Empty:
            return False

        if req_tuple is None:
            raise self.RequestProcessingTerminated
        self._pending.append(req_tuple)
        return True


    def _atomic_seq_cont(self):
        """Returns index of next pending request in atomic sequence, if any."""
        for req_idx, req_tuple in enumerate(self._pending):
            if req_tuple.query_num == self._atomic_seq:
                return req_idx
        return None


    def _handle_requests(self, stdin):
        """Processes feedback requests until termination is signaled.

        This method is run in a separate process and needs to override stdin in
        order for raw_input() to work.

        @param stdin: File object to install as sys.stdin in this process.
        """
        sys.stdin = stdin
        try:
            while True:
                req_idx = None

                # Wait for a (suitable) request to become available.
                while True:
                    if self._atomic_seq is None:
                        if self._pending:
                            break
                    else:
                        req_idx = self._atomic_seq_cont()
                        if req_idx is not None:
                            break
                    self._dequeue_request(block=True)

                # If no request was pre-selected, prompt the user to choose one.
                if req_idx is None:
                    raw_input('Pending feedback requests, hit Enter to '
                              'process... ')

                    # Pull all remaining queued requests.
                    while self._dequeue_request():
                        pass

                    # Select the request to process.
                    # NOTE(review): a null request (obj is None, as enqueued
                    # by end_atomic_seq) reaching this path would fail on
                    # get_title(); presumably callers only end a sequence
                    # that is currently active -- confirm.
                    if len(self._pending) == 1:
                        print('Processing: %s' %
                              self._pending[0].obj.get_title())
                        req_idx = 0
                    else:
                        choose_req = sequenced_request.SequencedFeedbackRequest(
                                None, None, None)
                        choose_req.append_question(
                                'List of pending feedback requests:',
                                input_handlers.MultipleChoiceInputHandler(
                                        [req_tuple.obj.get_title()
                                         for req_tuple in self._pending],
                                        default=1),
                                prompt='Choose a request to process')
                        req_idx, _ = choose_req.execute()

                # Pop and handle selected request, then close pipe.
                req_tuple = self._pending.pop(req_idx)
                if req_tuple.obj is not None:
                    try:
                        ret = req_tuple.obj.execute()
                    except request.FeedbackRequestError as e:
                        ret = (tester_feedback_client.QUERY_RET_ERROR, str(e))
                    # Rebuild the connection from its reduced form, which is a
                    # (rebuild function, arguments) pair.
                    reply_pipe = req_tuple.reduced_reply_pipe[0](
                            *req_tuple.reduced_reply_pipe[1])
                    reply_pipe.send(ret)
                    reply_pipe.close()

                # Set the atomic sequence if so instructed.
                self._atomic_seq = (req_tuple.query_num if req_tuple.atomic
                                    else None)

        except self.RequestProcessingTerminated:
            pass


    def start(self):
        """Starts the request multiplexer."""
        if self._running:
            return

        # Hand the handler process a duplicate of stdin, which it installs as
        # its own sys.stdin.
        dup_stdin = os.fdopen(os.dup(sys.stdin.fileno()))
        try:
            self._request_handling_process = multiprocessing.Process(
                    target=self._handle_requests, args=(dup_stdin,))
            self._request_handling_process.start()
        finally:
            # Once the child has been started (forked) it holds its own copy
            # of the descriptor; close ours so it does not leak on every
            # start().
            dup_stdin.close()

        self._running = True


    def stop(self):
        """Stops the request multiplexer."""
        if not self._running:
            return

        # Tell the request handler to quit.
        self._request_queue.put(None)
        self._request_handling_process.join()

        self._running = False


    def process_request(self, request, query_num, atomic):
        """Processes a feedback request and returns its result.

        This call is used by queries for submitting individual requests. It is
        a blocking call that should be called from a separate execution thread.

        @param request: The feedback request to process.
        @param query_num: The unique query number.
        @param atomic: Whether subsequent request(s) are expected and should be
                       processed without interruption.

        @return The value sent back by the request handling process.
        """
        reply_pipe_send, reply_pipe_recv = multiprocessing.Pipe()
        try:
            reduced_reply_pipe_send = reduction.reduce_connection(
                    reply_pipe_send)
            self._request_queue.put(ReqTuple(request, reduced_reply_pipe_send,
                                             query_num, atomic))
            return reply_pipe_recv.recv()
        finally:
            # Each request creates a fresh pipe; release both local ends once
            # the reply has arrived (or on failure) so descriptors do not
            # accumulate.
            reply_pipe_recv.close()
            reply_pipe_send.close()


    def end_atomic_seq(self, query_num):
        """Ends the current atomic sequence.

        This enqueues a null request with the given query_num and atomicity set
        to False, causing the multiplexer to terminate the atomic sequence.

        @param query_num: The unique query number.
        """
        self._request_queue.put(ReqTuple(None, None, query_num, False))