Home | History | Annotate | Download | only in lib
      1 """
      2 TestCmd.py:  a testing framework for commands and scripts.
      3 
      4 The TestCmd module provides a framework for portable automated testing
      5 of executable commands and scripts (in any language, not just Python),
      6 especially commands and scripts that require file system interaction.
      7 
      8 In addition to running tests and evaluating conditions, the TestCmd
      9 module manages and cleans up one or more temporary workspace
     10 directories, and provides methods for creating files and directories in
     11 those workspace directories from in-line data (here-documents), allowing
     12 tests to be completely self-contained.
     13 
     14 A TestCmd environment object is created via the usual invocation:
     15 
     16     import TestCmd
     17     test = TestCmd.TestCmd()
     18 
     19 There are a bunch of keyword arguments available at instantiation:
     20 
     21     test = TestCmd.TestCmd(description = 'string',
     22                            program = 'program_or_script_to_test',
     23                            interpreter = 'script_interpreter',
     24                            workdir = 'prefix',
     25                            subdir = 'subdir',
     26                            verbose = Boolean,
     27                            match = default_match_function,
     28                            diff = default_diff_function,
     29                            combine = Boolean)
     30 
     31 There are a bunch of methods that let you do different things:
     32 
     33     test.verbose_set(1)
     34 
     35     test.description_set('string')
     36 
     37     test.program_set('program_or_script_to_test')
     38 
     39     test.interpreter_set('script_interpreter')
     40     test.interpreter_set(['script_interpreter', 'arg'])
     41 
     42     test.workdir_set('prefix')
     43     test.workdir_set('')
     44 
     45     test.workpath('file')
     46     test.workpath('subdir', 'file')
     47 
     48     test.subdir('subdir', ...)
     49 
     50     test.rmdir('subdir', ...)
     51 
     52     test.write('file', "contents\n")
     53     test.write(['subdir', 'file'], "contents\n")
     54 
     55     test.read('file')
     56     test.read(['subdir', 'file'])
     57     test.read('file', mode)
     58     test.read(['subdir', 'file'], mode)
     59 
     60     test.writable('dir', 1)
     61     test.writable('dir', None)
     62 
     63     test.preserve(condition, ...)
     64 
     65     test.cleanup(condition)
     66 
     67     test.command_args(program = 'program_or_script_to_run',
     68                       interpreter = 'script_interpreter',
     69                       arguments = 'arguments to pass to program')
     70 
     71     test.run(program = 'program_or_script_to_run',
     72              interpreter = 'script_interpreter',
     73              arguments = 'arguments to pass to program',
     74              chdir = 'directory_to_chdir_to',
     75              stdin = 'input to feed to the program\n',
     76              universal_newlines = True)
     77 
     78     p = test.start(program = 'program_or_script_to_run',
     79                    interpreter = 'script_interpreter',
     80                    arguments = 'arguments to pass to program',
     81                    universal_newlines = None)
     82 
     83     test.finish(p)
     84 
     85     test.pass_test()
     86     test.pass_test(condition)
     87     test.pass_test(condition, function)
     88 
     89     test.fail_test()
     90     test.fail_test(condition)
     91     test.fail_test(condition, function)
     92     test.fail_test(condition, function, skip)
     93 
     94     test.no_result()
     95     test.no_result(condition)
     96     test.no_result(condition, function)
     97     test.no_result(condition, function, skip)
     98 
     99     test.stdout()
    100     test.stdout(run)
    101 
    102     test.stderr()
    103     test.stderr(run)
    104 
    105     test.symlink(target, link)
    106 
    107     test.banner(string)
    108     test.banner(string, width)
    109 
    110     test.diff(actual, expected)
    111 
    112     test.match(actual, expected)
    113 
    114     test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
    115     test.match_exact(["actual 1\n", "actual 2\n"],
    116                      ["expected 1\n", "expected 2\n"])
    117 
    118     test.match_re("actual 1\nactual 2\n", regex_string)
    119     test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes)
    120 
    121     test.match_re_dotall("actual 1\nactual 2\n", regex_string)
    122     test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)
    123 
    124     test.tempdir()
    125     test.tempdir('temporary-directory')
    126 
    127     test.sleep()
    128     test.sleep(seconds)
    129 
    130     test.where_is('foo')
    131     test.where_is('foo', 'PATH1:PATH2')
    132     test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
    133 
    134     test.unlink('file')
    135     test.unlink('subdir', 'file')
    136 
    137 The TestCmd module provides pass_test(), fail_test(), and no_result()
    138 unbound functions that report test results for use with the Aegis change
    139 management system.  These methods terminate the test immediately,
    140 reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
    141 status 0 (success), 1 or 2 respectively.  This allows for a distinction
    142 between an actual failed test and a test that could not be properly
    143 evaluated because of an external condition (such as a full file system
    144 or incorrect permissions).
    145 
    146     import TestCmd
    147 
    148     TestCmd.pass_test()
    149     TestCmd.pass_test(condition)
    150     TestCmd.pass_test(condition, function)
    151 
    152     TestCmd.fail_test()
    153     TestCmd.fail_test(condition)
    154     TestCmd.fail_test(condition, function)
    155     TestCmd.fail_test(condition, function, skip)
    156 
    157     TestCmd.no_result()
    158     TestCmd.no_result(condition)
    159     TestCmd.no_result(condition, function)
    160     TestCmd.no_result(condition, function, skip)
    161 
    162 The TestCmd module also provides unbound functions that handle matching
    163 in the same way as the match_*() methods described above.
    164 
    165     import TestCmd
    166 
    167     test = TestCmd.TestCmd(match = TestCmd.match_exact)
    168 
    169     test = TestCmd.TestCmd(match = TestCmd.match_re)
    170 
    171     test = TestCmd.TestCmd(match = TestCmd.match_re_dotall)
    172 
    173 The TestCmd module provides unbound functions that can be used for the
    174 "diff" argument to TestCmd.TestCmd instantiation:
    175 
    176     import TestCmd
    177 
    178     test = TestCmd.TestCmd(match = TestCmd.match_re,
    179                            diff = TestCmd.diff_re)
    180 
    181     test = TestCmd.TestCmd(diff = TestCmd.simple_diff)
    182 
    183 The "diff" argument can also be used with standard difflib functions:
    184 
    185     import difflib
    186 
    187     test = TestCmd.TestCmd(diff = difflib.context_diff)
    188 
    189     test = TestCmd.TestCmd(diff = difflib.unified_diff)
    190 
    191 Lastly, the where_is() method also exists in an unbound function
    192 version.
    193 
    194     import TestCmd
    195 
    196     TestCmd.where_is('foo')
    197     TestCmd.where_is('foo', 'PATH1:PATH2')
    198     TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
    199 """
    200 
    201 # Copyright 2000-2010 Steven Knight
    202 # This module is free software, and you may redistribute it and/or modify
    203 # it under the same terms as Python itself, so long as this copyright message
    204 # and disclaimer are retained in their original form.
    205 #
    206 # IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
    207 # SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
    208 # THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
    209 # DAMAGE.
    210 #
    211 # THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
    212 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
    213 # PARTICULAR PURPOSE.  THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
    214 # AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
    215 # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
    216 
    217 __author__ = "Steven Knight <knight at baldmt dot com>"
    218 __revision__ = "TestCmd.py 0.37.D001 2010/01/11 16:55:50 knight"
    219 __version__ = "0.37"
    220 
    221 import errno
    222 import os
    223 import os.path
    224 import re
    225 import shutil
    226 import stat
    227 import string
    228 import sys
    229 import tempfile
    230 import time
    231 import traceback
    232 import types
    233 import UserList
    234 
    235 __all__ = [
    236     'diff_re',
    237     'fail_test',
    238     'no_result',
    239     'pass_test',
    240     'match_exact',
    241     'match_re',
    242     'match_re_dotall',
    243     'python_executable',
    244     'TestCmd'
    245 ]
    246 
    247 try:
    248     import difflib
    249 except ImportError:
    250     __all__.append('simple_diff')
    251 
    252 def is_List(e):
    253     return type(e) is types.ListType \
    254         or isinstance(e, UserList.UserList)
    255 
    256 try:
    257     from UserString import UserString
    258 except ImportError:
    259     class UserString:
    260         pass
    261 
    262 if hasattr(types, 'UnicodeType'):
    263     def is_String(e):
    264         return type(e) is types.StringType \
    265             or type(e) is types.UnicodeType \
    266             or isinstance(e, UserString)
    267 else:
    268     def is_String(e):
    269         return type(e) is types.StringType or isinstance(e, UserString)
    270 
    271 tempfile.template = 'testcmd.'
    272 if os.name in ('posix', 'nt'):
    273     tempfile.template = 'testcmd.' + str(os.getpid()) + '.'
    274 else:
    275     tempfile.template = 'testcmd.'
    276 
    277 re_space = re.compile('\s')
    278 
    279 _Cleanup = []
    280 
    281 _chain_to_exitfunc = None
    282 
    283 def _clean():
    284     global _Cleanup
    285     cleanlist = filter(None, _Cleanup)
    286     del _Cleanup[:]
    287     cleanlist.reverse()
    288     for test in cleanlist:
    289         test.cleanup()
    290     if _chain_to_exitfunc:
    291         _chain_to_exitfunc()
    292 
