Home | History | Annotate | Download | only in metaclasses
      1 """Synchronization metaclass.
      2 
This metaclass makes it possible to declare synchronized methods.
      4 
      5 """
      6 
      7 import thread
      8 
      9 # First we need to define a reentrant lock.

     10 # This is generally useful and should probably be in a standard Python

     11 # library module.  For now, we in-line it.

     12 
     13 class Lock:
     14 
     15     """Reentrant lock.
     16 
     17     This is a mutex-like object which can be acquired by the same
     18     thread more than once.  It keeps a reference count of the number
     19     of times it has been acquired by the same thread.  Each acquire()
     20     call must be matched by a release() call and only the last
     21     release() call actually releases the lock for acquisition by
     22     another thread.
     23 
     24     The implementation uses two locks internally:
     25 
     26     __mutex is a short term lock used to protect the instance variables
     27     __wait is the lock for which other threads wait
     28 
     29     A thread intending to acquire both locks should acquire __wait
     30     first.
     31 
     32    The implementation uses two other instance variables, protected by
     33    locking __mutex:
     34 
     35     __tid is the thread ID of the thread that currently has the lock
     36     __count is the number of times the current thread has acquired it
     37 
     38     When the lock is released, __tid is None and __count is zero.
     39 
     40     """
     41 
     42     def __init__(self):
     43         """Constructor.  Initialize all instance variables."""
     44         self.__mutex = thread.allocate_lock()
     45         self.__wait = thread.allocate_lock()
     46         self.__tid = None
     47         self.__count = 0
     48 
     49     def acquire(self, flag=1):
     50         """Acquire the lock.
     51 
     52         If the optional flag argument is false, returns immediately
     53         when it cannot acquire the __wait lock without blocking (it
     54         may still block for a little while in order to acquire the
     55         __mutex lock).
     56 
     57         The return value is only relevant when the flag argument is
     58         false; it is 1 if the lock is acquired, 0 if not.
     59 
     60         """
     61         self.__mutex.acquire()
     62         try:
     63             if self.__tid == thread.get_ident():
     64                 self.__count = self.__count + 1
     65                 return 1
     66         finally:
     67             self.__mutex.release()
     68         locked = self.__wait.acquire(flag)
     69         if not flag and not locked:
     70             return 0
     71         try:
     72             self.__mutex.acquire()
     73             assert self.__tid == None
     74             assert self.__count == 0
     75             self.__tid = thread.get_ident()
     76             self.__count = 1
     77             return 1
     78         finally:
     79             self.__mutex.release()
     80 
     81     def release(self):
     82         """Release the lock.
     83 
     84         If this thread doesn't currently have the lock, an assertion
     85         error is raised.
     86 
     87         Only allow another thread to acquire the lock when the count
     88         reaches zero after decrementing it.
     89 
     90         """
     91         self.__mutex.acquire()
     92         try:
     93             assert self.__tid == thread.get_ident()
     94             assert self.__count > 0
     95             self.__count = self.__count - 1
     96             if self.__count == 0:
     97                 self.__tid = None
     98                 self.__wait.release()
     99         finally:
    100             self.__mutex.release()
    101 
    102 
    103 def _testLock():
    104 
    105     done = []
    106 
    107     def f2(lock, done=done):
    108         lock.acquire()
    109         print "f2 running in thread %d\n" % thread.get_ident(),
    110         lock.release()
    111         done.append(1)
    112 
    113     def f1(lock, f2=f2, done=done):
    114         lock.acquire()
    115         print "f1 running in thread %d\n" % thread.get_ident(),
    116         try:
    117             f2(lock)
    118         finally:
    119             lock.release()
    120         done.append(1)
    121 
    122     lock = Lock()
    123     lock.acquire()
    124     f1(lock)                            # Adds 2 to done

    125     lock.release()
    126 
    127     lock.acquire()
    128 
    129     thread.start_new_thread(f1, (lock,)) # Adds 2

    130     thread.start_new_thread(f1, (lock, f1)) # Adds 3

    131     thread.start_new_thread(f2, (lock,)) # Adds 1

    132     thread.start_new_thread(f2, (lock,)) # Adds 1

    133 
    134     lock.release()
    135     import time
    136     while len(done) < 9:
    137         print len(done)
    138         time.sleep(0.001)
    139     print len(done)
    140 
    141 
    142 # Now, the Locking metaclass is a piece of cake.

    143 # As an example feature, methods whose name begins with exactly one

    144 # underscore are not synchronized.

    145 
    146 from Meta import MetaClass, MetaHelper, MetaMethodWrapper
    147 
    148 class LockingMethodWrapper(MetaMethodWrapper):
    149     def __call__(self, *args, **kw):
    150         if self.__name__[:1] == '_' and self.__name__[1:] != '_':
    151             return apply(self.func, (self.inst,) + args, kw)
    152         self.inst.__lock__.acquire()
    153         try:
    154             return apply(self.func, (self.inst,) + args, kw)
    155         finally:
    156             self.inst.__lock__.release()
    157 
