Home | History | Annotate | Download | only in test
      1 """TestCases for distributed transactions.
      2 """
      3 
      4 import os
      5 import time
      6 import unittest
      7 
      8 from test_all import db, test_support, have_threads, verbose, \
      9         get_new_environment_path, get_new_database_path
     10 
     11 
     12 #----------------------------------------------------------------------
     13 
     14 class DBReplication(unittest.TestCase) :
     15     def setUp(self) :
     16         self.homeDirMaster = get_new_environment_path()
     17         self.homeDirClient = get_new_environment_path()
     18 
     19         self.dbenvMaster = db.DBEnv()
     20         self.dbenvClient = db.DBEnv()
     21 
     22         # Must use "DB_THREAD" because the Replication Manager will
     23         # be executed in other threads but will use the same environment.
     24         # http://forums.oracle.com/forums/thread.jspa?threadID=645788&tstart=0
     25         self.dbenvMaster.open(self.homeDirMaster, db.DB_CREATE | db.DB_INIT_TXN
     26                 | db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
     27                 db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666)
     28         self.dbenvClient.open(self.homeDirClient, db.DB_CREATE | db.DB_INIT_TXN
     29                 | db.DB_INIT_LOG | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
     30                 db.DB_INIT_REP | db.DB_RECOVER | db.DB_THREAD, 0666)
     31 
     32         self.confirmed_master=self.client_startupdone=False
     33         def confirmed_master(a,b,c) :
     34             if b==db.DB_EVENT_REP_MASTER :
     35                 self.confirmed_master=True
     36 
     37         def client_startupdone(a,b,c) :
     38             if b==db.DB_EVENT_REP_STARTUPDONE :
     39                 self.client_startupdone=True
     40 
     41         self.dbenvMaster.set_event_notify(confirmed_master)
     42         self.dbenvClient.set_event_notify(client_startupdone)
     43 
     44         #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
     45         #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
     46         #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
     47         #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
     48 
     49         self.dbMaster = self.dbClient = None
     50 
     51 
     52     def tearDown(self):
     53         if self.dbClient :
     54             self.dbClient.close()
     55         if self.dbMaster :
     56             self.dbMaster.close()
     57 
     58         # Here we assign dummy event handlers to allow GC of the test object.
     59         # Since the dummy handler doesn't use any outer scope variable, it
     60         # doesn't keep any reference to the test object.
     61         def dummy(*args) :
     62             pass
     63         self.dbenvMaster.set_event_notify(dummy)
     64         self.dbenvClient.set_event_notify(dummy)
     65 
     66         self.dbenvClient.close()
     67         self.dbenvMaster.close()
     68         test_support.rmtree(self.homeDirClient)
     69         test_support.rmtree(self.homeDirMaster)
     70 
     71 class DBReplicationManager(DBReplication) :
     72     def test01_basic_replication(self) :
     73         master_port = test_support.find_unused_port()
     74         client_port = test_support.find_unused_port()
     75         if db.version() >= (5, 2) :
     76             self.site = self.dbenvMaster.repmgr_site("127.0.0.1", master_port)
     77             self.site.set_config(db.DB_GROUP_CREATOR, True)
     78             self.site.set_config(db.DB_LOCAL_SITE, True)
     79             self.site2 = self.dbenvMaster.repmgr_site("127.0.0.1", client_port)
     80 
     81             self.site3 = self.dbenvClient.repmgr_site("127.0.0.1", master_port)
     82             self.site3.set_config(db.DB_BOOTSTRAP_HELPER, True)
     83             self.site4 = self.dbenvClient.repmgr_site("127.0.0.1", client_port)
     84             self.site4.set_config(db.DB_LOCAL_SITE, True)
     85 
     86             d = {
     87                     db.DB_BOOTSTRAP_HELPER: [False, False, True, False],
     88                     db.DB_GROUP_CREATOR: [True, False, False, False],
     89                     db.DB_LEGACY: [False, False, False, False],
     90                     db.DB_LOCAL_SITE: [True, False, False, True],
     91                     db.DB_REPMGR_PEER: [False, False, False, False ],
     92                 }
     93 
     94             for i, j in d.items() :
     95                 for k, v in \
     96                         zip([self.site, self.site2, self.site3, self.site4], j) :
     97                     if v :
     98                         self.assertTrue(k.get_config(i))
     99                     else :
    100                         self.assertFalse(k.get_config(i))
    101 
    102             self.assertNotEqual(self.site.get_eid(), self.site2.get_eid())
    103             self.assertNotEqual(self.site3.get_eid(), self.site4.get_eid())
    104 
    105             for i, j in zip([self.site, self.site2, self.site3, self.site4], \
    106                     [master_port, client_port, master_port, client_port]) :
    107                 addr = i.get_address()
    108                 self.assertEqual(addr, ("127.0.0.1", j))
    109 
    110             for i in [self.site, self.site2] :
    111                 self.assertEqual(i.get_address(),
    112                         self.dbenvMaster.repmgr_site_by_eid(i.get_eid()).get_address())
    113             for i in [self.site3, self.site4] :
    114                 self.assertEqual(i.get_address(),
    115                         self.dbenvClient.repmgr_site_by_eid(i.get_eid()).get_address())
    116         else :
    117             self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port)
    118             self.dbenvClient.repmgr_set_local_site("127.0.0.1", client_port)
    119             self.dbenvMaster.repmgr_add_remote_site("127.0.0.1", client_port)
    120             self.dbenvClient.repmgr_add_remote_site("127.0.0.1", master_port)
    121 
    122             self.dbenvMaster.rep_set_nsites(2)
    123             self.dbenvClient.rep_set_nsites(2)
    124 
    125         self.dbenvMaster.rep_set_priority(10)
    126         self.dbenvClient.rep_set_priority(0)
    127 
    128         self.dbenvMaster.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100123)
    129         self.dbenvClient.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100321)
    130         self.assertEqual(self.dbenvMaster.rep_get_timeout(
    131             db.DB_REP_CONNECTION_RETRY), 100123)
    132         self.assertEqual(self.dbenvClient.rep_get_timeout(
    133             db.DB_REP_CONNECTION_RETRY), 100321)
    134 
    135         self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100234)
    136         self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100432)
    137         self.assertEqual(self.dbenvMaster.rep_get_timeout(
    138             db.DB_REP_ELECTION_TIMEOUT), 100234)
    139         self.assertEqual(self.dbenvClient.rep_get_timeout(
    140             db.DB_REP_ELECTION_TIMEOUT), 100432)
    141 
    142         self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100345)
    143         self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100543)
    144         self.assertEqual(self.dbenvMaster.rep_get_timeout(
    145             db.DB_REP_ELECTION_RETRY), 100345)
    146         self.assertEqual(self.dbenvClient.rep_get_timeout(
    147             db.DB_REP_ELECTION_RETRY), 100543)
    148 
    149         self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
    150         self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
    151 
    152         self.dbenvMaster.repmgr_start(1, db.DB_REP_MASTER);
    153         self.dbenvClient.repmgr_start(1, db.DB_REP_CLIENT);
    154 
    155         self.assertEqual(self.dbenvMaster.rep_get_nsites(),2)
    156         self.assertEqual(self.dbenvClient.rep_get_nsites(),2)
    157         self.assertEqual(self.dbenvMaster.rep_get_priority(),10)
    158         self.assertEqual(self.dbenvClient.rep_get_priority(),0)
    159         self.assertEqual(self.dbenvMaster.repmgr_get_ack_policy(),
    160                 db.DB_REPMGR_ACKS_ALL)
    161         self.assertEqual(self.dbenvClient.repmgr_get_ack_policy(),
    162                 db.DB_REPMGR_ACKS_ALL)
    163 
    164         # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
    165         # is not generated if the master has no new transactions.
    166         # This is solved in BDB 4.6 (#15542).
    167         import time
    168         timeout = time.time()+60
    169         while (time.time()<timeout) and not (self.confirmed_master and self.client_startupdone) :
    170             time.sleep(0.02)
    171         # self.client_startupdone does not always get set to True within
    172         # the timeout.  On windows this may be a deep issue, on other
    173         # platforms it is likely just a timing issue, especially on slow
    174         # virthost buildbots (see issue 3892 for more).  Even though
    175         # the timeout triggers, the rest of this test method usually passes
    176         # (but not all of it always, see below).  So we just note the
    177         # timeout on stderr and keep soldering on.
    178         if time.time()>timeout:
    179             import sys
    180             print >> sys.stderr, ("XXX: timeout happened before"
    181                 "startup was confirmed - see issue 3892")
    182             startup_timeout = True
    183 
    184         d = self.dbenvMaster.repmgr_site_list()
    185         self.assertEqual(len(d), 1)
    186         d = d.values()[0]  # There is only one
    187         self.assertEqual(d[0], "127.0.0.1")
    188         self.assertEqual(d[1], client_port)
    189         self.assertTrue((d[2]==db.DB_REPMGR_CONNECTED) or \
    190                 (d[2]==db.DB_REPMGR_DISCONNECTED))
    191 
    192         d = self.dbenvClient.repmgr_site_list()
    193         self.assertEqual(len(d), 1)
    194         d = d.values()[0]  # There is only one
    195         self.assertEqual(d[0], "127.0.0.1")
    196         self.assertEqual(d[1], master_port)
    197         self.assertTrue((d[2]==db.DB_REPMGR_CONNECTED) or \
    198                 (d[2]==db.DB_REPMGR_DISCONNECTED))
    199 
    200         if db.version() >= (4,6) :
    201             d = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR);
    202             self.assertTrue("msgs_queued" in d)
    203 
    204         self.dbMaster=db.DB(self.dbenvMaster)
    205         txn=self.dbenvMaster.txn_begin()
    206         self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
    207         txn.commit()
    208 
    209         import time,os.path
    210         timeout=time.time()+10
    211         while (time.time()<timeout) and \
    212           not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
    213             time.sleep(0.01)
    214 
    215         self.dbClient=db.DB(self.dbenvClient)
    216         while True :
    217             txn=self.dbenvClient.txn_begin()
    218             try :
    219                 self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
    220                         mode=0666, txn=txn)
    221             except db.DBRepHandleDeadError :
    222                 txn.abort()
    223                 self.dbClient.close()
    224                 self.dbClient=db.DB(self.dbenvClient)
    225                 continue
    226 
    227             txn.commit()
    228             break
    229 
    230         txn=self.dbenvMaster.txn_begin()
    231         self.dbMaster.put("ABC", "123", txn=txn)
    232         txn.commit()
    233         import time
    234         timeout=time.time()+10
    235         v=None
    236         while (time.time()<timeout) and (v is None) :
    237             txn=self.dbenvClient.txn_begin()
    238             v=self.dbClient.get("ABC", txn=txn)
    239             txn.commit()
    240             if v is None :
    241                 time.sleep(0.02)
    242         # If startup did not happen before the timeout above, then this test
    243         # sometimes fails.  This happens randomly, which causes buildbot
    244         # instability, but all the other bsddb tests pass.  Since bsddb3 in the
    245         # stdlib is currently not getting active maintenance, and is gone in
    246         # py3k, we just skip the end of the test in that case.
    247         if time.time()>=timeout and startup_timeout:
    248             self.skipTest("replication test skipped due to random failure, "
    249                 "see issue 3892")
    250         self.assertTrue(time.time()<timeout)
    251         self.assertEqual("123", v)
    252 
    253         txn=self.dbenvMaster.txn_begin()
    254         self.dbMaster.delete("ABC", txn=txn)
    255         txn.commit()
    256         timeout=time.time()+10
    257         while (time.time()<timeout) and (v is not None) :
    258             txn=self.dbenvClient.txn_begin()
    259             v=self.dbClient.get("ABC", txn=txn)
    260             txn.commit()
    261             if v is None :
    262                 time.sleep(0.02)
    263         self.assertTrue(time.time()<timeout)
    264         self.assertEqual(None, v)
    265 
    266 class DBBaseReplication(DBReplication) :
    267     def setUp(self) :
    268         DBReplication.setUp(self)
    269         def confirmed_master(a,b,c) :
    270             if (b == db.DB_EVENT_REP_MASTER) or (b == db.DB_EVENT_REP_ELECTED) :
    271                 self.confirmed_master = True
    272 
    273         def client_startupdone(a,b,c) :
    274             if b == db.DB_EVENT_REP_STARTUPDONE :
    275                 self.client_startupdone = True
    276 
    277         self.dbenvMaster.set_event_notify(confirmed_master)
    278         self.dbenvClient.set_event_notify(client_startupdone)
    279 
    280         import Queue
    281         self.m2c = Queue.Queue()
    282         self.c2m = Queue.Queue()
    283 
    284         # There are only two nodes, so we don't need to
    285         # do any routing decision
    286         def m2c(dbenv, control, rec, lsnp, envid, flags) :
    287             self.m2c.put((control, rec))
    288 
    289         def c2m(dbenv, control, rec, lsnp, envid, flags) :
    290             self.c2m.put((control, rec))
    291 
    292         self.dbenvMaster.rep_set_transport(13,m2c)
    293         self.dbenvMaster.rep_set_priority(10)
    294         self.dbenvClient.rep_set_transport(3,c2m)
    295         self.dbenvClient.rep_set_priority(0)
    296 
    297         self.assertEqual(self.dbenvMaster.rep_get_priority(),10)
    298         self.assertEqual(self.dbenvClient.rep_get_priority(),0)
    299 
    300         #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
    301         #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
    302         #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
    303         #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
    304 
    305         def thread_master() :
    306             return self.thread_do(self.dbenvMaster, self.c2m, 3,
    307                     self.master_doing_election, True)
    308 
    309         def thread_client() :
    310             return self.thread_do(self.dbenvClient, self.m2c, 13,
    311                     self.client_doing_election, False)
    312 
    313         from threading import Thread
    314         t_m=Thread(target=thread_master)
    315         t_c=Thread(target=thread_client)
    316         import sys
    317         if sys.version_info[0] < 3 :
    318             t_m.setDaemon(True)
    319             t_c.setDaemon(True)
    320         else :
    321             t_m.daemon = True
    322             t_c.daemon = True
    323 
    324         self.t_m = t_m
    325         self.t_c = t_c
    326 
    327         self.dbMaster = self.dbClient = None
    328 
    329         self.master_doing_election=[False]
    330         self.client_doing_election=[False]
    331 
    332 
    333     def tearDown(self):
    334         if self.dbClient :
    335             self.dbClient.close()
    336         if self.dbMaster :
    337             self.dbMaster.close()
    338         self.m2c.put(None)
    339         self.c2m.put(None)
    340         self.t_m.join()
    341         self.t_c.join()
    342 
    343         # Here we assign dummy event handlers to allow GC of the test object.
    344         # Since the dummy handler doesn't use any outer scope variable, it
    345         # doesn't keep any reference to the test object.
    346         def dummy(*args) :
    347             pass
    348         self.dbenvMaster.set_event_notify(dummy)
    349         self.dbenvClient.set_event_notify(dummy)
    350         self.dbenvMaster.rep_set_transport(13,dummy)
    351         self.dbenvClient.rep_set_transport(3,dummy)
    352 
    353         self.dbenvClient.close()
    354         self.dbenvMaster.close()
    355         test_support.rmtree(self.homeDirClient)
    356         test_support.rmtree(self.homeDirMaster)
    357 
    358     def basic_rep_threading(self) :
    359         self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
    360         self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
    361 
    362         def thread_do(env, q, envid, election_status, must_be_master) :
    363             while True :
    364                 v=q.get()
    365                 if v is None : return
    366                 env.rep_process_message(v[0], v[1], envid)
    367 
    368         self.thread_do = thread_do
    369 
    370         self.t_m.start()
    371         self.t_c.start()
    372 
    373     def test01_basic_replication(self) :
    374         self.basic_rep_threading()
    375 
    376         # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
    377         # is not generated if the master has no new transactions.
    378         # This is solved in BDB 4.6 (#15542).
    379         import time
    380         timeout = time.time()+60
    381         while (time.time()<timeout) and not (self.confirmed_master and
    382                 self.client_startupdone) :
    383             time.sleep(0.02)
    384         self.assertTrue(time.time()<timeout)
    385 
    386         self.dbMaster=db.DB(self.dbenvMaster)
    387         txn=self.dbenvMaster.txn_begin()
    388         self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
    389         txn.commit()
    390 
    391         import time,os.path
    392         timeout=time.time()+10
    393         while (time.time()<timeout) and \
    394           not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
    395             time.sleep(0.01)
    396 
    397         self.dbClient=db.DB(self.dbenvClient)
    398         while True :
    399             txn=self.dbenvClient.txn_begin()
    400             try :
    401                 self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
    402                         mode=0666, txn=txn)
    403             except db.DBRepHandleDeadError :
    404                 txn.abort()
    405                 self.dbClient.close()
    406                 self.dbClient=db.DB(self.dbenvClient)
    407                 continue
    408 
    409             txn.commit()
    410             break
    411 
    412         d = self.dbenvMaster.rep_stat(flags=db.DB_STAT_CLEAR);
    413         self.assertTrue("master_changes" in d)
    414 
    415         txn=self.dbenvMaster.txn_begin()
    416         self.dbMaster.put("ABC", "123", txn=txn)
    417         txn.commit()
    418         import time
    419         timeout=time.time()+10
    420         v=None
    421         while (time.time()<timeout) and (v is None) :
    422             txn=self.dbenvClient.txn_begin()
    423             v=self.dbClient.get("ABC", txn=txn)
    424             txn.commit()
    425             if v is None :
    426                 time.sleep(0.02)
    427         self.assertTrue(time.time()<timeout)
    428         self.assertEqual("123", v)
    429 
    430         txn=self.dbenvMaster.txn_begin()
    431         self.dbMaster.delete("ABC", txn=txn)
    432         txn.commit()
    433         timeout=time.time()+10
    434         while (time.time()<timeout) and (v is not None) :
    435             txn=self.dbenvClient.txn_begin()
    436             v=self.dbClient.get("ABC", txn=txn)
    437             txn.commit()
    438             if v is None :
    439                 time.sleep(0.02)
    440         self.assertTrue(time.time()<timeout)
    441         self.assertEqual(None, v)
    442 
    443     if db.version() >= (4,7) :
    444         def test02_test_request(self) :
    445             self.basic_rep_threading()
    446             (minimum, maximum) = self.dbenvClient.rep_get_request()
    447             self.dbenvClient.rep_set_request(minimum-1, maximum+1)
    448             self.assertEqual(self.dbenvClient.rep_get_request(),
    449                     (minimum-1, maximum+1))
    450 
    451     if db.version() >= (4,6) :
    452         def test03_master_election(self) :
    453             # Get ready to hold an election
    454             #self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
    455             self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT)
    456             self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
    457 
    458             def thread_do(env, q, envid, election_status, must_be_master) :
    459                 while True :
    460                     v=q.get()
    461                     if v is None : return
    462                     r = env.rep_process_message(v[0],v[1],envid)
    463                     if must_be_master and self.confirmed_master :
    464                         self.dbenvMaster.rep_start(flags = db.DB_REP_MASTER)
    465                         must_be_master = False
    466 
    467                     if r[0] == db.DB_REP_HOLDELECTION :
    468                         def elect() :
    469                             while True :
    470                                 try :
    471                                     env.rep_elect(2, 1)
    472                                     election_status[0] = False
    473                                     break
    474                                 except db.DBRepUnavailError :
    475                                     pass
    476                         if not election_status[0] and not self.confirmed_master :
    477                             from threading import Thread
    478                             election_status[0] = True
    479                             t=Thread(target=elect)
    480                             import sys
    481                             if sys.version_info[0] < 3 :
    482                                 t.setDaemon(True)
    483                             else :
    484                                 t.daemon = True
    485                             t.start()
    486 
    487             self.thread_do = thread_do
    488 
    489             self.t_m.start()
    490             self.t_c.start()
    491 
    492             self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
    493             self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
    494             self.client_doing_election[0] = True
    495             while True :
    496                 try :
    497                     self.dbenvClient.rep_elect(2, 1)
    498                     self.client_doing_election[0] = False
    499                     break
    500                 except db.DBRepUnavailError :
    501                     pass
    502 
    503             self.assertTrue(self.confirmed_master)
    504 
    505             # Race condition showed up after upgrading to Solaris 10 Update 10
    506             # https://forums.oracle.com/forums/thread.jspa?messageID=9902860
    507             # jcea (at] jcea.es: See private email from Paula Bingham (Oracle),
    508             # in 20110929.
    509             while not (self.dbenvClient.rep_stat()["startup_complete"]) :
    510                 pass
    511 
    512     if db.version() >= (4,7) :
    513         def test04_test_clockskew(self) :
    514             fast, slow = 1234, 1230
    515             self.dbenvMaster.rep_set_clockskew(fast, slow)
    516             self.assertEqual((fast, slow),
    517                     self.dbenvMaster.rep_get_clockskew())
    518             self.basic_rep_threading()
    519 
    520 #----------------------------------------------------------------------
    521 
    522 def test_suite():
    523     suite = unittest.TestSuite()
    524     if db.version() >= (4, 6) :
    525         dbenv = db.DBEnv()
    526         try :
    527             dbenv.repmgr_get_ack_policy()
    528             ReplicationManager_available=True
    529         except :
    530             ReplicationManager_available=False
    531         dbenv.close()
    532         del dbenv
    533         if ReplicationManager_available :
    534             suite.addTest(unittest.makeSuite(DBReplicationManager))
    535 
    536         if have_threads :
    537             suite.addTest(unittest.makeSuite(DBBaseReplication))
    538 
    539     return suite
    540 
    541 
    542 if __name__ == '__main__':
    543     unittest.main(defaultTest='test_suite')
    544