# Arrange for _clean() to run when the interpreter exits.
try:
    import atexit
except ImportError:
    # TODO(1.5): atexit requires python 2.0, so chain sys.exitfunc
    try:
        # Remember an exit function that is already installed so that
        # _clean() can call it once its own work is done.
        _chain_to_exitfunc = sys.exitfunc
    except AttributeError:
        pass
    sys.exitfunc = _clean
else:
    atexit.register(_clean)
    304 
    305 try:
    306     zip
    307 except NameError:
    308     def zip(*lists):
    309         result = []
    310         for i in xrange(min(map(len, lists))):
    311             result.append(tuple(map(lambda l, i=i: l[i], lists)))
    312         return result
    313 
    314 class Collector:
    315     def __init__(self, top):
    316         self.entries = [top]
    317     def __call__(self, arg, dirname, names):
    318         pathjoin = lambda n, d=dirname: os.path.join(d, n)
    319         self.entries.extend(map(pathjoin, names))
    320 
    321 def _caller(tblist, skip):
    322     string = ""
    323     arr = []
    324     for file, line, name, text in tblist:
    325         if file[-10:] == "TestCmd.py":
    326                 break
    327         arr = [(file, line, name, text)] + arr
    328     atfrom = "at"
    329     for file, line, name, text in arr[skip:]:
    330         if name in ("?", "<module>"):
    331             name = ""
    332         else:
    333             name = " (" + name + ")"
    334         string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
    335         atfrom = "\tfrom"
    336     return string
    337 
    338 def fail_test(self = None, condition = 1, function = None, skip = 0):
    339     """Cause the test to fail.
    340 
    341     By default, the fail_test() method reports that the test FAILED
    342     and exits with a status of 1.  If a condition argument is supplied,
    343     the test fails only if the condition is true.
    344     """
    345     if not condition:
    346         return
    347     if not function is None:
    348         function()
    349     of = ""
    350     desc = ""
    351     sep = " "
    352     if not self is None:
    353         if self.program:
    354             of = " of " + self.program
    355             sep = "\n\t"
    356         if self.description:
    357             desc = " [" + self.description + "]"
    358             sep = "\n\t"
    359 
    360     at = _caller(traceback.extract_stack(), skip)
    361     sys.stderr.write("FAILED test" + of + desc + sep + at)
    362 
    363     sys.exit(1)
    364 
    365 def no_result(self = None, condition = 1, function = None, skip = 0):
    366     """Causes a test to exit with no valid result.
    367 
    368     By default, the no_result() method reports NO RESULT for the test
    369     and exits with a status of 2.  If a condition argument is supplied,
    370     the test fails only if the condition is true.
    371     """
    372     if not condition:
    373         return
    374     if not function is None:
    375         function()
    376     of = ""
    377     desc = ""
    378     sep = " "
    379     if not self is None:
    380         if self.program:
    381             of = " of " + self.program
    382             sep = "\n\t"
    383         if self.description:
    384             desc = " [" + self.description + "]"
    385             sep = "\n\t"
    386 
    387     if os.environ.get('TESTCMD_DEBUG_SKIPS'):
    388         at = _caller(traceback.extract_stack(), skip)
    389         sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
    390     else:
    391         sys.stderr.write("NO RESULT\n")
    392 
    393     sys.exit(2)
    394 
    395 def pass_test(self = None, condition = 1, function = None):
    396     """Causes a test to pass.
    397 
    398     By default, the pass_test() method reports PASSED for the test
    399     and exits with a status of 0.  If a condition argument is supplied,
    400     the test passes only if the condition is true.
    401     """
    402     if not condition:
    403         return
    404     if not function is None:
    405         function()
    406     sys.stderr.write("PASSED\n")
    407     sys.exit(0)
    408 
    409 def match_exact(lines = None, matches = None):
    410     """
    411     """
    412     if not is_List(lines):
    413         lines = string.split(lines, "\n")
    414     if not is_List(matches):
    415         matches = string.split(matches, "\n")
    416     if len(lines) != len(matches):
    417         return
    418     for i in range(len(lines)):
    419         if lines[i] != matches[i]:
    420             return
    421     return 1
    422 
    423 def match_re(lines = None, res = None):
    424     """
    425     """
    426     if not is_List(lines):
    427         lines = string.split(lines, "\n")
    428     if not is_List(res):
    429         res = string.split(res, "\n")
    430     if len(lines) != len(res):
    431         return
    432     for i in range(len(lines)):
    433         s = "^" + res[i] + "$"
    434         try:
    435             expr = re.compile(s)
    436         except re.error, e:
    437             msg = "Regular expression error in %s: %s"
    438             raise re.error, msg % (repr(s), e[0])
    439         if not expr.search(lines[i]):
    440             return
    441     return 1
    442 
    443 def match_re_dotall(lines = None, res = None):
    444     """
    445     """
    446     if not type(lines) is type(""):
    447         lines = string.join(lines, "\n")
    448     if not type(res) is type(""):
    449         res = string.join(res, "\n")
    450     s = "^" + res + "$"
    451     try:
    452         expr = re.compile(s, re.DOTALL)
    453     except re.error, e:
    454         msg = "Regular expression error in %s: %s"
    455         raise re.error, msg % (repr(s), e[0])
    456     if expr.match(lines):
    457         return 1
    458 
    459 try:
    460     import difflib
    461 except ImportError:
    462     pass
    463 else:
    464     def simple_diff(a, b, fromfile='', tofile='',
    465                     fromfiledate='', tofiledate='', n=3, lineterm='\n'):
    466         """
    467         A function with the same calling signature as difflib.context_diff
    468         (diff -c) and difflib.unified_diff (diff -u) but which prints
    469         output like the simple, unadorned 'diff" command.
    470         """
    471         sm = difflib.SequenceMatcher(None, a, b)
    472         def comma(x1, x2):
    473             return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2)
    474         result = []
    475         for op, a1, a2, b1, b2 in sm.get_opcodes():
    476             if op == 'delete':
    477                 result.append("%sd%d" % (comma(a1, a2), b1))
    478                 result.extend(map(lambda l: '< ' + l, a[a1:a2]))
    479             elif op == 'insert':
    480                 result.append("%da%s" % (a1, comma(b1, b2)))
    481                 result.extend(map(lambda l: '> ' + l, b[b1:b2]))
    482             elif op == 'replace':
    483                 result.append("%sc%s" % (comma(a1, a2), comma(b1, b2)))
    484                 result.extend(map(lambda l: '< ' + l, a[a1:a2]))
    485                 result.append('---')
    486                 result.extend(map(lambda l: '> ' + l, b[b1:b2]))
    487         return result
    488 
    489 def diff_re(a, b, fromfile='', tofile='',
    490                 fromfiledate='', tofiledate='', n=3, lineterm='\n'):
    491     """
    492     A simple "diff" of two sets of lines when the expected lines
    493     are regular expressions.  This is a really dumb thing that
    494     just compares each line in turn, so it doesn't look for
    495     chunks of matching lines and the like--but at least it lets
    496     you know exactly which line first didn't compare correctl...
    497     """
    498     result = []
    499     diff = len(a) - len(b)
    500     if diff < 0:
    501         a = a + ['']*(-diff)
    502     elif diff > 0:
    503         b = b + ['']*diff
    504     i = 0
    505     for aline, bline in zip(a, b):
    506         s = "^" + aline + "$"
    507         try:
    508             expr = re.compile(s)
    509         except re.error, e:
    510             msg = "Regular expression error in %s: %s"
    511             raise re.error, msg % (repr(s), e[0])
    512         if not expr.search(bline):
    513             result.append("%sc%s" % (i+1, i+1))
    514             result.append('< ' + repr(a[i]))
    515             result.append('---')
    516             result.append('> ' + repr(b[i]))
    517         i = i+1
    518     return result
    519 
    520 if os.name == 'java':
    521 
    522     python_executable = os.path.join(sys.prefix, 'jython')
    523 
    524 else:
    525 
    526     python_executable = sys.executable
    527 
    528 if sys.platform == 'win32':
    529 
    530     default_sleep_seconds = 2
    531 
    532     def where_is(file, path=None, pathext=None):
    533         if path is None:
    534             path = os.environ['PATH']
    535         if is_String(path):
    536             path = string.split(path, os.pathsep)
    537         if pathext is None:
    538             pathext = os.environ['PATHEXT']
    539         if is_String(pathext):
    540             pathext = string.split(pathext, os.pathsep)
    541         for ext in pathext:
    542             if string.lower(ext) == string.lower(file[-len(ext):]):
    543                 pathext = ['']
    544                 break
    545         for dir in path:
    546             f = os.path.join(dir, file)
    547             for ext in pathext:
    548                 fext = f + ext
    549                 if os.path.isfile(fext):
    550                     return fext
    551         return None
    552 
    553 else:
    554 
    555     def where_is(file, path=None, pathext=None):
    556         if path is None:
    557             path = os.environ['PATH']
    558         if is_String(path):
    559             path = string.split(path, os.pathsep)
    560         for dir in path:
    561             f = os.path.join(dir, file)
    562             if os.path.isfile(f):
    563                 try:
    564                     st = os.stat(f)
    565                 except OSError:
    566                     continue
    567                 if stat.S_IMODE(st[stat.ST_MODE]) & 0111:
    568                     return f
    569         return None
    570 
    571     default_sleep_seconds = 1
    572 
    573 
    574 
