# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Tester feedback request multiplexer."""

from multiprocessing import reduction
import Queue
import collections
import multiprocessing
import os
import sys

import common
from autotest_lib.client.common_lib.feedback import tester_feedback_client

import input_handlers
import request
import sequenced_request

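# A queued feedback request entry: 'obj' is the feedback request object (None
# for control entries, e.g. ending an atomic sequence), 'reduced_reply_pipe' is
# the reply Connection in the reduced, picklable form produced by
# reduction.reduce_connection(), 'query_num' is the submitting query's unique
# number, and 'atomic' tells whether follow-up requests from the same query
# should be processed without interruption.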
ReqTuple = collections.namedtuple(
        'ReqTuple', ('obj', 'reduced_reply_pipe', 'query_num', 'atomic'))


class FeedbackRequestMultiplexer(object):
    """A feedback request multiplexer."""

    class RequestProcessingTerminated(Exception):
        """Used internally to signal processor termination."""


    def __init__(self):
        # Queue used by query threads to submit feedback requests.
        self._request_queue = multiprocessing.Queue()
        # Requests pulled off the queue but not yet processed.
        self._pending = []
        self._request_handling_process = None
        self._running = False
        # Query number of the atomic sequence currently in progress, if any.
        self._atomic_seq = None

    def _dequeue_request(self, block=False):
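        """Moves a single request from the queue to the pending list.

        @param block: Whether to block until a request becomes available.

        @return True if a request was dequeued, False if the queue was empty.

        @raise RequestProcessingTerminated: A termination signal (None) was
                dequeued.
        """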
        try:
            req_tuple = self._request_queue.get(block=block)
        except Queue.Empty:
            return False

        if req_tuple is None:
            raise self.RequestProcessingTerminated
        self._pending.append(req_tuple)
        return True


    def _atomic_seq_cont(self):
        """Returns index of next pending request in atomic sequence, if any."""
        for req_idx, req_tuple in enumerate(self._pending):
            if req_tuple.query_num == self._atomic_seq:
                return req_idx

    def _handle_requests(self, stdin):
        """Processes feedback requests until termination is signaled.

        This method is run in a separate process and needs to override stdin in
        order for raw_input() to work.
        """
        sys.stdin = stdin
        try:
            while True:
                req_idx = None

                # Wait for a (suitable) request to become available.
                while True:
                    if self._atomic_seq is None:
                        if self._pending:
                            break
                    else:
                        req_idx = self._atomic_seq_cont()
                        if req_idx is not None:
                            break
                    self._dequeue_request(block=True)

                # If no request was pre-selected, prompt the user to choose one.
                if req_idx is None:
                    raw_input('Pending feedback requests, hit Enter to '
                              'process... ')

                    # Pull all remaining queued requests.
                    while self._dequeue_request():
                        pass

                    # Select the request to process.
                    if len(self._pending) == 1:
                        print('Processing: %s' %
                              self._pending[0].obj.get_title())
                        req_idx = 0
                    else:
                        choose_req = sequenced_request.SequencedFeedbackRequest(
                                None, None, None)
                        choose_req.append_question(
                                'List of pending feedback requests:',
                                input_handlers.MultipleChoiceInputHandler(
                                        [req_tuple.obj.get_title()
                                         for req_tuple in self._pending],
                                        default=1),
                                prompt='Choose a request to process')
                        req_idx, _ = choose_req.execute()

                # Pop and handle selected request, then close pipe.
                req_tuple = self._pending.pop(req_idx)
                if req_tuple.obj is not None:
                    try:
                        ret = req_tuple.obj.execute()
                    except request.FeedbackRequestError as e:
                        ret = (tester_feedback_client.QUERY_RET_ERROR, str(e))
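                    # Rebuild the reply Connection from its reduced, picklable
                    # form and send the result back to the process_request()
                    # call that is blocking on it.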
                    reply_pipe = req_tuple.reduced_reply_pipe[0](
                            *req_tuple.reduced_reply_pipe[1])
                    reply_pipe.send(ret)
                    reply_pipe.close()

                # Set the atomic sequence if so instructed.
                self._atomic_seq = (req_tuple.query_num if req_tuple.atomic
                                    else None)

        except self.RequestProcessingTerminated:
            pass

    def start(self):
        """Starts the request multiplexer."""
        if self._running:
            return

        # Hand the handler process a duplicate of stdin so that it can prompt
        # the user interactively.
        dup_stdin = os.fdopen(os.dup(sys.stdin.fileno()))
        self._request_handling_process = multiprocessing.Process(
                target=self._handle_requests, args=(dup_stdin,))
        self._request_handling_process.start()

        self._running = True

    def stop(self):
        """Stops the request multiplexer."""
        if not self._running:
            return

        # Tell the request handler to quit.
        self._request_queue.put(None)
        self._request_handling_process.join()

        self._running = False

    def process_request(self, request, query_num, atomic):
        """Processes a feedback request and returns its result.

        This call is used by queries for submitting individual requests. It is
        a blocking call that should be made from a separate execution thread.

        @param request: The feedback request to process.
        @param query_num: The unique query number.
        @param atomic: Whether subsequent request(s) are expected and should be
                       processed without interruption.
        """
        reply_pipe_send, reply_pipe_recv = multiprocessing.Pipe()
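        # Reduce the sending end of the pipe to a picklable
        # (rebuild_function, args) form so it can travel through the request
        # queue; the handler process rebuilds it before sending the reply.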
        reduced_reply_pipe_send = reduction.reduce_connection(reply_pipe_send)
        self._request_queue.put(ReqTuple(request, reduced_reply_pipe_send,
                                         query_num, atomic))
        return reply_pipe_recv.recv()

    def end_atomic_seq(self, query_num):
        """Ends the current atomic sequence.

        This enqueues a null request with the given query_num and atomicity set
        to False, causing the multiplexer to terminate the atomic sequence.

        @param query_num: The unique query number.
        """
        self._request_queue.put(ReqTuple(None, None, query_num, False))
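

# A minimal usage sketch (illustrative only; 'some_feedback_request' is a
# hypothetical stand-in for an actual feedback request object):
#
#     mux = FeedbackRequestMultiplexer()
#     mux.start()
#     try:
#         # Blocks until the tester handles the request at the console.
#         result = mux.process_request(some_feedback_request, query_num=1,
#                                      atomic=False)
#     finally:
#         mux.stop()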