"""TestCases for Berkeley DB replication.

Two families of tests are defined here:

* DBReplicationManager -- replication driven by the built-in Replication
  Manager (repmgr), with master and client talking over local TCP ports.
* DBBaseReplication -- replication driven by the Base Replication API; the
  messages produced by each environment's transport callback are pushed into
  an in-process queue and fed to the peer by a helper thread.
"""

import os
import sys
import time
import unittest

from test_all import db, test_support, have_threads, verbose, \
        get_new_environment_path, get_new_database_path


#----------------------------------------------------------------------

class DBReplication(unittest.TestCase) :
    """Shared fixture: one master and one client DBEnv, each in its own
    fresh home directory, plus event handlers that flip
    ``self.confirmed_master`` / ``self.client_startupdone`` to True when the
    corresponding replication events are delivered."""

    def setUp(self) :
        self.homeDirMaster = get_new_environment_path()
        self.homeDirClient = get_new_environment_path()

        self.dbenvMaster = db.DBEnv()
        self.dbenvClient = db.DBEnv()

        # Must use "DB_THREAD" because the Replication Manager will
        # be executed in other threads but will use the same environment.
        # http://forums.oracle.com/forums/thread.jspa?threadID=645788&tstart=0
        flags = (db.DB_CREATE | db.DB_INIT_TXN | db.DB_INIT_LOG |
                 db.DB_INIT_MPOOL | db.DB_INIT_LOCK | db.DB_INIT_REP |
                 db.DB_RECOVER | db.DB_THREAD)
        self.dbenvMaster.open(self.homeDirMaster, flags, 0o666)
        self.dbenvClient.open(self.homeDirClient, flags, 0o666)

        self.confirmed_master = self.client_startupdone = False

        def confirmed_master(a, b, c) :
            if b == db.DB_EVENT_REP_MASTER :
                self.confirmed_master = True

        def client_startupdone(a, b, c) :
            if b == db.DB_EVENT_REP_STARTUPDONE :
                self.client_startupdone = True

        self.dbenvMaster.set_event_notify(confirmed_master)
        self.dbenvClient.set_event_notify(client_startupdone)

        # Uncomment for verbose replication debugging output.
        #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
        #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
        #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
        #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)

        self.dbMaster = self.dbClient = None


    def tearDown(self):
        if self.dbClient :
            self.dbClient.close()
        if self.dbMaster :
            self.dbMaster.close()

        # Here we assign dummy event handlers to allow GC of the test object.
        # Since the dummy handler doesn't use any outer scope variable, it
        # doesn't keep any reference to the test object.
        def dummy(*args) :
            pass
        self.dbenvMaster.set_event_notify(dummy)
        self.dbenvClient.set_event_notify(dummy)

        self.dbenvClient.close()
        self.dbenvMaster.close()
        test_support.rmtree(self.homeDirClient)
        test_support.rmtree(self.homeDirMaster)