try:
    import subprocess
except ImportError:
    # The subprocess module doesn't exist in this version of Python,
    # so we're going to cobble up something that looks just enough
    # like its API for our purposes below.
    import new

    subprocess = new.module('subprocess')

    # Plain strings stand in for the real constants; the popen2-based
    # Popen3 class below compares its "stderr" keyword against 'STDOUT'.
    subprocess.PIPE = 'PIPE'
    subprocess.STDOUT = 'STDOUT'
    subprocess.mswindows = (sys.platform == 'win32')

    try:
        import popen2
        popen2.Popen3
    except AttributeError:
        # popen2 offers no Popen3 class here: emulate one on top of
        # os.popen3().  Keyword arguments are accepted and ignored.
        class Popen3:
            universal_newlines = 1
            def __init__(self, command, **kw):
                if sys.platform == 'win32' and command[0] == '"':
                    command = '"' + command + '"'
                (stdin, stdout, stderr) = os.popen3(' ' + command)
                self.stdin = stdin
                self.stdout = stdout
                self.stderr = stderr
            def close_output(self):
                # The value returned by closing stderr is kept as the
                # result code that wait() decodes.
                # NOTE(review): wait() reads self.resultcode, which is only
                # set here -- confirm callers always call close_output()
                # first.
                self.stdout.close()
                self.resultcode = self.stderr.close()
            def wait(self):
                # Exit status for a normal exit, signal number when the
                # process was terminated by a signal, None otherwise.
                resultcode = self.resultcode
                if os.WIFEXITED(resultcode):
                    return os.WEXITSTATUS(resultcode)
                elif os.WIFSIGNALED(resultcode):
                    return os.WTERMSIG(resultcode)
                else:
                    return None

    else:
        try:
            popen2.Popen4
        except AttributeError:
            # A cribbed Popen4 class, with some retrofitted code from
            # the Python 1.5 Popen3 class methods to do certain things
            # by hand.
            class Popen4(popen2.Popen3):
                childerr = None

                def __init__(self, cmd, bufsize=-1):
                    p2cread, p2cwrite = os.pipe()
                    c2pread, c2pwrite = os.pipe()
                    self.pid = os.fork()
                    if self.pid == 0:
                        # Child
                        # stdin comes from the parent-to-child pipe;
                        # stdout and stderr both go to the child-to-parent
                        # pipe.
                        os.dup2(p2cread, 0)
                        os.dup2(c2pwrite, 1)
                        os.dup2(c2pwrite, 2)
                        # Close every other descriptor, ignoring the ones
                        # that are not open.
                        for i in range(3, popen2.MAXFD):
                            try:
                                os.close(i)
                            except: pass
                        try:
                            os.execvp(cmd[0], cmd)
                        finally:
                            os._exit(1)
                        # Shouldn't come here, I guess
                        os._exit(1)
                    # Parent: keep only its own ends of the two pipes.
                    os.close(p2cread)
                    self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
                    os.close(c2pwrite)
                    self.fromchild = os.fdopen(c2pread, 'r', bufsize)
                    popen2._active.append(self)

            popen2.Popen4 = Popen4

        # Adapter giving the popen2 classes the attribute names
        # (stdin/stdout/stderr) that the rest of this module uses.
        class Popen3(popen2.Popen3, popen2.Popen4):
            universal_newlines = 1
            def __init__(self, command, **kw):
                # Popen4 merges stderr into stdout; Popen3 keeps them
                # separate.  The trailing 1 is passed as the second
                # positional argument of the popen2 constructors.
                if kw.get('stderr') == 'STDOUT':
                    apply(popen2.Popen4.__init__, (self, command, 1))
                else:
                    apply(popen2.Popen3.__init__, (self, command, 1))
                self.stdin = self.tochild
                self.stdout = self.fromchild
                self.stderr = self.childerr
            def wait(self, *args, **kw):
                # Exit status for a normal exit, signal number when the
                # process was terminated by a signal, None otherwise.
                resultcode = apply(popen2.Popen3.wait, (self,)+args, kw)
                if os.WIFEXITED(resultcode):
                    return os.WEXITSTATUS(resultcode)
                elif os.WIFSIGNALED(resultcode):
                    return os.WTERMSIG(resultcode)
                else:
                    return None

    subprocess.Popen = Popen3
    671 
    672 
    673 
    674 # From Josiah Carlson,
    675 # ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
    676 # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
    677 
