Home | History | Annotate | Download | only in threads
      1 # A parallelized "find(1)" using the thread module.
      2 
      3 # This demonstrates the use of a work queue and worker threads.
      4 # It really does do more stats/sec when using multiple threads,
      5 # although the improvement is only about 20-30 percent.
      6 # (That was 8 years ago.  In 2002, on Linux, I can't measure
      7 # a speedup. :-( )
      8 
# I'm too lazy to write a command line parser for the full find(1)
# command line syntax, so the predicate it searches for is wired-in;
# see function selector() below.  (It currently searches for
# world-writable files that are not symbolic links.)
     13 
     14 # Usage: parfind.py [-w nworkers] [directory] ...
     15 # Default nworkers is 4
     16 
     17 
     18 import sys
     19 import getopt
     20 import string
     21 import time
     22 import os
     23 from stat import *
     24 import thread
     25 
     26 
     27 # Work queue class.  Usage:
     28 #   wq = WorkQ()
     29 #   wq.addwork(func, (arg1, arg2, ...)) # one or more calls
     30 #   wq.run(nworkers)
     31 # The work is done when wq.run() completes.
     32 # The function calls executed by the workers may add more work.
     33 # Don't use keyboard interrupts!
     34 
     35 class WorkQ:
     36 
     37     # Invariants:
     38 
     39     # - busy and work are only modified when mutex is locked
     40     # - len(work) is the number of jobs ready to be taken
     41     # - busy is the number of jobs being done
     42     # - todo is locked iff there is no work and somebody is busy
     43 
     44     def __init__(self):
     45         self.mutex = thread.allocate()
     46         self.todo = thread.allocate()
     47         self.todo.acquire()
     48         self.work = []
     49         self.busy = 0
     50 
     51     def addwork(self, func, args):
     52         job = (func, args)
     53         self.mutex.acquire()
     54         self.work.append(job)
     55         self.mutex.release()
     56         if len(self.work) == 1:
     57             self.todo.release()
     58 
     59     def _getwork(self):
     60         self.todo.acquire()
     61         self.mutex.acquire()
     62         if self.busy == 0 and len(self.work) == 0:
     63             self.mutex.release()
     64             self.todo.release()
     65             return None
     66         job = self.work[0]
     67         del self.work[0]
     68         self.busy = self.busy + 1
     69         self.mutex.release()
     70         if len(self.work) > 0:
     71             self.todo.release()
     72         return job
     73 
     74     def _donework(self):
     75         self.mutex.acquire()
     76         self.busy = self.busy - 1
     77         if self.busy == 0 and len(self.work) == 0:
     78             self.todo.release()
     79         self.mutex.release()
     80 
     81     def _worker(self):
     82         time.sleep(0.00001)     # Let other threads run
     83         while 1:
     84             job = self._getwork()
     85             if not job:
     86                 break
     87             func, args = job
     88             apply(func, args)
     89             self._donework()
     90 
     91     def run(self, nworkers):
     92         if not self.work:
     93             return # Nothing to do
     94         for i in range(nworkers-1):
     95             thread.start_new(self._worker, ())
     96         self._worker()
     97         self.todo.acquire()
     98 
     99 
    100 # Main program
    101 
    102 def main():
    103     nworkers = 4
    104     opts, args = getopt.getopt(sys.argv[1:], '-w:')
    105     for opt, arg in opts:
    106         if opt == '-w':
    107             nworkers = string.atoi(arg)
    108     if not args:
    109         args = [os.curdir]
    110 
    111     wq = WorkQ()
    112     for dir in args:
    113         wq.addwork(find, (dir, selector, wq))
    114 
    115     t1 = time.time()
    116     wq.run(nworkers)
    117     t2 = time.time()
    118 
    119     sys.stderr.write('Total time %r sec.\n' % (t2-t1))
    120 
    121 
    122 # The predicate -- defines what files we look for.
    123 # Feel free to change this to suit your purpose
    124 
    125 def selector(dir, name, fullname, stat):
    126     # Look for world writable files that are not symlinks
    127     return (stat[ST_MODE] & 0002) != 0 and not S_ISLNK(stat[ST_MODE])
    128 
    129 
    130 # The find procedure -- calls wq.addwork() for subdirectories
    131 
    132 def find(dir, pred, wq):
    133     try:
    134         names = os.listdir(dir)
    135     except os.error, msg:
    136         print repr(dir), ':', msg
    137         return
    138     for name in names:
    139         if name not in (os.curdir, os.pardir):
    140             fullname = os.path.join(dir, name)
    141             try:
    142                 stat = os.lstat(fullname)
    143             except os.error, msg:
    144                 print repr(fullname), ':', msg
    145                 continue
    146             if pred(dir, name, fullname, stat):
    147                 print fullname
    148             if S_ISDIR(stat[ST_MODE]):
    149                 if not os.path.ismount(fullname):
    150                     wq.addwork(find, (fullname, pred, wq))
    151 
    152 
    153 # Call the main program
    154 
    155 main()
    156