class DBReplicationManager(DBReplication) :
    """Replication driven by the Replication Manager (repmgr) over TCP."""

    def test01_basic_replication(self) :
        """Start master and client under repmgr, then check that creating
        a database, a put and a delete on the master all reach the client."""
        master_port = test_support.find_unused_port()
        client_port = test_support.find_unused_port()
        if db.version() >= (5, 2) :
            self.site = self.dbenvMaster.repmgr_site("127.0.0.1", master_port)
            self.site.set_config(db.DB_GROUP_CREATOR, True)
            self.site.set_config(db.DB_LOCAL_SITE, True)
            self.site2 = self.dbenvMaster.repmgr_site("127.0.0.1", client_port)

            self.site3 = self.dbenvClient.repmgr_site("127.0.0.1", master_port)
            self.site3.set_config(db.DB_BOOTSTRAP_HELPER, True)
            self.site4 = self.dbenvClient.repmgr_site("127.0.0.1", client_port)
            self.site4.set_config(db.DB_LOCAL_SITE, True)

            sites = [self.site, self.site2, self.site3, self.site4]

            # Expected value of each site configuration flag, listed in the
            # same order as ``sites``.
            d = {
                db.DB_BOOTSTRAP_HELPER: [False, False, True, False],
                db.DB_GROUP_CREATOR: [True, False, False, False],
                db.DB_LEGACY: [False, False, False, False],
                db.DB_LOCAL_SITE: [True, False, False, True],
                db.DB_REPMGR_PEER: [False, False, False, False],
            }

            for i, j in d.items() :
                for k, v in zip(sites, j) :
                    if v :
                        self.assertTrue(k.get_config(i))
                    else :
                        self.assertFalse(k.get_config(i))

            self.assertNotEqual(self.site.get_eid(), self.site2.get_eid())
            self.assertNotEqual(self.site3.get_eid(), self.site4.get_eid())

            for i, j in zip(sites,
                    [master_port, client_port, master_port, client_port]) :
                addr = i.get_address()
                self.assertEqual(addr, ("127.0.0.1", j))

            for i in [self.site, self.site2] :
                self.assertEqual(i.get_address(),
                    self.dbenvMaster.repmgr_site_by_eid(i.get_eid()).get_address())
            for i in [self.site3, self.site4] :
                self.assertEqual(i.get_address(),
                    self.dbenvClient.repmgr_site_by_eid(i.get_eid()).get_address())
        else :
            self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port)
            self.dbenvClient.repmgr_set_local_site("127.0.0.1", client_port)
            self.dbenvMaster.repmgr_add_remote_site("127.0.0.1", client_port)
            self.dbenvClient.repmgr_add_remote_site("127.0.0.1", master_port)

            self.dbenvMaster.rep_set_nsites(2)
            self.dbenvClient.rep_set_nsites(2)

        self.dbenvMaster.rep_set_priority(10)
        self.dbenvClient.rep_set_priority(0)

        self.dbenvMaster.rep_set_timeout(db.DB_REP_CONNECTION_RETRY, 100123)
        self.dbenvClient.rep_set_timeout(db.DB_REP_CONNECTION_RETRY, 100321)
        self.assertEqual(self.dbenvMaster.rep_get_timeout(
            db.DB_REP_CONNECTION_RETRY), 100123)
        self.assertEqual(self.dbenvClient.rep_get_timeout(
            db.DB_REP_CONNECTION_RETRY), 100321)

        self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100234)
        self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100432)
        self.assertEqual(self.dbenvMaster.rep_get_timeout(
            db.DB_REP_ELECTION_TIMEOUT), 100234)
        self.assertEqual(self.dbenvClient.rep_get_timeout(
            db.DB_REP_ELECTION_TIMEOUT), 100432)

        self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100345)
        self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100543)
        self.assertEqual(self.dbenvMaster.rep_get_timeout(
            db.DB_REP_ELECTION_RETRY), 100345)
        self.assertEqual(self.dbenvClient.rep_get_timeout(
            db.DB_REP_ELECTION_RETRY), 100543)

        self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
        self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)

        self.dbenvMaster.repmgr_start(1, db.DB_REP_MASTER)
        self.dbenvClient.repmgr_start(1, db.DB_REP_CLIENT)

        self.assertEqual(self.dbenvMaster.rep_get_nsites(), 2)
        self.assertEqual(self.dbenvClient.rep_get_nsites(), 2)
        self.assertEqual(self.dbenvMaster.rep_get_priority(), 10)
        self.assertEqual(self.dbenvClient.rep_get_priority(), 0)
        self.assertEqual(self.dbenvMaster.repmgr_get_ack_policy(),
                db.DB_REPMGR_ACKS_ALL)
        self.assertEqual(self.dbenvClient.repmgr_get_ack_policy(),
                db.DB_REPMGR_ACKS_ALL)

        # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
        # is not generated if the master has no new transactions.
        # This is solved in BDB 4.6 (#15542).
        timeout = time.time()+60
        while (time.time()<timeout) and \
                not (self.confirmed_master and self.client_startupdone) :
            time.sleep(0.02)
        # self.client_startupdone does not always get set to True within
        # the timeout.  On windows this may be a deep issue, on other
        # platforms it is likely just a timing issue, especially on slow
        # virthost buildbots (see issue 3892 for more).  Even though
        # the timeout triggers, the rest of this test method usually passes
        # (but not all of it always, see below).  So we just note the
        # timeout on stderr and keep soldiering on.
        # startup_timeout is always bound here: it is consulted again after
        # the put below, on both the success and the failure path.
        startup_timeout = not (self.confirmed_master and
                self.client_startupdone)
        if startup_timeout :
            sys.stderr.write("XXX: timeout happened before "
                "startup was confirmed - see issue 3892\n")

        d = self.dbenvMaster.repmgr_site_list()
        self.assertEqual(len(d), 1)
        d = list(d.values())[0]  # There is only one
        self.assertEqual(d[0], "127.0.0.1")
        self.assertEqual(d[1], client_port)
        self.assertIn(d[2],
                (db.DB_REPMGR_CONNECTED, db.DB_REPMGR_DISCONNECTED))

        d = self.dbenvClient.repmgr_site_list()
        self.assertEqual(len(d), 1)
        d = list(d.values())[0]  # There is only one
        self.assertEqual(d[0], "127.0.0.1")
        self.assertEqual(d[1], master_port)
        self.assertIn(d[2],
                (db.DB_REPMGR_CONNECTED, db.DB_REPMGR_DISCONNECTED))

        if db.version() >= (4,6) :
            d = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR)
            self.assertIn("msgs_queued", d)

        self.dbMaster = db.DB(self.dbenvMaster)
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0o666, txn=txn)
        txn.commit()

        # Wait (bounded) for the database file to be replicated to the client.
        timeout = time.time()+10
        while (time.time()<timeout) and \
                not (os.path.exists(os.path.join(self.homeDirClient, "test"))) :
            time.sleep(0.01)

        # Opening on the client can race with replication; a dead handle
        # means "retry with a fresh DB object".
        self.dbClient = db.DB(self.dbenvClient)
        while True :
            txn = self.dbenvClient.txn_begin()
            try :
                self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
                        mode=0o666, txn=txn)
            except db.DBRepHandleDeadError :
                txn.abort()
                self.dbClient.close()
                self.dbClient = db.DB(self.dbenvClient)
                continue

            txn.commit()
            break

        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.put("ABC", "123", txn=txn)
        txn.commit()
        # Poll the client until the put shows up or the timeout expires.
        timeout = time.time()+10
        v = None
        while (time.time()<timeout) and (v is None) :
            txn = self.dbenvClient.txn_begin()
            v = self.dbClient.get("ABC", txn=txn)
            txn.commit()
            if v is None :
                time.sleep(0.02)
        # If startup did not happen before the timeout above, then this test
        # sometimes fails.  This happens randomly, which causes buildbot
        # instability, but all the other bsddb tests pass.  Since bsddb3 in the
        # stdlib is currently not getting active maintenance, and is gone in
        # py3k, we just skip the end of the test in that case.
        if v is None and startup_timeout :
            self.skipTest("replication test skipped due to random failure, "
                "see issue 3892")
        self.assertEqual("123", v)

        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.delete("ABC", txn=txn)
        txn.commit()
        # Poll the client until the delete shows up or the timeout expires;
        # sleep between polls only while the key is still visible.
        timeout = time.time()+10
        while (time.time()<timeout) and (v is not None) :
            txn = self.dbenvClient.txn_begin()
            v = self.dbClient.get("ABC", txn=txn)
            txn.commit()
            if v is not None :
                time.sleep(0.02)
        self.assertIsNone(v)