# Re-export of subprocess.PIPE under this module's own name.
PIPE = subprocess.PIPE

# Platform-specific modules used by the non-blocking I/O methods of the
# Popen class below.
if subprocess.mswindows:
    from win32file import ReadFile, WriteFile
    from win32pipe import PeekNamedPipe
    import msvcrt
else:
    import select
    import fcntl

    # Supply the fcntl command numbers when the fcntl module does not
    # define them itself.
    try:                    fcntl.F_GETFL
    except AttributeError:  fcntl.F_GETFL = 3

    try:                    fcntl.F_SETFL
    except AttributeError:  fcntl.F_SETFL = 4
    693 
class Popen(subprocess.Popen):
    """subprocess.Popen extended with send/receive helpers that do not
    wait for the other end of the pipes.

    send() and _recv() have separate implementations for Windows and for
    other platforms, selected when the class body is executed.
    """
    def recv(self, maxsize=None):
        """Return the data currently available on stdout (see _recv)."""
        return self._recv('stdout', maxsize)

    def recv_err(self, maxsize=None):
        """Return the data currently available on stderr (see _recv)."""
        return self._recv('stderr', maxsize)

    def send_recv(self, input='', maxsize=None):
        """Send `input` and return a tuple of the send() result, the
        stdout data and the stderr data."""
        return self.send(input), self.recv(maxsize), self.recv_err(maxsize)

    def get_conn_maxsize(self, which, maxsize):
        """Return the pipe stored in the attribute named `which` together
        with a usable read size: 1024 when `maxsize` is None, and at
        least 1 otherwise."""
        if maxsize is None:
            maxsize = 1024
        elif maxsize < 1:
            maxsize = 1
        return getattr(self, which), maxsize

    def _close(self, which):
        """Close the pipe stored in the attribute named `which` and set
        the attribute to None.  Returns None."""
        getattr(self, which).close()
        setattr(self, which, None)

    if subprocess.mswindows:
        def send(self, input):
            """Write `input` to the child's stdin.

            Returns the number of bytes written, or None when stdin is
            not available or turns out to be closed (in which case it is
            closed on this side as well).
            """
            if not self.stdin:
                return None

            try:
                x = msvcrt.get_osfhandle(self.stdin.fileno())
                (errCode, written) = WriteFile(x, input)
            except ValueError:
                return self._close('stdin')
            except (subprocess.pywintypes.error, Exception), why:
                # 109 is presumably the Windows "broken pipe" error code
                # -- TODO confirm.
                if why[0] in (109, errno.ESHUTDOWN):
                    return self._close('stdin')
                raise

            return written

        def _recv(self, which, maxsize):
            """Read at most `maxsize` bytes of what is available on the
            pipe named `which`.

            Returns None when the pipe is not available or turns out to
            be closed (in which case it is closed on this side as well).
            """
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None

            try:
                x = msvcrt.get_osfhandle(conn.fileno())
                # Peek first to learn how much can be read.
                (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
                if maxsize < nAvail:
                    nAvail = maxsize
                if nAvail > 0:
                    (errCode, read) = ReadFile(x, nAvail, None)
            except ValueError:
                return self._close(which)
            except (subprocess.pywintypes.error, Exception), why:
                if why[0] in (109, errno.ESHUTDOWN):
                    return self._close(which)
                raise

            #if self.universal_newlines:
            #    read = self._translate_newlines(read)
            return read

    else:
        def send(self, input):
            """Write `input` to the child's stdin.

            Returns the number of bytes written, 0 when stdin is not
            ready for writing, or None when stdin is not available or the
            pipe is broken (in which case stdin is closed).
            """
            if not self.stdin:
                return None

            # Zero timeout: only poll whether stdin can take data now.
            if not select.select([], [self.stdin], [], 0)[1]:
                return 0

            try:
                written = os.write(self.stdin.fileno(), input)
            except OSError, why:
                if why[0] == errno.EPIPE: #broken pipe
                    return self._close('stdin')
                raise

            return written

        def _recv(self, which, maxsize):
            """Read at most `maxsize` bytes of what is available on the
            pipe named `which`.

            Returns the data read, '' when nothing is ready, or None when
            the pipe is not available or has reached end of file (in
            which case it is closed).
            """
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None

            # Switch the pipe to non-blocking mode for the duration of
            # the read; the original flags are restored in the "finally"
            # clause below.
            try:
                flags = fcntl.fcntl(conn, fcntl.F_GETFL)
            except TypeError:
                flags = None
            else:
                if not conn.closed:
                    fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)

            try:
                # Zero timeout: only poll whether data is ready now.
                if not select.select([conn], [], [], 0)[0]:
                    return ''

                r = conn.read(maxsize)
                if not r:
                    return self._close(which)

                #if self.universal_newlines:
                #    r = self._translate_newlines(r)
                return r
            finally:
                if not conn.closed and not flags is None:
                    fcntl.fcntl(conn, fcntl.F_SETFL, flags)
    799 
    800 disconnect_message = "Other end disconnected!"
    801 
    802 def recv_some(p, t=.1, e=1, tr=5, stderr=0):
    803     if tr < 1:
    804         tr = 1
    805     x = time.time()+t
    806     y = []
    807     r = ''
    808     pr = p.recv
    809     if stderr:
    810         pr = p.recv_err
    811     while time.time() < x or r:
    812         r = pr()
    813         if r is None:
    814             if e:
    815                 raise Exception(disconnect_message)
    816             else:
    817                 break
    818         elif r:
    819             y.append(r)
    820         else:
    821             time.sleep(max((x-time.time())/tr, 0))
    822     return ''.join(y)
    823 
    824 # TODO(3.0:  rewrite to use memoryview()
    825 def send_all(p, data):
    826     while len(data):
    827         sent = p.send(data)
    828         if sent is None:
    829             raise Exception(disconnect_message)
    830         data = buffer(data, sent)
    831 
    832 
    833 