class LockingHelper(MetaHelper):
    """Helper object that carries a reentrant Lock in its __lock__ attribute.

    NOTE(review): presumably MetaHelper consults __methodwrapper__ when
    it wraps method lookups -- confirm against the Meta module.
    """
    # Wrapper class to use for methods; LockingMethodWrapper reads the
    # lock back as self.inst.__lock__.
    __methodwrapper__ = LockingMethodWrapper
    def __helperinit__(self, formalclass):
        # Let the base class initialize itself first, then attach a
        # fresh Lock to this helper.
        MetaHelper.__helperinit__(self, formalclass)
        self.__lock__ = Lock()
    163 
class LockingMetaClass(MetaClass):
    """MetaClass variant whose helper class is LockingHelper.

    NOTE(review): presumably MetaClass instantiates __helper__ for each
    new instance -- confirm against the Meta module.
    """
    __helper__ = LockingHelper
    166 
# Base class to inherit from in order to get synchronized methods (see
# the Buffer class in _test); it has no bases and an empty namespace.
Locking = LockingMetaClass('Locking', (), {})
    168 
    169 def _test():
    170     # For kicks, take away the Locking base class and see it die

    171     class Buffer(Locking):
    172         def __init__(self, initialsize):
    173             assert initialsize > 0
    174             self.size = initialsize
    175             self.buffer = [None]*self.size
    176             self.first = self.last = 0
    177         def put(self, item):
    178             # Do we need to grow the buffer?

    179             if (self.last+1) % self.size != self.first:
    180                 # Insert the new item

    181                 self.buffer[self.last] = item
    182                 self.last = (self.last+1) % self.size
    183                 return
    184             # Double the buffer size

    185             # First normalize it so that first==0 and last==size-1

    186             print "buffer =", self.buffer
    187             print "first = %d, last = %d, size = %d" % (
    188                 self.first, self.last, self.size)
    189             if self.first <= self.last:
    190                 temp = self.buffer[self.first:self.last]
    191             else:
    192                 temp = self.buffer[self.first:] + self.buffer[:self.last]
    193             print "temp =", temp
    194             self.buffer = temp + [None]*(self.size+1)
    195             self.first = 0
    196             self.last = self.size-1
    197             self.size = self.size*2
    198             print "Buffer size doubled to", self.size
    199             print "new buffer =", self.buffer
    200             print "first = %d, last = %d, size = %d" % (
    201                 self.first, self.last, self.size)
    202             self.put(item)              # Recursive call to test the locking

    203         def get(self):
    204             # Is the buffer empty?

    205             if self.first == self.last:
    206                 raise EOFError          # Avoid defining a new exception

    207             item = self.buffer[self.first]
    208             self.first = (self.first+1) % self.size
    209             return item
    210 
    211     def producer(buffer, wait, n=1000):
    212         import time
    213         i = 0
    214         while i < n:
    215             print "put", i
    216             buffer.put(i)
    217             i = i+1
    218         print "Producer: done producing", n, "items"
    219         wait.release()
    220 
    221     def consumer(buffer, wait, n=1000):
    222         import time
    223         i = 0
    224         tout = 0.001
    225         while i < n:
    226             try:
    227                 x = buffer.get()
    228                 if x != i:
    229                     raise AssertionError, \
    230                           "get() returned %s, expected %s" % (x, i)
    231                 print "got", i
    232                 i = i+1
    233                 tout = 0.001
    234             except EOFError:
    235                 time.sleep(tout)
    236                 tout = tout*2
    237         print "Consumer: done consuming", n, "items"
    238         wait.release()
    239 
    240     pwait = thread.allocate_lock()
    241     pwait.acquire()
    242     cwait = thread.allocate_lock()
    243     cwait.acquire()
    244     buffer = Buffer(1)
    245     n = 1000
    246     thread.start_new_thread(consumer, (buffer, cwait, n))
    247     thread.start_new_thread(producer, (buffer, pwait, n))
    248     pwait.acquire()
    249     print "Producer done"
    250     cwait.acquire()
    251     print "All done"
    252     print "buffer size ==", len(buffer.buffer)
    253 
# When run as a script, exercise the Lock class first and then the
# Locking metaclass.
if __name__ == '__main__':
    _testLock()
    _test()
    257