class DBBaseReplication(DBReplication) :
    """Replication driven by the Base Replication API.

    Each environment's transport callback pushes ``(control, rec)`` pairs
    into a queue; a daemon thread per side drains the peer's queue and hands
    the messages to ``rep_process_message``.  Each test installs the thread
    body as ``self.thread_do`` before starting ``self.t_m``/``self.t_c``."""

    def setUp(self) :
        DBReplication.setUp(self)
        def confirmed_master(a, b, c) :
            if (b == db.DB_EVENT_REP_MASTER) or (b == db.DB_EVENT_REP_ELECTED) :
                self.confirmed_master = True

        def client_startupdone(a, b, c) :
            if b == db.DB_EVENT_REP_STARTUPDONE :
                self.client_startupdone = True

        self.dbenvMaster.set_event_notify(confirmed_master)
        self.dbenvClient.set_event_notify(client_startupdone)

        import Queue
        self.m2c = Queue.Queue()
        self.c2m = Queue.Queue()

        # There are only two nodes, so we don't need to
        # do any routing decision
        def m2c(dbenv, control, rec, lsnp, envid, flags) :
            self.m2c.put((control, rec))

        def c2m(dbenv, control, rec, lsnp, envid, flags) :
            self.c2m.put((control, rec))

        self.dbenvMaster.rep_set_transport(13, m2c)
        self.dbenvMaster.rep_set_priority(10)
        self.dbenvClient.rep_set_transport(3, c2m)
        self.dbenvClient.rep_set_priority(0)

        self.assertEqual(self.dbenvMaster.rep_get_priority(), 10)
        self.assertEqual(self.dbenvClient.rep_get_priority(), 0)

        # Uncomment for verbose replication debugging output.
        #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
        #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
        #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
        #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)

        def thread_master() :
            return self.thread_do(self.dbenvMaster, self.c2m, 3,
                    self.master_doing_election, True)

        def thread_client() :
            return self.thread_do(self.dbenvClient, self.m2c, 13,
                    self.client_doing_election, False)

        from threading import Thread
        t_m = Thread(target=thread_master)
        t_c = Thread(target=thread_client)
        t_m.daemon = True
        t_c.daemon = True

        self.t_m = t_m
        self.t_c = t_c

        self.dbMaster = self.dbClient = None

        self.master_doing_election = [False]
        self.client_doing_election = [False]


    def tearDown(self):
        if self.dbClient :
            self.dbClient.close()
        if self.dbMaster :
            self.dbMaster.close()
        # A None message tells each worker thread to exit.
        self.m2c.put(None)
        self.c2m.put(None)
        # A test may fail before the threads are started; joining a thread
        # that was never started raises RuntimeError and would skip the
        # environment cleanup below.
        for t in (self.t_m, self.t_c) :
            if t.ident is not None :
                t.join()

        # Here we assign dummy event handlers to allow GC of the test object.
        # Since the dummy handler doesn't use any outer scope variable, it
        # doesn't keep any reference to the test object.
        def dummy(*args) :
            pass
        self.dbenvMaster.set_event_notify(dummy)
        self.dbenvClient.set_event_notify(dummy)
        self.dbenvMaster.rep_set_transport(13, dummy)
        self.dbenvClient.rep_set_transport(3, dummy)

        self.dbenvClient.close()
        self.dbenvMaster.close()
        test_support.rmtree(self.homeDirClient)
        test_support.rmtree(self.homeDirMaster)

    def basic_rep_threading(self) :
        """Start master/client replication and the two message-pump
        threads, which simply forward every queued message to their
        environment until they receive None."""
        self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
        self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)

        def thread_do(env, q, envid, election_status, must_be_master) :
            while True :
                v = q.get()
                if v is None : return
                env.rep_process_message(v[0], v[1], envid)

        self.thread_do = thread_do

        self.t_m.start()
        self.t_c.start()

    def test01_basic_replication(self) :
        """Check that creating a database, a put and a delete on the master
        all reach the client through the Base Replication API."""
        self.basic_rep_threading()

        # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
        # is not generated if the master has no new transactions.
        # This is solved in BDB 4.6 (#15542).
        timeout = time.time()+60
        while (time.time()<timeout) and not (self.confirmed_master and
                self.client_startupdone) :
            time.sleep(0.02)
        self.assertTrue(self.confirmed_master and self.client_startupdone,
                "timeout waiting for replication startup")

        self.dbMaster = db.DB(self.dbenvMaster)
        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0o666, txn=txn)
        txn.commit()

        # Wait (bounded) for the database file to be replicated to the client.
        timeout = time.time()+10
        while (time.time()<timeout) and \
                not (os.path.exists(os.path.join(self.homeDirClient, "test"))) :
            time.sleep(0.01)

        # Opening on the client can race with replication; a dead handle
        # means "retry with a fresh DB object".
        self.dbClient = db.DB(self.dbenvClient)
        while True :
            txn = self.dbenvClient.txn_begin()
            try :
                self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
                        mode=0o666, txn=txn)
            except db.DBRepHandleDeadError :
                txn.abort()
                self.dbClient.close()
                self.dbClient = db.DB(self.dbenvClient)
                continue

            txn.commit()
            break

        d = self.dbenvMaster.rep_stat(flags=db.DB_STAT_CLEAR)
        self.assertIn("master_changes", d)

        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.put("ABC", "123", txn=txn)
        txn.commit()
        # Poll the client until the put shows up or the timeout expires.
        timeout = time.time()+10
        v = None
        while (time.time()<timeout) and (v is None) :
            txn = self.dbenvClient.txn_begin()
            v = self.dbClient.get("ABC", txn=txn)
            txn.commit()
            if v is None :
                time.sleep(0.02)
        self.assertEqual("123", v)

        txn = self.dbenvMaster.txn_begin()
        self.dbMaster.delete("ABC", txn=txn)
        txn.commit()
        # Poll the client until the delete shows up or the timeout expires;
        # sleep between polls only while the key is still visible.
        timeout = time.time()+10
        while (time.time()<timeout) and (v is not None) :
            txn = self.dbenvClient.txn_begin()
            v = self.dbClient.get("ABC", txn=txn)
            txn.commit()
            if v is not None :
                time.sleep(0.02)
        self.assertIsNone(v)

    if db.version() >= (4,7) :
        def test02_test_request(self) :
            """rep_set_request/rep_get_request round-trip on the client."""
            self.basic_rep_threading()
            (minimum, maximum) = self.dbenvClient.rep_get_request()
            self.dbenvClient.rep_set_request(minimum-1, maximum+1)
            self.assertEqual(self.dbenvClient.rep_get_request(),
                    (minimum-1, maximum+1))

    if db.version() >= (4,6) :
        def test03_master_election(self) :
            """Start both sites as clients and hold an election; the
            master environment must end up confirmed as master."""
            # Get ready to hold an election
            #self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
            self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT)
            self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)

            def thread_do(env, q, envid, election_status, must_be_master) :
                while True :
                    v = q.get()
                    if v is None : return
                    r = env.rep_process_message(v[0], v[1], envid)
                    if must_be_master and self.confirmed_master :
                        self.dbenvMaster.rep_start(flags = db.DB_REP_MASTER)
                        must_be_master = False

                    if r[0] == db.DB_REP_HOLDELECTION :
                        def elect() :
                            while True :
                                try :
                                    env.rep_elect(2, 1)
                                    election_status[0] = False
                                    break
                                except db.DBRepUnavailError :
                                    pass
                        if not election_status[0] and not self.confirmed_master :
                            from threading import Thread
                            election_status[0] = True
                            t = Thread(target=elect)
                            t.daemon = True
                            t.start()

            self.thread_do = thread_do

            self.t_m.start()
            self.t_c.start()

            self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
            self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
            self.client_doing_election[0] = True
            while True :
                try :
                    self.dbenvClient.rep_elect(2, 1)
                    self.client_doing_election[0] = False
                    break
                except db.DBRepUnavailError :
                    pass

            self.assertTrue(self.confirmed_master)

            # Race condition showed up after upgrading to Solaris 10 Update 10
            # https://forums.oracle.com/forums/thread.jspa?messageID=9902860
            # jcea (at] jcea.es: See private email from Paula Bingham (Oracle),
            # in 20110929.
            # Wait (bounded, without spinning the CPU) for client startup.
            timeout = time.time()+60
            while (time.time()<timeout) and \
                    not (self.dbenvClient.rep_stat()["startup_complete"]) :
                time.sleep(0.01)
            self.assertTrue(self.dbenvClient.rep_stat()["startup_complete"])

    if db.version() >= (4,7) :
        def test04_test_clockskew(self) :
            """rep_set_clockskew/rep_get_clockskew round-trip."""
            fast, slow = 1234, 1230
            self.dbenvMaster.rep_set_clockskew(fast, slow)
            self.assertEqual((fast, slow),
                    self.dbenvMaster.rep_get_clockskew())
            self.basic_rep_threading()

#----------------------------------------------------------------------

def test_suite():
    """Build the suite.  Repmgr tests run only when the linked Berkeley DB
    supports the Replication Manager; Base API tests need threads."""
    suite = unittest.TestSuite()
    if db.version() >= (4, 6) :
        dbenv = db.DBEnv()
        try :
            try :
                dbenv.repmgr_get_ack_policy()
                repmgr_available = True
            except Exception :
                # Any failure here means repmgr is unusable in this build.
                repmgr_available = False
        finally :
            dbenv.close()
        del dbenv
        if repmgr_available :
            suite.addTest(unittest.makeSuite(DBReplicationManager))

        if have_threads :
            suite.addTest(unittest.makeSuite(DBBaseReplication))

    return suite


if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')