Home | History | Annotate | Download | only in tests
      1 # Copyright 2015 gRPC authors.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #     http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 
     15 from __future__ import absolute_import
     16 
     17 import collections
     18 import multiprocessing
     19 import os
     20 import select
     21 import signal
     22 import sys
     23 import tempfile
     24 import threading
     25 import time
     26 import unittest
     27 import uuid
     28 
     29 import six
     30 from six import moves
     31 
     32 from tests import _loader
     33 from tests import _result
     34 
     35 
     36 class CaptureFile(object):
     37     """A context-managed file to redirect output to a byte array.
     38 
     39   Use by invoking `start` (`__enter__`) and at some point invoking `stop`
     40   (`__exit__`). At any point after the initial call to `start` call `output` to
     41   get the current redirected output. Note that we don't currently use file
     42   locking, so calling `output` between calls to `start` and `stop` may muddle
     43   the result (you should only be doing this during a Python-handled interrupt as
     44   a last ditch effort to provide output to the user).
     45 
     46   Attributes:
     47     _redirected_fd (int): File descriptor of file to redirect writes from.
     48     _saved_fd (int): A copy of the original value of the redirected file
     49       descriptor.
     50     _into_file (TemporaryFile or None): File to which writes are redirected.
     51       Only non-None when self is started.
     52   """
     53 
     54     def __init__(self, fd):
     55         self._redirected_fd = fd
     56         self._saved_fd = os.dup(self._redirected_fd)
     57         self._into_file = None
     58 
     59     def output(self):
     60         """Get all output from the redirected-to file if it exists."""
     61         if self._into_file:
     62             self._into_file.seek(0)
     63             return bytes(self._into_file.read())
     64         else:
     65             return bytes()
     66 
     67     def start(self):
     68         """Start redirection of writes to the file descriptor."""
     69         self._into_file = tempfile.TemporaryFile()
     70         os.dup2(self._into_file.fileno(), self._redirected_fd)
     71 
     72     def stop(self):
     73         """Stop redirection of writes to the file descriptor."""
     74         # n.b. this dup2 call auto-closes self._redirected_fd
     75         os.dup2(self._saved_fd, self._redirected_fd)
     76 
     77     def write_bypass(self, value):
     78         """Bypass the redirection and write directly to the original file.
     79 
     80     Arguments:
     81       value (str): What to write to the original file.
     82     """
     83         if six.PY3 and not isinstance(value, six.binary_type):
     84             value = bytes(value, 'ascii')
     85         if self._saved_fd is None:
     86             os.write(self._redirect_fd, value)
     87         else:
     88             os.write(self._saved_fd, value)
     89 
     90     def __enter__(self):
     91         self.start()
     92         return self
     93 
     94     def __exit__(self, type, value, traceback):
     95         self.stop()
     96 
     97     def close(self):
     98         """Close any resources used by self not closed by stop()."""
     99         os.close(self._saved_fd)
    100 
    101 
    102 class AugmentedCase(collections.namedtuple('AugmentedCase', ['case', 'id'])):
    103     """A test case with a guaranteed unique externally specified identifier.
    104 
    105   Attributes:
    106     case (unittest.TestCase): TestCase we're decorating with an additional
    107       identifier.
    108     id (object): Any identifier that may be considered 'unique' for testing
    109       purposes.
    110   """
    111 
    112     def __new__(cls, case, id=None):
    113         if id is None:
    114             id = uuid.uuid4()
    115         return super(cls, AugmentedCase).__new__(cls, case, id)
    116 
    117 
    118 class Runner(object):
    119 
    120     def __init__(self):
    121         self._skipped_tests = []
    122 
    123     def skip_tests(self, tests):
    124         self._skipped_tests = tests
    125 
    126     def run(self, suite):
    127         """See setuptools' test_runner setup argument for information."""
    128         # only run test cases with id starting with given prefix
    129         testcase_filter = os.getenv('GRPC_PYTHON_TESTRUNNER_FILTER')
    130         filtered_cases = []
    131         for case in _loader.iterate_suite_cases(suite):
    132             if not testcase_filter or case.id().startswith(testcase_filter):
    133                 filtered_cases.append(case)
    134 
    135         # Ensure that every test case has no collision with any other test case in
    136         # the augmented results.
    137         augmented_cases = [
    138             AugmentedCase(case, uuid.uuid4()) for case in filtered_cases
    139         ]
    140         case_id_by_case = dict((augmented_case.case, augmented_case.id)
    141                                for augmented_case in augmented_cases)
    142         result_out = moves.cStringIO()
    143         result = _result.TerminalResult(
    144             result_out, id_map=lambda case: case_id_by_case[case])
    145         stdout_pipe = CaptureFile(sys.stdout.fileno())
    146         stderr_pipe = CaptureFile(sys.stderr.fileno())
    147         kill_flag = [False]
    148 
    149         def sigint_handler(signal_number, frame):
    150             if signal_number == signal.SIGINT:
    151                 kill_flag[0] = True  # Python 2.7 not having 'local'... :-(
    152             signal.signal(signal_number, signal.SIG_DFL)
    153 
    154         def fault_handler(signal_number, frame):
    155             stdout_pipe.write_bypass(
    156                 'Received fault signal {}\nstdout:\n{}\n\nstderr:{}\n'.format(
    157                     signal_number, stdout_pipe.output(), stderr_pipe.output()))
    158             os._exit(1)
    159 
    160         def check_kill_self():
    161             if kill_flag[0]:
    162                 stdout_pipe.write_bypass('Stopping tests short...')
    163                 result.stopTestRun()
    164                 stdout_pipe.write_bypass(result_out.getvalue())
    165                 stdout_pipe.write_bypass('\ninterrupted stdout:\n{}\n'.format(
    166                     stdout_pipe.output().decode()))
    167                 stderr_pipe.write_bypass('\ninterrupted stderr:\n{}\n'.format(
    168                     stderr_pipe.output().decode()))
    169                 os._exit(1)
    170 
    171         def try_set_handler(name, handler):
    172             try:
    173                 signal.signal(getattr(signal, name), handler)
    174             except AttributeError:
    175                 pass
    176 
    177         try_set_handler('SIGINT', sigint_handler)
    178         try_set_handler('SIGSEGV', fault_handler)
    179         try_set_handler('SIGBUS', fault_handler)
    180         try_set_handler('SIGABRT', fault_handler)
    181         try_set_handler('SIGFPE', fault_handler)
    182         try_set_handler('SIGILL', fault_handler)
    183         # Sometimes output will lag after a test has successfully finished; we
    184         # ignore such writes to our pipes.
    185         try_set_handler('SIGPIPE', signal.SIG_IGN)
    186 
    187         # Run the tests
    188         result.startTestRun()
    189         for augmented_case in augmented_cases:
    190             for skipped_test in self._skipped_tests:
    191                 if skipped_test in augmented_case.case.id():
    192                     break
    193             else:
    194                 sys.stdout.write('Running       {}\n'.format(
    195                     augmented_case.case.id()))
    196                 sys.stdout.flush()
    197                 case_thread = threading.Thread(
    198                     target=augmented_case.case.run, args=(result,))
    199                 try:
    200                     with stdout_pipe, stderr_pipe:
    201                         case_thread.start()
    202                         while case_thread.is_alive():
    203                             check_kill_self()
    204                             time.sleep(0)
    205                         case_thread.join()
    206                 except:
    207                     # re-raise the exception after forcing the with-block to end
    208                     raise
    209                 result.set_output(augmented_case.case, stdout_pipe.output(),
    210                                   stderr_pipe.output())
    211                 sys.stdout.write(result_out.getvalue())
    212                 sys.stdout.flush()
    213                 result_out.truncate(0)
    214                 check_kill_self()
    215         result.stopTestRun()
    216         stdout_pipe.close()
    217         stderr_pipe.close()
    218 
    219         # Report results
    220         sys.stdout.write(result_out.getvalue())
    221         sys.stdout.flush()
    222         signal.signal(signal.SIGINT, signal.SIG_DFL)
    223         with open('report.xml', 'wb') as report_xml_file:
    224             _result.jenkins_junit_xml(result).write(report_xml_file)
    225         return result
    226