# Interpreters without a built-in "object" get an empty placeholder class
# so that "class TestCmd(object)" below can still be defined.
try:
    object
except NameError:
    class object:
        pass
    839 
    840 
    841 
    842 class TestCmd(object):
    843     """Class TestCmd
    844     """
    845 
    846     def __init__(self, description = None,
    847                        program = None,
    848                        interpreter = None,
    849                        workdir = None,
    850                        subdir = None,
    851                        verbose = None,
    852                        match = None,
    853                        diff = None,
    854                        combine = 0,
    855                        universal_newlines = 1):
    856         self._cwd = os.getcwd()
    857         self.description_set(description)
    858         self.program_set(program)
    859         self.interpreter_set(interpreter)
    860         if verbose is None:
    861             try:
    862                 verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) )
    863             except ValueError:
    864                 verbose = 0
    865         self.verbose_set(verbose)
    866         self.combine = combine
    867         self.universal_newlines = universal_newlines
    868         if match is not None:
    869             self.match_function = match
    870         else:
    871             self.match_function = match_re
    872         if diff is not None:
    873             self.diff_function = diff
    874         else:
    875             try:
    876                 difflib
    877             except NameError:
    878                 pass
    879             else:
    880                 self.diff_function = simple_diff
    881                 #self.diff_function = difflib.context_diff
    882                 #self.diff_function = difflib.unified_diff
    883         self._dirlist = []
    884         self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
    885         if os.environ.has_key('PRESERVE') and not os.environ['PRESERVE'] is '':
    886             self._preserve['pass_test'] = os.environ['PRESERVE']
    887             self._preserve['fail_test'] = os.environ['PRESERVE']
    888             self._preserve['no_result'] = os.environ['PRESERVE']
    889         else:
    890             try:
    891                 self._preserve['pass_test'] = os.environ['PRESERVE_PASS']
    892             except KeyError:
    893                 pass
    894             try:
    895                 self._preserve['fail_test'] = os.environ['PRESERVE_FAIL']
    896             except KeyError:
    897                 pass
    898             try:
    899                 self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT']
    900             except KeyError:
    901                 pass
    902         self._stdout = []
    903         self._stderr = []
    904         self.status = None
    905         self.condition = 'no_result'
    906         self.workdir_set(workdir)
    907         self.subdir(subdir)
    908 
    def __del__(self):
        # Run cleanup() when the object is destroyed; cleanup() decides
        # whether the temporary directories are removed or preserved.
        self.cleanup()
    911 
    912     def __repr__(self):
    913         return "%x" % id(self)
    914 
    915     banner_char = '='
    916     banner_width = 80
    917 
    918     def banner(self, s, width=None):
    919         if width is None:
    920             width = self.banner_width
    921         return s + self.banner_char * (width - len(s))
    922 
    if os.name == 'posix':

        def escape(self, arg):
            """Backslash-escape shell special characters in arg.

            Backslashes are doubled first, then every '"' and '$' gets
            a backslash in front of it.  If re_space finds a match in
            the result, it is wrapped in double quotes.
            """
            backslash = '\\'
            arg = arg.replace(backslash, backslash * 2)
            for char in '"$':
                arg = arg.replace(char, backslash + char)
            if re_space.search(arg):
                return '"' + arg + '"'
            return arg

    else:

        # Windows does not allow special characters in file names
        # anyway, so there is nothing to escape; the argument is only
        # quoted when needed.
        def escape(self, arg):
            """Wrap arg in double quotes if re_space finds a match in it."""
            if re_space.search(arg):
                return '"' + arg + '"'
            return arg
    947 
    def canonicalize(self, path):
        """Return path as one path string anchored in the workdir.

        A list path is first combined with os.path.join().  A path
        that is not absolute is then joined onto self.workdir; an
        absolute path is returned as is.
        """
        if is_List(path):
            path = os.path.join(*tuple(path))
        if os.path.isabs(path):
            return path
        return os.path.join(self.workdir, path)
    954 
    def chmod(self, path, mode):
        """Changes permissions on the specified file or directory
        path name.

        The path goes through canonicalize() first, so a list is
        joined with os.path.join() and a relative path is taken to be
        under the temporary working directory.  mode is passed to
        os.chmod() unchanged.
        """
        path = self.canonicalize(path)
        os.chmod(path, mode)
    960 
    961     def cleanup(self, condition = None):
    962         """Removes any temporary working directories for the specified
    963         TestCmd environment.  If the environment variable PRESERVE was
    964         set when the TestCmd environment was created, temporary working
    965         directories are not removed.  If any of the environment variables
    966         PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
    967         when the TestCmd environment was created, then temporary working
    968         directories are not removed if the test passed, failed, or had
    969         no result, respectively.  Temporary working directories are also
    970         preserved for conditions specified via the preserve method.
    971 
    972         Typically, this method is not called directly, but is used when
    973         the script exits to clean up temporary working directories as
    974         appropriate for the exit status.
    975         """
    976         if not self._dirlist:
    977             return
    978         os.chdir(self._cwd)
    979         self.workdir = None
    980         if condition is None:
    981             condition = self.condition
    982         if self._preserve[condition]:
    983             for dir in self._dirlist:
    984                 print "Preserved directory", dir
    985         else:
    986             list = self._dirlist[:]
    987             list.reverse()
    988             for dir in list:
    989                 self.writable(dir, 1)
    990                 shutil.rmtree(dir, ignore_errors = 1)
    991             self._dirlist = []
    992 
    993         try:
    994             global _Cleanup
    995             _Cleanup.remove(self)
    996         except (AttributeError, ValueError):
    997             pass
    998 
    999     def command_args(self, program = None,
   1000                            interpreter = None,
   1001                            arguments = None):
   1002         if program:
   1003             if type(program) == type('') and not os.path.isabs(program):
   1004                 program = os.path.join(self._cwd, program)
   1005         else:
   1006             program = self.program
   1007             if not interpreter:
   1008                 interpreter = self.interpreter
   1009         if not type(program) in [type([]), type(())]:
   1010             program = [program]
   1011         cmd = list(program)
   1012         if interpreter:
   1013             if not type(interpreter) in [type([]), type(())]:
   1014                 interpreter = [interpreter]
   1015             cmd = list(interpreter) + cmd
   1016         if arguments:
   1017             if type(arguments) == type(''):
   1018                 arguments = string.split(arguments)
   1019             cmd.extend(arguments)
   1020         return cmd
   1021 
    def description_set(self, description):
        """Set the description of the functionality being tested.

        The value is stored unchanged in self.description.
        """
        self.description = description
   1026 
   1027     try:
   1028         difflib
   1029     except NameError:
   1030         def diff(self, a, b, name, *args, **kw):
   1031             print self.banner('Expected %s' % name)
   1032             print a
   1033             print self.banner('Actual %s' % name)
   1034             print b
   1035     else:
   1036         def diff(self, a, b, name, *args, **kw):
   1037             print self.banner(name)
   1038             args = (a.splitlines(), b.splitlines()) + args
   1039             lines = apply(self.diff_function, args, kw)
   1040             for l in lines:
   1041                 print l
   1042 
   1043     def fail_test(self, condition = 1, function = None, skip = 0):
   1044         """Cause the test to fail.
   1045         """
   1046         if not condition:
   1047             return
   1048         self.condition = 'fail_test'
   1049         fail_test(self = self,
   1050                   condition = condition,
   1051                   function = function,
   1052                   skip = skip)
   1053 
    def interpreter_set(self, interpreter):
        """Set the program to be used to interpret the program
        under test as a script.

        The value is stored unchanged in self.interpreter;
        command_args() accepts either a single value or a list/tuple
        of words here.
        """
        self.interpreter = interpreter
   1059 
    def match(self, lines, matches):
        """Compare actual and expected file contents.

        Calls the match function selected at construction time
        (match_re unless another was supplied) with lines and matches
        and returns its result.
        """
        return self.match_function(lines, matches)
   1064 
    def match_exact(self, lines, matches):
        """Compare actual and expected file contents.

        Returns the result of the module-level match_exact() function
        called with the same arguments.
        """
        return match_exact(lines, matches)
   1069 
    def match_re(self, lines, res):
        """Compare actual and expected file contents.

        Returns the result of the module-level match_re() function
        called with the same arguments.
        """
        return match_re(lines, res)
   1074 
    def match_re_dotall(self, lines, res):
        """Compare actual and expected file contents.

        Returns the result of the module-level match_re_dotall()
        function called with the same arguments.
        """
        return match_re_dotall(lines, res)
   1079 
   1080     def no_result(self, condition = 1, function = None, skip = 0):
   1081         """Report that the test could not be run.
   1082         """
   1083         if not condition:
   1084             return
   1085         self.condition = 'no_result'
   1086         no_result(self = self,
   1087                   condition = condition,
   1088                   function = function,
   1089                   skip = skip)
   1090 
   1091     def pass_test(self, condition = 1, function = None):
   1092         """Cause the test to pass.
   1093         """
   1094         if not condition:
   1095             return
   1096         self.condition = 'pass_test'
   1097         pass_test(self = self, condition = condition, function = function)
   1098 
   1099     def preserve(self, *conditions):
   1100         """Arrange for the temporary working directories for the
   1101         specified TestCmd environment to be preserved for one or more
   1102         conditions.  If no conditions are specified, arranges for
   1103         the temporary working directories to be preserved for all
   1104         conditions.
   1105         """
   1106         if conditions is ():
   1107             conditions = ('pass_test', 'fail_test', 'no_result')
   1108         for cond in conditions:
   1109             self._preserve[cond] = 1
   1110 
   1111     def program_set(self, program):
   1112         """Set the executable program or script to be tested.
   1113         """
   1114         if program and not os.path.isabs(program):
   1115             program = os.path.join(self._cwd, program)
   1116         self.program = program
   1117 
   1118     def read(self, file, mode = 'rb'):
   1119         """Reads and returns the contents of the specified file name.
   1120         The file name may be a list, in which case the elements are
   1121         concatenated with the os.path.join() method.  The file is
   1122         assumed to be under the temporary working directory unless it
   1123         is an absolute path name.  The I/O mode for the file may
   1124         be specified; it must begin with an 'r'.  The default is
   1125         'rb' (binary read).
   1126         """
   1127         file = self.canonicalize(file)
   1128         if mode[0] != 'r':
   1129             raise ValueError, "mode must begin with 'r'"
   1130         with open(file, mode) as f:
   1131             result = f.read()
   1132         return result
   1133 
    def rmdir(self, dir):
        """Removes the specified dir name.
        The dir name may be a list, in which case the elements are
        concatenated with the os.path.join() method.  The dir is
        assumed to be under the temporary working directory unless it
        is an absolute path name.
        The dir must be empty; removal is done with os.rmdir().
        """
        dir = self.canonicalize(dir)
        os.rmdir(dir)
   1144 
   1145     def start(self, program = None,
   1146                     interpreter = None,
   1147                     arguments = None,
   1148                     universal_newlines = None,
   1149                     **kw):
   1150         """
   1151         Starts a program or script for the test environment.
   1152 
   1153         The specified program will have the original directory
   1154         prepended unless it is enclosed in a [list].
   1155         """
   1156         cmd = self.command_args(program, interpreter, arguments)
   1157         cmd_string = string.join(map(self.escape, cmd), ' ')
   1158         if self.verbose:
   1159             sys.stderr.write(cmd_string + "\n")
   1160         if universal_newlines is None:
   1161             universal_newlines = self.universal_newlines
   1162 
   1163         # On Windows, if we make stdin a pipe when we plan to send 
   1164         # no input, and the test program exits before
   1165         # Popen calls msvcrt.open_osfhandle, that call will fail.
   1166         # So don't use a pipe for stdin if we don't need one.
   1167         stdin = kw.get('stdin', None)
   1168         if stdin is not None:
   1169             stdin = subprocess.PIPE
   1170 
   1171         combine = kw.get('combine', self.combine)
   1172         if combine:
   1173             stderr_value = subprocess.STDOUT
   1174         else:
   1175             stderr_value = subprocess.PIPE
   1176 
   1177         return Popen(cmd,
   1178                      stdin=stdin,
   1179                      stdout=subprocess.PIPE,
   1180                      stderr=stderr_value,
   1181                      universal_newlines=universal_newlines)
   1182 
   1183     def finish(self, popen, **kw):
   1184         """
   1185         Finishes and waits for the process being run under control of
   1186         the specified popen argument, recording the exit status,
   1187         standard output and error output.
   1188         """
   1189         popen.stdin.close()
   1190         self.status = popen.wait()
   1191         if not self.status:
   1192             self.status = 0
   1193         self._stdout.append(popen.stdout.read())
   1194         if popen.stderr:
   1195             stderr = popen.stderr.read()
   1196         else:
   1197             stderr = ''
   1198         self._stderr.append(stderr)
   1199 
   1200     def run(self, program = None,
   1201                   interpreter = None,
   1202                   arguments = None,
   1203                   chdir = None,
   1204                   stdin = None,
   1205                   universal_newlines = None):
   1206         """Runs a test of the program or script for the test
   1207         environment.  Standard output and error output are saved for
   1208         future retrieval via the stdout() and stderr() methods.
   1209 
   1210         The specified program will have the original directory
   1211         prepended unless it is enclosed in a [list].
   1212         """
   1213         if chdir:
   1214             oldcwd = os.getcwd()
   1215             if not os.path.isabs(chdir):
   1216                 chdir = os.path.join(self.workpath(chdir))
   1217             if self.verbose:
   1218                 sys.stderr.write("chdir(" + chdir + ")\n")
   1219             os.chdir(chdir)
   1220         p = self.start(program,
   1221                        interpreter,
   1222                        arguments,
   1223                        universal_newlines,
   1224                        stdin=stdin)
   1225         if stdin:
   1226             if is_List(stdin):
   1227                 for line in stdin:
   1228                     p.stdin.write(line)
   1229             else:
   1230                 p.stdin.write(stdin)
   1231             p.stdin.close()
   1232 
   1233         out = p.stdout.read()
   1234         if p.stderr is None:
   1235             err = ''
   1236         else:
   1237             err = p.stderr.read()
   1238         try:
   1239             close_output = p.close_output
   1240         except AttributeError:
   1241             p.stdout.close()
   1242             if not p.stderr is None:
   1243                 p.stderr.close()
   1244         else:
   1245             close_output()
   1246 
   1247         self._stdout.append(out)
   1248         self._stderr.append(err)
   1249 
   1250         self.status = p.wait()
   1251         if not self.status:
   1252             self.status = 0
   1253 
   1254         if chdir:
   1255             os.chdir(oldcwd)
   1256         if self.verbose >= 2:
   1257             write = sys.stdout.write
   1258             write('============ STATUS: %d\n' % self.status)
   1259             out = self.stdout()
   1260             if out or self.verbose >= 3:
   1261                 write('============ BEGIN STDOUT (len=%d):\n' % len(out))
   1262                 write(out)
   1263                 write('============ END STDOUT\n')
   1264             err = self.stderr()
   1265             if err or self.verbose >= 3:
   1266                 write('============ BEGIN STDERR (len=%d)\n' % len(err))
   1267                 write(err)
   1268                 write('============ END STDERR\n')
   1269 
    def sleep(self, seconds = default_sleep_seconds):
        """Sleeps at least the specified number of seconds.  If no
        number is specified, sleeps at least the minimum number of
        seconds necessary to advance file time stamps on the current
        system.  Sleeping more seconds is all right.

        The default comes from the module-level default_sleep_seconds
        value; the wait itself is done with time.sleep().
        """
        time.sleep(seconds)
   1277 
   1278     def stderr(self, run = None):
   1279         """Returns the error output from the specified run number.
   1280         If there is no specified run number, then returns the error
   1281         output of the last run.  If the run number is less than zero,
   1282         then returns the error output from that many runs back from the
   1283         current run.
   1284         """
   1285         if not run:
   1286             run = len(self._stderr)
   1287         elif run < 0:
   1288             run = len(self._stderr) + run
   1289         run = run - 1
   1290         return self._stderr[run]
   1291 
   1292     def stdout(self, run = None):
   1293         """Returns the standard output from the specified run number.
   1294         If there is no specified run number, then returns the standard
   1295         output of the last run.  If the run number is less than zero,
   1296         then returns the standard output from that many runs back from
   1297         the current run.
   1298         """
   1299         if not run:
   1300             run = len(self._stdout)
   1301         elif run < 0:
   1302             run = len(self._stdout) + run
   1303         run = run - 1
   1304         return self._stdout[run]
   1305 
   1306     def subdir(self, *subdirs):
   1307         """Create new subdirectories under the temporary working
   1308         directory, one for each argument.  An argument may be a list,
   1309         in which case the list elements are concatenated using the
   1310         os.path.join() method.  Subdirectories multiple levels deep
   1311         must be created using a separate argument for each level:
   1312 
   1313                 test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
   1314 
   1315         Returns the number of subdirectories actually created.
   1316         """
   1317         count = 0
   1318         for sub in subdirs:
   1319             if sub is None:
   1320                 continue
   1321             if is_List(sub):
   1322                 sub = apply(os.path.join, tuple(sub))
   1323             new = os.path.join(self.workdir, sub)
   1324             try:
   1325                 os.mkdir(new)
   1326             except OSError:
   1327                 pass
   1328             else:
   1329                 count = count + 1
   1330         return count
   1331 
    def symlink(self, target, link):
        """Creates a symlink to the specified target.
        The link name may be a list, in which case the elements are
        concatenated with the os.path.join() method.  The link is
        assumed to be under the temporary working directory unless it
        is an absolute path name. The target is *not* assumed to be
        under the temporary working directory; it is passed to
        os.symlink() exactly as given.
        """
        link = self.canonicalize(link)
        os.symlink(target, link)
   1342 
   1343     def tempdir(self, path=None):
   1344         """Creates a temporary directory.
   1345         A unique directory name is generated if no path name is specified.
   1346         The directory is created, and will be removed when the TestCmd
   1347         object is destroyed.
   1348         """
   1349         if path is None:
   1350             try:
   1351                 path = tempfile.mktemp(prefix=tempfile.template)
   1352             except TypeError:
   1353                 path = tempfile.mktemp()
   1354         os.mkdir(path)
   1355 
   1356         # Symlinks in the path will report things
   1357         # differently from os.getcwd(), so chdir there
   1358         # and back to fetch the canonical path.
   1359         cwd = os.getcwd()
   1360         try:
   1361             os.chdir(path)
   1362             path = os.getcwd()
   1363         finally:
   1364             os.chdir(cwd)
   1365 
   1366         # Uppercase the drive letter since the case of drive
   1367         # letters is pretty much random on win32:
   1368         drive,rest = os.path.splitdrive(path)
   1369         if drive:
   1370             path = string.upper(drive) + rest
   1371 
   1372         #
   1373         self._dirlist.append(path)
   1374         global _Cleanup
   1375         try:
   1376             _Cleanup.index(self)
   1377         except ValueError:
   1378             _Cleanup.append(self)
   1379 
   1380         return path
   1381 
   1382     def touch(self, path, mtime=None):
   1383         """Updates the modification time on the specified file or
   1384         directory path name.  The default is to update to the
   1385         current time if no explicit modification time is specified.
   1386         """
   1387         path = self.canonicalize(path)
   1388         atime = os.path.getatime(path)
   1389         if mtime is None:
   1390             mtime = time.time()
   1391         os.utime(path, (atime, mtime))
   1392 
    def unlink(self, file):
        """Unlinks the specified file name.
        The file name may be a list, in which case the elements are
        concatenated with the os.path.join() method.  The file is
        assumed to be under the temporary working directory unless it
        is an absolute path name.  Removal is done with os.unlink().
        """
        file = self.canonicalize(file)
        os.unlink(file)
   1402 
    def verbose_set(self, verbose):
        """Set the verbose level.

        The value is stored unchanged in self.verbose.  start() and
        run() write the command line and chdir to sys.stderr when it
        is true; run() also echoes status and output at 2 and above.
        """
        self.verbose = verbose
   1407 
   1408     def where_is(self, file, path=None, pathext=None):
   1409         """Find an executable file.
   1410         """
   1411         if is_List(file):
   1412             file = apply(os.path.join, tuple(file))
   1413         if not os.path.isabs(file):
   1414             file = where_is(file, path, pathext)
   1415         return file
   1416 
   1417     def workdir_set(self, path):
   1418         """Creates a temporary working directory with the specified
   1419         path name.  If the path is a null string (''), a unique
   1420         directory name is created.
   1421         """
   1422         if (path != None):
   1423             if path == '':
   1424                 path = None
   1425             path = self.tempdir(path)
   1426         self.workdir = path
   1427 
   1428     def workpath(self, *args):
   1429         """Returns the absolute path name to a subdirectory or file
   1430         within the current temporary working directory.  Concatenates
   1431         the temporary working directory name with the specified
   1432         arguments using the os.path.join() method.
   1433         """
   1434         return apply(os.path.join, (self.workdir,) + tuple(args))
   1435 
   1436     def readable(self, top, read=1):
   1437         """Make the specified directory tree readable (read == 1)
   1438         or not (read == None).
   1439 
   1440         This method has no effect on Windows systems, which use a
   1441         completely different mechanism to control file readability.
   1442         """
   1443 
   1444         if sys.platform == 'win32':
   1445             return
   1446 
   1447         if read:
   1448             def do_chmod(fname):
   1449                 try: st = os.stat(fname)
   1450                 except OSError: pass
   1451                 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD))
   1452         else:
   1453             def do_chmod(fname):
   1454                 try: st = os.stat(fname)
   1455                 except OSError: pass
   1456                 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREAD))
   1457 
   1458         if os.path.isfile(top):
   1459             # If it's a file, that's easy, just chmod it.
   1460             do_chmod(top)
   1461         elif read:
   1462             # It's a directory and we're trying to turn on read
   1463             # permission, so it's also pretty easy, just chmod the
   1464             # directory and then chmod every entry on our walk down the
   1465             # tree.  Because os.path.walk() is top-down, we'll enable
   1466             # read permission on any directories that have it disabled
   1467             # before os.path.walk() tries to list their contents.
   1468             do_chmod(top)
   1469 
   1470             def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
   1471                 for n in names:
   1472                     do_chmod(os.path.join(dirname, n))
   1473 
   1474             os.path.walk(top, chmod_entries, None)
   1475         else:
   1476             # It's a directory and we're trying to turn off read
   1477             # permission, which means we have to chmod the directoreis
   1478             # in the tree bottom-up, lest disabling read permission from
   1479             # the top down get in the way of being able to get at lower
   1480             # parts of the tree.  But os.path.walk() visits things top
   1481             # down, so we just use an object to collect a list of all
   1482             # of the entries in the tree, reverse the list, and then
   1483             # chmod the reversed (bottom-up) list.
   1484             col = Collector(top)
   1485             os.path.walk(top, col, None)
   1486             col.entries.reverse()
   1487             for d in col.entries: do_chmod(d)
   1488 
   1489     def writable(self, top, write=1):
   1490         """Make the specified directory tree writable (write == 1)
   1491         or not (write == None).
   1492         """
   1493 
   1494         if sys.platform == 'win32':
   1495 
   1496             if write:
   1497                 def do_chmod(fname):
   1498                     try: os.chmod(fname, stat.S_IWRITE)
   1499                     except OSError: pass
   1500             else:
   1501                 def do_chmod(fname):
   1502                     try: os.chmod(fname, stat.S_IREAD)
   1503                     except OSError: pass
   1504 
   1505         else:
   1506 
   1507             if write:
   1508                 def do_chmod(fname):
   1509                     try: st = os.stat(fname)
   1510                     except OSError: pass
   1511                     else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200))
   1512             else:
   1513                 def do_chmod(fname):
   1514                     try: st = os.stat(fname)
   1515                     except OSError: pass
   1516                     else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200))
   1517 
   1518         if os.path.isfile(top):
   1519             do_chmod(top)
   1520         else:
   1521             col = Collector(top)
   1522             os.path.walk(top, col, None)
   1523             for d in col.entries: do_chmod(d)
   1524 
   1525     def executable(self, top, execute=1):
   1526         """Make the specified directory tree executable (execute == 1)
   1527         or not (execute == None).
   1528 
   1529         This method has no effect on Windows systems, which use a
   1530         completely different mechanism to control file executability.
   1531         """
   1532 
   1533         if sys.platform == 'win32':
   1534             return
   1535 
   1536         if execute:
   1537             def do_chmod(fname):
   1538                 try: st = os.stat(fname)
   1539                 except OSError: pass
   1540                 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC))
   1541         else:
   1542             def do_chmod(fname):
   1543                 try: st = os.stat(fname)
   1544                 except OSError: pass
   1545                 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXEC))
   1546 
   1547         if os.path.isfile(top):
   1548             # If it's a file, that's easy, just chmod it.
   1549             do_chmod(top)
   1550         elif execute:
   1551             # It's a directory and we're trying to turn on execute
   1552             # permission, so it's also pretty easy, just chmod the
   1553             # directory and then chmod every entry on our walk down the
   1554             # tree.  Because os.path.walk() is top-down, we'll enable
   1555             # execute permission on any directories that have it disabled
   1556             # before os.path.walk() tries to list their contents.
   1557             do_chmod(top)
   1558 
   1559             def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
   1560                 for n in names:
   1561                     do_chmod(os.path.join(dirname, n))
   1562 
   1563             os.path.walk(top, chmod_entries, None)
   1564         else:
   1565             # It's a directory and we're trying to turn off execute
   1566             # permission, which means we have to chmod the directories
   1567             # in the tree bottom-up, lest disabling execute permission from
   1568             # the top down get in the way of being able to get at lower
   1569             # parts of the tree.  But os.path.walk() visits things top
   1570             # down, so we just use an object to collect a list of all
   1571             # of the entries in the tree, reverse the list, and then
   1572             # chmod the reversed (bottom-up) list.
   1573             col = Collector(top)
   1574             os.path.walk(top, col, None)
   1575             col.entries.reverse()
   1576             for d in col.entries: do_chmod(d)
   1577 
   1578     def write(self, file, content, mode = 'wb'):
   1579         """Writes the specified content text (second argument) to the
   1580         specified file name (first argument).  The file name may be
   1581         a list, in which case the elements are concatenated with the
   1582         os.path.join() method.  The file is created under the temporary
   1583         working directory.  Any subdirectories in the path must already
   1584         exist.  The I/O mode for the file may be specified; it must
   1585         begin with a 'w'.  The default is 'wb' (binary write).
   1586         """
   1587         file = self.canonicalize(file)
   1588         if mode[0] != 'w':
   1589             raise ValueError, "mode must begin with 'w'"
   1590         with open(file, mode) as f:
   1591             f.write(content)
   1592 
   1593 # Local Variables:
   1594 # tab-width:4
   1595 # indent-tabs-mode:nil
   1596 # End:
   1597 # vim: set expandtab tabstop=4 shiftwidth=4:
   1598