Home | History | Annotate | Download | only in test
      1 """
      2 TestCases for testing the locking sub-system.
      3 """
      4 
      5 import time
      6 
      7 import unittest
      8 from test_all import db, test_support, verbose, have_threads, \
      9         get_new_environment_path, get_new_database_path
     10 
     11 if have_threads :
     12     from threading import Thread
     13     import sys
     14     if sys.version_info[0] < 3 :
     15         from threading import currentThread
     16     else :
     17         from threading import current_thread as currentThread
     18 
     19 #----------------------------------------------------------------------
     20 
     21 class LockingTestCase(unittest.TestCase):
     22     def setUp(self):
     23         self.homeDir = get_new_environment_path()
     24         self.env = db.DBEnv()
     25         self.env.open(self.homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
     26                                     db.DB_INIT_LOCK | db.DB_CREATE)
     27 
     28 
     29     def tearDown(self):
     30         self.env.close()
     31         test_support.rmtree(self.homeDir)
     32 
     33 
     34     def test01_simple(self):
     35         if verbose:
     36             print '\n', '-=' * 30
     37             print "Running %s.test01_simple..." % self.__class__.__name__
     38 
     39         anID = self.env.lock_id()
     40         if verbose:
     41             print "locker ID: %s" % anID
     42         lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE)
     43         if verbose:
     44             print "Acquired lock: %s" % lock
     45         self.env.lock_put(lock)
     46         if verbose:
     47             print "Released lock: %s" % lock
     48         self.env.lock_id_free(anID)
     49 
     50 
     51     def test02_threaded(self):
     52         if verbose:
     53             print '\n', '-=' * 30
     54             print "Running %s.test02_threaded..." % self.__class__.__name__
     55 
     56         threads = []
     57         threads.append(Thread(target = self.theThread,
     58                               args=(db.DB_LOCK_WRITE,)))
     59         threads.append(Thread(target = self.theThread,
     60                               args=(db.DB_LOCK_READ,)))
     61         threads.append(Thread(target = self.theThread,
     62                               args=(db.DB_LOCK_READ,)))
     63         threads.append(Thread(target = self.theThread,
     64                               args=(db.DB_LOCK_WRITE,)))
     65         threads.append(Thread(target = self.theThread,
     66                               args=(db.DB_LOCK_READ,)))
     67         threads.append(Thread(target = self.theThread,
     68                               args=(db.DB_LOCK_READ,)))
     69         threads.append(Thread(target = self.theThread,
     70                               args=(db.DB_LOCK_WRITE,)))
     71         threads.append(Thread(target = self.theThread,
     72                               args=(db.DB_LOCK_WRITE,)))
     73         threads.append(Thread(target = self.theThread,
     74                               args=(db.DB_LOCK_WRITE,)))
     75 
     76         for t in threads:
     77             import sys
     78             if sys.version_info[0] < 3 :
     79                 t.setDaemon(True)
     80             else :
     81                 t.daemon = True
     82             t.start()
     83         for t in threads:
     84             t.join()
     85 
     86         def test03_lock_timeout(self):
     87             self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
     88             self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 0)
     89             self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
     90             self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 0)
     91             self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
     92             self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 123456)
     93             self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
     94             self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 7890123)
     95 
     96     def test04_lock_timeout2(self):
     97         self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
     98         self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
     99         self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
    100         self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
    101 
    102         def deadlock_detection() :
    103             while not deadlock_detection.end :
    104                 deadlock_detection.count = \
    105                     self.env.lock_detect(db.DB_LOCK_EXPIRE)
    106                 if deadlock_detection.count :
    107                     while not deadlock_detection.end :
    108                         pass
    109                     break
    110                 time.sleep(0.01)
    111 
    112         deadlock_detection.end=False
    113         deadlock_detection.count=0
    114         t=Thread(target=deadlock_detection)
    115         import sys
    116         if sys.version_info[0] < 3 :
    117             t.setDaemon(True)
    118         else :
    119             t.daemon = True
    120         t.start()
    121         self.env.set_timeout(100000, db.DB_SET_LOCK_TIMEOUT)
    122         anID = self.env.lock_id()
    123         anID2 = self.env.lock_id()
    124         self.assertNotEqual(anID, anID2)
    125         lock = self.env.lock_get(anID, "shared lock", db.DB_LOCK_WRITE)
    126         start_time=time.time()
    127         self.assertRaises(db.DBLockNotGrantedError,
    128                 self.env.lock_get,anID2, "shared lock", db.DB_LOCK_READ)
    129         end_time=time.time()
    130         deadlock_detection.end=True
    131         # Floating point rounding
    132         self.assertTrue((end_time-start_time) >= 0.0999)
    133         self.env.lock_put(lock)
    134         t.join()
    135 
    136         self.env.lock_id_free(anID)
    137         self.env.lock_id_free(anID2)
    138 
    139         if db.version() >= (4,6):
    140             self.assertTrue(deadlock_detection.count>0)
    141 
    142     def theThread(self, lockType):
    143         import sys
    144         if sys.version_info[0] < 3 :
    145             name = currentThread().getName()
    146         else :
    147             name = currentThread().name
    148 
    149         if lockType ==  db.DB_LOCK_WRITE:
    150             lt = "write"
    151         else:
    152             lt = "read"
    153 
    154         anID = self.env.lock_id()
    155         if verbose:
    156             print "%s: locker ID: %s" % (name, anID)
    157 
    158         for i in xrange(1000) :
    159             lock = self.env.lock_get(anID, "some locked thing", lockType)
    160             if verbose:
    161                 print "%s: Acquired %s lock: %s" % (name, lt, lock)
    162 
    163             self.env.lock_put(lock)
    164             if verbose:
    165                 print "%s: Released %s lock: %s" % (name, lt, lock)
    166 
    167         self.env.lock_id_free(anID)
    168 
    169 
    170 #----------------------------------------------------------------------
    171 
    172 def test_suite():
    173     suite = unittest.TestSuite()
    174 
    175     if have_threads:
    176         suite.addTest(unittest.makeSuite(LockingTestCase))
    177     else:
    178         suite.addTest(unittest.makeSuite(LockingTestCase, 'test01'))
    179 
    180     return suite
    181 
    182 
    183 if __name__ == '__main__':
    184     unittest.main(defaultTest='test_suite')
    185