1 import gc 2 import sys 3 import unittest 4 import UserList 5 import weakref 6 import operator 7 8 from test import test_support 9 10 # Used in ReferencesTestCase.test_ref_created_during_del() . 11 ref_from_del = None 12 13 class C: 14 def method(self): 15 pass 16 17 18 class Callable: 19 bar = None 20 21 def __call__(self, x): 22 self.bar = x 23 24 25 def create_function(): 26 def f(): pass 27 return f 28 29 def create_bound_method(): 30 return C().method 31 32 def create_unbound_method(): 33 return C.method 34 35 36 class TestBase(unittest.TestCase): 37 38 def setUp(self): 39 self.cbcalled = 0 40 41 def callback(self, ref): 42 self.cbcalled += 1 43 44 45 class ReferencesTestCase(TestBase): 46 47 def test_basic_ref(self): 48 self.check_basic_ref(C) 49 self.check_basic_ref(create_function) 50 self.check_basic_ref(create_bound_method) 51 self.check_basic_ref(create_unbound_method) 52 53 # Just make sure the tp_repr handler doesn't raise an exception. 54 # Live reference: 55 o = C() 56 wr = weakref.ref(o) 57 repr(wr) 58 # Dead reference: 59 del o 60 repr(wr) 61 62 def test_basic_callback(self): 63 self.check_basic_callback(C) 64 self.check_basic_callback(create_function) 65 self.check_basic_callback(create_bound_method) 66 self.check_basic_callback(create_unbound_method) 67 68 def test_multiple_callbacks(self): 69 o = C() 70 ref1 = weakref.ref(o, self.callback) 71 ref2 = weakref.ref(o, self.callback) 72 del o 73 self.assertTrue(ref1() is None, 74 "expected reference to be invalidated") 75 self.assertTrue(ref2() is None, 76 "expected reference to be invalidated") 77 self.assertTrue(self.cbcalled == 2, 78 "callback not called the right number of times") 79 80 def test_multiple_selfref_callbacks(self): 81 # Make sure all references are invalidated before callbacks are called 82 # 83 # What's important here is that we're using the first 84 # reference in the callback invoked on the second reference 85 # (the most recently created ref is cleaned up first). This 86 # tests that all references to the object are invalidated 87 # before any of the callbacks are invoked, so that we only 88 # have one invocation of _weakref.c:cleanup_helper() active 89 # for a particular object at a time. 90 # 91 def callback(object, self=self): 92 self.ref() 93 c = C() 94 self.ref = weakref.ref(c, callback) 95 ref1 = weakref.ref(c, callback) 96 del c 97 98 def test_proxy_ref(self): 99 o = C() 100 o.bar = 1 101 ref1 = weakref.proxy(o, self.callback) 102 ref2 = weakref.proxy(o, self.callback) 103 del o 104 105 def check(proxy): 106 proxy.bar 107 108 self.assertRaises(weakref.ReferenceError, check, ref1) 109 self.assertRaises(weakref.ReferenceError, check, ref2) 110 self.assertRaises(weakref.ReferenceError, bool, weakref.proxy(C())) 111 self.assertTrue(self.cbcalled == 2) 112 113 def check_basic_ref(self, factory): 114 o = factory() 115 ref = weakref.ref(o) 116 self.assertTrue(ref() is not None, 117 "weak reference to live object should be live") 118 o2 = ref() 119 self.assertTrue(o is o2, 120 "<ref>() should return original object if live") 121 122 def check_basic_callback(self, factory): 123 self.cbcalled = 0 124 o = factory() 125 ref = weakref.ref(o, self.callback) 126 del o 127 self.assertTrue(self.cbcalled == 1, 128 "callback did not properly set 'cbcalled'") 129 self.assertTrue(ref() is None, 130 "ref2 should be dead after deleting object reference") 131 132 def test_ref_reuse(self): 133 o = C() 134 ref1 = weakref.ref(o) 135 # create a proxy to make sure that there's an intervening creation 136 # between these two; it should make no difference 137 proxy = weakref.proxy(o) 138 ref2 = weakref.ref(o) 139 self.assertTrue(ref1 is ref2, 140 "reference object w/out callback should be re-used") 141 142 o = C() 143 proxy = weakref.proxy(o) 144 ref1 = weakref.ref(o) 145 ref2 = weakref.ref(o) 146 self.assertTrue(ref1 is ref2, 147 "reference object w/out callback should be re-used") 148 self.assertTrue(weakref.getweakrefcount(o) == 2, 149 "wrong weak ref count for object") 150 del proxy 151 self.assertTrue(weakref.getweakrefcount(o) == 1, 152 "wrong weak ref count for object after deleting proxy") 153 154 def test_proxy_reuse(self): 155 o = C() 156 proxy1 = weakref.proxy(o) 157 ref = weakref.ref(o) 158 proxy2 = weakref.proxy(o) 159 self.assertTrue(proxy1 is proxy2, 160 "proxy object w/out callback should have been re-used") 161 162 def test_basic_proxy(self): 163 o = C() 164 self.check_proxy(o, weakref.proxy(o)) 165 166 L = UserList.UserList() 167 p = weakref.proxy(L) 168 self.assertFalse(p, "proxy for empty UserList should be false") 169 p.append(12) 170 self.assertEqual(len(L), 1) 171 self.assertTrue(p, "proxy for non-empty UserList should be true") 172 with test_support.check_py3k_warnings(): 173 p[:] = [2, 3] 174 self.assertEqual(len(L), 2) 175 self.assertEqual(len(p), 2) 176 self.assertIn(3, p, "proxy didn't support __contains__() properly") 177 p[1] = 5 178 self.assertEqual(L[1], 5) 179 self.assertEqual(p[1], 5) 180 L2 = UserList.UserList(L) 181 p2 = weakref.proxy(L2) 182 self.assertEqual(p, p2) 183 ## self.assertEqual(repr(L2), repr(p2)) 184 L3 = UserList.UserList(range(10)) 185 p3 = weakref.proxy(L3) 186 with test_support.check_py3k_warnings(): 187 self.assertEqual(L3[:], p3[:]) 188 self.assertEqual(L3[5:], p3[5:]) 189 self.assertEqual(L3[:5], p3[:5]) 190 self.assertEqual(L3[2:5], p3[2:5]) 191 192 def test_proxy_unicode(self): 193 # See bug 5037 194 class C(object): 195 def __str__(self): 196 return "string" 197 def __unicode__(self): 198 return u"unicode" 199 instance = C() 200 self.assertIn("__unicode__", dir(weakref.proxy(instance))) 201 self.assertEqual(unicode(weakref.proxy(instance)), u"unicode") 202 203 def test_proxy_index(self): 204 class C: 205 def __index__(self): 206 return 10 207 o = C() 208 p = weakref.proxy(o) 209 self.assertEqual(operator.index(p), 10) 210 211 def test_proxy_div(self): 212 class C: 213 def __floordiv__(self, other): 214 return 42 215 def __ifloordiv__(self, other): 216 return 21 217 o = C() 218 p = weakref.proxy(o) 219 self.assertEqual(p // 5, 42) 220 p //= 5 221 self.assertEqual(p, 21) 222 223 # The PyWeakref_* C API is documented as allowing either NULL or 224 # None as the value for the callback, where either means "no 225 # callback". The "no callback" ref and proxy objects are supposed 226 # to be shared so long as they exist by all callers so long as 227 # they are active. In Python 2.3.3 and earlier, this guarantee 228 # was not honored, and was broken in different ways for 229 # PyWeakref_NewRef() and PyWeakref_NewProxy(). (Two tests.) 230 231 def test_shared_ref_without_callback(self): 232 self.check_shared_without_callback(weakref.ref) 233 234 def test_shared_proxy_without_callback(self): 235 self.check_shared_without_callback(weakref.proxy) 236 237 def check_shared_without_callback(self, makeref): 238 o = Object(1) 239 p1 = makeref(o, None) 240 p2 = makeref(o, None) 241 self.assertTrue(p1 is p2, "both callbacks were None in the C API") 242 del p1, p2 243 p1 = makeref(o) 244 p2 = makeref(o, None) 245 self.assertTrue(p1 is p2, "callbacks were NULL, None in the C API") 246 del p1, p2 247 p1 = makeref(o) 248 p2 = makeref(o) 249 self.assertTrue(p1 is p2, "both callbacks were NULL in the C API") 250 del p1, p2 251 p1 = makeref(o, None) 252 p2 = makeref(o) 253 self.assertTrue(p1 is p2, "callbacks were None, NULL in the C API") 254 255 def test_callable_proxy(self): 256 o = Callable() 257 ref1 = weakref.proxy(o) 258 259 self.check_proxy(o, ref1) 260 261 self.assertTrue(type(ref1) is weakref.CallableProxyType, 262 "proxy is not of callable type") 263 ref1('twinkies!') 264 self.assertTrue(o.bar == 'twinkies!', 265 "call through proxy not passed through to original") 266 ref1(x='Splat.') 267 self.assertTrue(o.bar == 'Splat.', 268 "call through proxy not passed through to original") 269 270 # expect due to too few args 271 self.assertRaises(TypeError, ref1) 272 273 # expect due to too many args 274 self.assertRaises(TypeError, ref1, 1, 2, 3) 275 276 def check_proxy(self, o, proxy): 277 o.foo = 1 278 self.assertTrue(proxy.foo == 1, 279 "proxy does not reflect attribute addition") 280 o.foo = 2 281 self.assertTrue(proxy.foo == 2, 282 "proxy does not reflect attribute modification") 283 del o.foo 284 self.assertTrue(not hasattr(proxy, 'foo'), 285 "proxy does not reflect attribute removal") 286 287 proxy.foo = 1 288 self.assertTrue(o.foo == 1, 289 "object does not reflect attribute addition via proxy") 290 proxy.foo = 2 291 self.assertTrue( 292 o.foo == 2, 293 "object does not reflect attribute modification via proxy") 294 del proxy.foo 295 self.assertTrue(not hasattr(o, 'foo'), 296 "object does not reflect attribute removal via proxy") 297 298 def test_proxy_deletion(self): 299 # Test clearing of SF bug #762891 300 class Foo: 301 result = None 302 def __delitem__(self, accessor): 303 self.result = accessor 304 g = Foo() 305 f = weakref.proxy(g) 306 del f[0] 307 self.assertEqual(f.result, 0) 308 309 def test_proxy_bool(self): 310 # Test clearing of SF bug #1170766 311 class List(list): pass 312 lyst = List() 313 self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst)) 314 315 def test_getweakrefcount(self): 316 o = C() 317 ref1 = weakref.ref(o) 318 ref2 = weakref.ref(o, self.callback) 319 self.assertTrue(weakref.getweakrefcount(o) == 2, 320 "got wrong number of weak reference objects") 321 322 proxy1 = weakref.proxy(o) 323 proxy2 = weakref.proxy(o, self.callback) 324 self.assertTrue(weakref.getweakrefcount(o) == 4, 325 "got wrong number of weak reference objects") 326 327 del ref1, ref2, proxy1, proxy2 328 self.assertTrue(weakref.getweakrefcount(o) == 0, 329 "weak reference objects not unlinked from" 330 " referent when discarded.") 331 332 # assumes ints do not support weakrefs 333 self.assertTrue(weakref.getweakrefcount(1) == 0, 334 "got wrong number of weak reference objects for int") 335 336 def test_getweakrefs(self): 337 o = C() 338 ref1 = weakref.ref(o, self.callback) 339 ref2 = weakref.ref(o, self.callback) 340 del ref1 341 self.assertTrue(weakref.getweakrefs(o) == [ref2], 342 "list of refs does not match") 343 344 o = C() 345 ref1 = weakref.ref(o, self.callback) 346 ref2 = weakref.ref(o, self.callback) 347 del ref2 348 self.assertTrue(weakref.getweakrefs(o) == [ref1], 349 "list of refs does not match") 350 351 del ref1 352 self.assertTrue(weakref.getweakrefs(o) == [], 353 "list of refs not cleared") 354 355 # assumes ints do not support weakrefs 356 self.assertTrue(weakref.getweakrefs(1) == [], 357 "list of refs does not match for int") 358 359 def test_newstyle_number_ops(self): 360 class F(float): 361 pass 362 f = F(2.0) 363 p = weakref.proxy(f) 364 self.assertTrue(p + 1.0 == 3.0) 365 self.assertTrue(1.0 + p == 3.0) # this used to SEGV 366 367 def test_callbacks_protected(self): 368 # Callbacks protected from already-set exceptions? 369 # Regression test for SF bug #478534. 370 class BogusError(Exception): 371 pass 372 data = {} 373 def remove(k): 374 del data[k] 375 def encapsulate(): 376 f = lambda : () 377 data[weakref.ref(f, remove)] = None 378 raise BogusError 379 try: 380 encapsulate() 381 except BogusError: 382 pass 383 else: 384 self.fail("exception not properly restored") 385 try: 386 encapsulate() 387 except BogusError: 388 pass 389 else: 390 self.fail("exception not properly restored") 391 392 def test_sf_bug_840829(self): 393 # "weakref callbacks and gc corrupt memory" 394 # subtype_dealloc erroneously exposed a new-style instance 395 # already in the process of getting deallocated to gc, 396 # causing double-deallocation if the instance had a weakref 397 # callback that triggered gc. 398 # If the bug exists, there probably won't be an obvious symptom 399 # in a release build. In a debug build, a segfault will occur 400 # when the second attempt to remove the instance from the "list 401 # of all objects" occurs. 402 403 import gc 404 405 class C(object): 406 pass 407 408 c = C() 409 wr = weakref.ref(c, lambda ignore: gc.collect()) 410 del c 411 412 # There endeth the first part. It gets worse. 413 del wr 414 415 c1 = C() 416 c1.i = C() 417 wr = weakref.ref(c1.i, lambda ignore: gc.collect()) 418 419 c2 = C() 420 c2.c1 = c1 421 del c1 # still alive because c2 points to it 422 423 # Now when subtype_dealloc gets called on c2, it's not enough just 424 # that c2 is immune from gc while the weakref callbacks associated 425 # with c2 execute (there are none in this 2nd half of the test, btw). 426 # subtype_dealloc goes on to call the base classes' deallocs too, 427 # so any gc triggered by weakref callbacks associated with anything 428 # torn down by a base class dealloc can also trigger double 429 # deallocation of c2. 430 del c2 431 432 def test_callback_in_cycle_1(self): 433 import gc 434 435 class J(object): 436 pass 437 438 class II(object): 439 def acallback(self, ignore): 440 self.J 441 442 I = II() 443 I.J = J 444 I.wr = weakref.ref(J, I.acallback) 445 446 # Now J and II are each in a self-cycle (as all new-style class 447 # objects are, since their __mro__ points back to them). I holds 448 # both a weak reference (I.wr) and a strong reference (I.J) to class 449 # J. I is also in a cycle (I.wr points to a weakref that references 450 # I.acallback). When we del these three, they all become trash, but 451 # the cycles prevent any of them from getting cleaned up immediately. 452 # Instead they have to wait for cyclic gc to deduce that they're 453 # trash. 454 # 455 # gc used to call tp_clear on all of them, and the order in which 456 # it does that is pretty accidental. The exact order in which we 457 # built up these things manages to provoke gc into running tp_clear 458 # in just the right order (I last). Calling tp_clear on II leaves 459 # behind an insane class object (its __mro__ becomes NULL). Calling 460 # tp_clear on J breaks its self-cycle, but J doesn't get deleted 461 # just then because of the strong reference from I.J. Calling 462 # tp_clear on I starts to clear I's __dict__, and just happens to 463 # clear I.J first -- I.wr is still intact. That removes the last 464 # reference to J, which triggers the weakref callback. The callback 465 # tries to do "self.J", and instances of new-style classes look up 466 # attributes ("J") in the class dict first. The class (II) wants to 467 # search II.__mro__, but that's NULL. The result was a segfault in 468 # a release build, and an assert failure in a debug build. 469 del I, J, II 470 gc.collect() 471 472 def test_callback_in_cycle_2(self): 473 import gc 474 475 # This is just like test_callback_in_cycle_1, except that II is an 476 # old-style class. The symptom is different then: an instance of an 477 # old-style class looks in its own __dict__ first. 'J' happens to 478 # get cleared from I.__dict__ before 'wr', and 'J' was never in II's 479 # __dict__, so the attribute isn't found. The difference is that 480 # the old-style II doesn't have a NULL __mro__ (it doesn't have any 481 # __mro__), so no segfault occurs. Instead it got: 482 # test_callback_in_cycle_2 (__main__.ReferencesTestCase) ... 483 # Exception exceptions.AttributeError: 484 # "II instance has no attribute 'J'" in <bound method II.acallback 485 # of <?.II instance at 0x00B9B4B8>> ignored 486 487 class J(object): 488 pass 489 490 class II: 491 def acallback(self, ignore): 492 self.J 493 494 I = II() 495 I.J = J 496 I.wr = weakref.ref(J, I.acallback) 497 498 del I, J, II 499 gc.collect() 500 501 def test_callback_in_cycle_3(self): 502 import gc 503 504 # This one broke the first patch that fixed the last two. In this 505 # case, the objects reachable from the callback aren't also reachable 506 # from the object (c1) *triggering* the callback: you can get to 507 # c1 from c2, but not vice-versa. The result was that c2's __dict__ 508 # got tp_clear'ed by the time the c2.cb callback got invoked. 509 510 class C: 511 def cb(self, ignore): 512 self.me 513 self.c1 514 self.wr 515 516 c1, c2 = C(), C() 517 518 c2.me = c2 519 c2.c1 = c1 520 c2.wr = weakref.ref(c1, c2.cb) 521 522 del c1, c2 523 gc.collect() 524 525 def test_callback_in_cycle_4(self): 526 import gc 527 528 # Like test_callback_in_cycle_3, except c2 and c1 have different 529 # classes. c2's class (C) isn't reachable from c1 then, so protecting 530 # objects reachable from the dying object (c1) isn't enough to stop 531 # c2's class (C) from getting tp_clear'ed before c2.cb is invoked. 532 # The result was a segfault (C.__mro__ was NULL when the callback 533 # tried to look up self.me). 534 535 class C(object): 536 def cb(self, ignore): 537 self.me 538 self.c1 539 self.wr 540 541 class D: 542 pass 543 544 c1, c2 = D(), C() 545 546 c2.me = c2 547 c2.c1 = c1 548 c2.wr = weakref.ref(c1, c2.cb) 549 550 del c1, c2, C, D 551 gc.collect() 552 553 def test_callback_in_cycle_resurrection(self): 554 import gc 555 556 # Do something nasty in a weakref callback: resurrect objects 557 # from dead cycles. For this to be attempted, the weakref and 558 # its callback must also be part of the cyclic trash (else the 559 # objects reachable via the callback couldn't be in cyclic trash 560 # to begin with -- the callback would act like an external root). 561 # But gc clears trash weakrefs with callbacks early now, which 562 # disables the callbacks, so the callbacks shouldn't get called 563 # at all (and so nothing actually gets resurrected). 564 565 alist = [] 566 class C(object): 567 def __init__(self, value): 568 self.attribute = value 569 570 def acallback(self, ignore): 571 alist.append(self.c) 572 573 c1, c2 = C(1), C(2) 574 c1.c = c2 575 c2.c = c1 576 c1.wr = weakref.ref(c2, c1.acallback) 577 c2.wr = weakref.ref(c1, c2.acallback) 578 579 def C_went_away(ignore): 580 alist.append("C went away") 581 wr = weakref.ref(C, C_went_away) 582 583 del c1, c2, C # make them all trash 584 self.assertEqual(alist, []) # del isn't enough to reclaim anything 585 586 gc.collect() 587 # c1.wr and c2.wr were part of the cyclic trash, so should have 588 # been cleared without their callbacks executing. OTOH, the weakref 589 # to C is bound to a function local (wr), and wasn't trash, so that 590 # callback should have been invoked when C went away. 591 self.assertEqual(alist, ["C went away"]) 592 # The remaining weakref should be dead now (its callback ran). 593 self.assertEqual(wr(), None) 594 595 del alist[:] 596 gc.collect() 597 self.assertEqual(alist, []) 598 599 def test_callbacks_on_callback(self): 600 import gc 601 602 # Set up weakref callbacks *on* weakref callbacks. 603 alist = [] 604 def safe_callback(ignore): 605 alist.append("safe_callback called") 606 607 class C(object): 608 def cb(self, ignore): 609 alist.append("cb called") 610 611 c, d = C(), C() 612 c.other = d 613 d.other = c 614 callback = c.cb 615 c.wr = weakref.ref(d, callback) # this won't trigger 616 d.wr = weakref.ref(callback, d.cb) # ditto 617 external_wr = weakref.ref(callback, safe_callback) # but this will 618 self.assertTrue(external_wr() is callback) 619 620 # The weakrefs attached to c and d should get cleared, so that 621 # C.cb is never called. But external_wr isn't part of the cyclic 622 # trash, and no cyclic trash is reachable from it, so safe_callback 623 # should get invoked when the bound method object callback (c.cb) 624 # -- which is itself a callback, and also part of the cyclic trash -- 625 # gets reclaimed at the end of gc. 626 627 del callback, c, d, C 628 self.assertEqual(alist, []) # del isn't enough to clean up cycles 629 gc.collect() 630 self.assertEqual(alist, ["safe_callback called"]) 631 self.assertEqual(external_wr(), None) 632 633 del alist[:] 634 gc.collect() 635 self.assertEqual(alist, []) 636 637 def test_gc_during_ref_creation(self): 638 self.check_gc_during_creation(weakref.ref) 639 640 def test_gc_during_proxy_creation(self): 641 self.check_gc_during_creation(weakref.proxy) 642 643 def check_gc_during_creation(self, makeref): 644 thresholds = gc.get_threshold() 645 gc.set_threshold(1, 1, 1) 646 gc.collect() 647 class A: 648 pass 649 650 def callback(*args): 651 pass 652 653 referenced = A() 654 655 a = A() 656 a.a = a 657 a.wr = makeref(referenced) 658 659 try: 660 # now make sure the object and the ref get labeled as 661 # cyclic trash: 662 a = A() 663 weakref.ref(referenced, callback) 664 665 finally: 666 gc.set_threshold(*thresholds) 667 668 def test_ref_created_during_del(self): 669 # Bug #1377858 670 # A weakref created in an object's __del__() would crash the 671 # interpreter when the weakref was cleaned up since it would refer to 672 # non-existent memory. This test should not segfault the interpreter. 673 class Target(object): 674 def __del__(self): 675 global ref_from_del 676 ref_from_del = weakref.ref(self) 677 678 w = Target() 679 680 def test_init(self): 681 # Issue 3634 682 # <weakref to class>.__init__() doesn't check errors correctly 683 r = weakref.ref(Exception) 684 self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0) 685 # No exception should be raised here 686 gc.collect() 687 688 def test_classes(self): 689 # Check that both old-style classes and new-style classes 690 # are weakrefable. 691 class A(object): 692 pass 693 class B: 694 pass 695 l = [] 696 weakref.ref(int) 697 a = weakref.ref(A, l.append) 698 A = None 699 gc.collect() 700 self.assertEqual(a(), None) 701 self.assertEqual(l, [a]) 702 b = weakref.ref(B, l.append) 703 B = None 704 gc.collect() 705 self.assertEqual(b(), None) 706 self.assertEqual(l, [a, b]) 707 708 709 class SubclassableWeakrefTestCase(TestBase): 710 711 def test_subclass_refs(self): 712 class MyRef(weakref.ref): 713 def __init__(self, ob, callback=None, value=42): 714 self.value = value 715 super(MyRef, self).__init__(ob, callback) 716 def __call__(self): 717 self.called = True 718 return super(MyRef, self).__call__() 719 o = Object("foo") 720 mr = MyRef(o, value=24) 721 self.assertTrue(mr() is o) 722 self.assertTrue(mr.called) 723 self.assertEqual(mr.value, 24) 724 del o 725 self.assertTrue(mr() is None) 726 self.assertTrue(mr.called) 727 728 def test_subclass_refs_dont_replace_standard_refs(self): 729 class MyRef(weakref.ref): 730 pass 731 o = Object(42) 732 r1 = MyRef(o) 733 r2 = weakref.ref(o) 734 self.assertTrue(r1 is not r2) 735 self.assertEqual(weakref.getweakrefs(o), [r2, r1]) 736 self.assertEqual(weakref.getweakrefcount(o), 2) 737 r3 = MyRef(o) 738 self.assertEqual(weakref.getweakrefcount(o), 3) 739 refs = weakref.getweakrefs(o) 740 self.assertEqual(len(refs), 3) 741 self.assertTrue(r2 is refs[0]) 742 self.assertIn(r1, refs[1:]) 743 self.assertIn(r3, refs[1:]) 744 745 def test_subclass_refs_dont_conflate_callbacks(self): 746 class MyRef(weakref.ref): 747 pass 748 o = Object(42) 749 r1 = MyRef(o, id) 750 r2 = MyRef(o, str) 751 self.assertTrue(r1 is not r2) 752 refs = weakref.getweakrefs(o) 753 self.assertIn(r1, refs) 754 self.assertIn(r2, refs) 755 756 def test_subclass_refs_with_slots(self): 757 class MyRef(weakref.ref): 758 __slots__ = "slot1", "slot2" 759 def __new__(type, ob, callback, slot1, slot2): 760 return weakref.ref.__new__(type, ob, callback) 761 def __init__(self, ob, callback, slot1, slot2): 762 self.slot1 = slot1 763 self.slot2 = slot2 764 def meth(self): 765 return self.slot1 + self.slot2 766 o = Object(42) 767 r = MyRef(o, None, "abc", "def") 768 self.assertEqual(r.slot1, "abc") 769 self.assertEqual(r.slot2, "def") 770 self.assertEqual(r.meth(), "abcdef") 771 self.assertFalse(hasattr(r, "__dict__")) 772 773 def test_subclass_refs_with_cycle(self): 774 # Bug #3110 775 # An instance of a weakref subclass can have attributes. 776 # If such a weakref holds the only strong reference to the object, 777 # deleting the weakref will delete the object. In this case, 778 # the callback must not be called, because the ref object is 779 # being deleted. 780 class MyRef(weakref.ref): 781 pass 782 783 # Use a local callback, for "regrtest -R::" 784 # to detect refcounting problems 785 def callback(w): 786 self.cbcalled += 1 787 788 o = C() 789 r1 = MyRef(o, callback) 790 r1.o = o 791 del o 792 793 del r1 # Used to crash here 794 795 self.assertEqual(self.cbcalled, 0) 796 797 # Same test, with two weakrefs to the same object 798 # (since code paths are different) 799 o = C() 800 r1 = MyRef(o, callback) 801 r2 = MyRef(o, callback) 802 r1.r = r2 803 r2.o = o 804 del o 805 del r2 806 807 del r1 # Used to crash here 808 809 self.assertEqual(self.cbcalled, 0) 810 811 812 class Object: 813 def __init__(self, arg): 814 self.arg = arg 815 def __repr__(self): 816 return "<Object %r>" % self.arg 817 818 819 class MappingTestCase(TestBase): 820 821 COUNT = 10 822 823 def test_weak_values(self): 824 # 825 # This exercises d.copy(), d.items(), d[], del d[], len(d). 826 # 827 dict, objects = self.make_weak_valued_dict() 828 for o in objects: 829 self.assertTrue(weakref.getweakrefcount(o) == 1, 830 "wrong number of weak references to %r!" % o) 831 self.assertTrue(o is dict[o.arg], 832 "wrong object returned by weak dict!") 833 items1 = dict.items() 834 items2 = dict.copy().items() 835 items1.sort() 836 items2.sort() 837 self.assertTrue(items1 == items2, 838 "cloning of weak-valued dictionary did not work!") 839 del items1, items2 840 self.assertTrue(len(dict) == self.COUNT) 841 del objects[0] 842 self.assertTrue(len(dict) == (self.COUNT - 1), 843 "deleting object did not cause dictionary update") 844 del objects, o 845 self.assertTrue(len(dict) == 0, 846 "deleting the values did not clear the dictionary") 847 # regression on SF bug #447152: 848 dict = weakref.WeakValueDictionary() 849 self.assertRaises(KeyError, dict.__getitem__, 1) 850 dict[2] = C() 851 self.assertRaises(KeyError, dict.__getitem__, 2) 852 853 def test_weak_keys(self): 854 # 855 # This exercises d.copy(), d.items(), d[] = v, d[], del d[], 856 # len(d), in d. 857 # 858 dict, objects = self.make_weak_keyed_dict() 859 for o in objects: 860 self.assertTrue(weakref.getweakrefcount(o) == 1, 861 "wrong number of weak references to %r!" % o) 862 self.assertTrue(o.arg is dict[o], 863 "wrong object returned by weak dict!") 864 items1 = dict.items() 865 items2 = dict.copy().items() 866 self.assertTrue(set(items1) == set(items2), 867 "cloning of weak-keyed dictionary did not work!") 868 del items1, items2 869 self.assertTrue(len(dict) == self.COUNT) 870 del objects[0] 871 self.assertTrue(len(dict) == (self.COUNT - 1), 872 "deleting object did not cause dictionary update") 873 del objects, o 874 self.assertTrue(len(dict) == 0, 875 "deleting the keys did not clear the dictionary") 876 o = Object(42) 877 dict[o] = "What is the meaning of the universe?" 878 self.assertIn(o, dict) 879 self.assertNotIn(34, dict) 880 881 def test_weak_keyed_iters(self): 882 dict, objects = self.make_weak_keyed_dict() 883 self.check_iters(dict) 884 885 # Test keyrefs() 886 refs = dict.keyrefs() 887 self.assertEqual(len(refs), len(objects)) 888 objects2 = list(objects) 889 for wr in refs: 890 ob = wr() 891 self.assertIn(ob, dict) 892 self.assertEqual(ob.arg, dict[ob]) 893 objects2.remove(ob) 894 self.assertEqual(len(objects2), 0) 895 896 # Test iterkeyrefs() 897 objects2 = list(objects) 898 self.assertEqual(len(list(dict.iterkeyrefs())), len(objects)) 899 for wr in dict.iterkeyrefs(): 900 ob = wr() 901 self.assertIn(ob, dict) 902 self.assertEqual(ob.arg, dict[ob]) 903 objects2.remove(ob) 904 self.assertEqual(len(objects2), 0) 905 906 def test_weak_valued_iters(self): 907 dict, objects = self.make_weak_valued_dict() 908 self.check_iters(dict) 909 910 # Test valuerefs() 911 refs = dict.valuerefs() 912 self.assertEqual(len(refs), len(objects)) 913 objects2 = list(objects) 914 for wr in refs: 915 ob = wr() 916 self.assertEqual(ob, dict[ob.arg]) 917 self.assertEqual(ob.arg, dict[ob.arg].arg) 918 objects2.remove(ob) 919 self.assertEqual(len(objects2), 0) 920 921 # Test itervaluerefs() 922 objects2 = list(objects) 923 self.assertEqual(len(list(dict.itervaluerefs())), len(objects)) 924 for wr in dict.itervaluerefs(): 925 ob = wr() 926 self.assertEqual(ob, dict[ob.arg]) 927 self.assertEqual(ob.arg, dict[ob.arg].arg) 928 objects2.remove(ob) 929 self.assertEqual(len(objects2), 0) 930 931 def check_iters(self, dict): 932 # item iterator: 933 items = dict.items() 934 for item in dict.iteritems(): 935 items.remove(item) 936 self.assertTrue(len(items) == 0, "iteritems() did not touch all items") 937 938 # key iterator, via __iter__(): 939 keys = dict.keys() 940 for k in dict: 941 keys.remove(k) 942 self.assertTrue(len(keys) == 0, "__iter__() did not touch all keys") 943 944 # key iterator, via iterkeys(): 945 keys = dict.keys() 946 for k in dict.iterkeys(): 947 keys.remove(k) 948 self.assertTrue(len(keys) == 0, "iterkeys() did not touch all keys") 949 950 # value iterator: 951 values = dict.values() 952 for v in dict.itervalues(): 953 values.remove(v) 954 self.assertTrue(len(values) == 0, 955 "itervalues() did not touch all values") 956 957 def test_make_weak_keyed_dict_from_dict(self): 958 o = Object(3) 959 dict = weakref.WeakKeyDictionary({o:364}) 960 self.assertTrue(dict[o] == 364) 961 962 def test_make_weak_keyed_dict_from_weak_keyed_dict(self): 963 o = Object(3) 964 dict = weakref.WeakKeyDictionary({o:364}) 965 dict2 = weakref.WeakKeyDictionary(dict) 966 self.assertTrue(dict[o] == 364) 967 968 def make_weak_keyed_dict(self): 969 dict = weakref.WeakKeyDictionary() 970 objects = map(Object, range(self.COUNT)) 971 for o in objects: 972 dict[o] = o.arg 973 return dict, objects 974 975 def make_weak_valued_dict(self): 976 dict = weakref.WeakValueDictionary() 977 objects = map(Object, range(self.COUNT)) 978 for o in objects: 979 dict[o.arg] = o 980 return dict, objects 981 982 def check_popitem(self, klass, key1, value1, key2, value2): 983 weakdict = klass() 984 weakdict[key1] = value1 985 weakdict[key2] = value2 986 self.assertTrue(len(weakdict) == 2) 987 k, v = weakdict.popitem() 988 self.assertTrue(len(weakdict) == 1) 989 if k is key1: 990 self.assertTrue(v is value1) 991 else: 992 self.assertTrue(v is value2) 993 k, v = weakdict.popitem() 994 self.assertTrue(len(weakdict) == 0) 995 if k is key1: 996 self.assertTrue(v is value1) 997 else: 998 self.assertTrue(v is value2) 999 1000 def test_weak_valued_dict_popitem(self): 1001 self.check_popitem(weakref.WeakValueDictionary, 1002 "key1", C(), "key2", C()) 1003 1004 def test_weak_keyed_dict_popitem(self): 1005 self.check_popitem(weakref.WeakKeyDictionary, 1006 C(), "value 1", C(), "value 2") 1007 1008 def check_setdefault(self, klass, key, value1, value2): 1009 self.assertTrue(value1 is not value2, 1010 "invalid test" 1011 " -- value parameters must be distinct objects") 1012 weakdict = klass() 1013 o = weakdict.setdefault(key, value1) 1014 self.assertIs(o, value1) 1015 self.assertIn(key, weakdict) 1016 self.assertIs(weakdict.get(key), value1) 1017 self.assertIs(weakdict[key], value1) 1018 1019 o = weakdict.setdefault(key, value2) 1020 self.assertIs(o, value1) 1021 self.assertIn(key, weakdict) 1022 self.assertIs(weakdict.get(key), value1) 1023 self.assertIs(weakdict[key], value1) 1024 1025 def test_weak_valued_dict_setdefault(self): 1026 self.check_setdefault(weakref.WeakValueDictionary, 1027 "key", C(), C()) 1028 1029 def test_weak_keyed_dict_setdefault(self): 1030 self.check_setdefault(weakref.WeakKeyDictionary, 1031 C(), "value 1", "value 2") 1032 1033 def check_update(self, klass, dict): 1034 # 1035 # This exercises d.update(), len(d), d.keys(), in d, 1036 # d.get(), d[]. 1037 # 1038 weakdict = klass() 1039 weakdict.update(dict) 1040 self.assertEqual(len(weakdict), len(dict)) 1041 for k in weakdict.keys(): 1042 self.assertIn(k, dict, 1043 "mysterious new key appeared in weak dict") 1044 v = dict.get(k) 1045 self.assertIs(v, weakdict[k]) 1046 self.assertIs(v, weakdict.get(k)) 1047 for k in dict.keys(): 1048 self.assertIn(k, weakdict, 1049 "original key disappeared in weak dict") 1050 v = dict[k] 1051 self.assertIs(v, weakdict[k]) 1052 self.assertIs(v, weakdict.get(k)) 1053 1054 def test_weak_valued_dict_update(self): 1055 self.check_update(weakref.WeakValueDictionary, 1056 {1: C(), 'a': C(), C(): C()}) 1057 1058 def test_weak_keyed_dict_update(self): 1059 self.check_update(weakref.WeakKeyDictionary, 1060 {C(): 1, C(): 2, C(): 3}) 1061 1062 def test_weak_keyed_delitem(self): 1063 d = weakref.WeakKeyDictionary() 1064 o1 = Object('1') 1065 o2 = Object('2') 1066 d[o1] = 'something' 1067 d[o2] = 'something' 1068 self.assertTrue(len(d) == 2) 1069 del d[o1] 1070 self.assertTrue(len(d) == 1) 1071 self.assertTrue(d.keys() == [o2]) 1072 1073 def test_weak_valued_delitem(self): 1074 d = weakref.WeakValueDictionary() 1075 o1 = Object('1') 1076 o2 = Object('2') 1077 d['something'] = o1 1078 d['something else'] = o2 1079 self.assertTrue(len(d) == 2) 1080 del d['something'] 1081 self.assertTrue(len(d) == 1) 1082 self.assertTrue(d.items() == [('something else', o2)]) 1083 1084 def test_weak_keyed_bad_delitem(self): 1085 d = weakref.WeakKeyDictionary() 1086 o = Object('1') 1087 # An attempt to delete an object that isn't there should raise 1088 # KeyError. It didn't before 2.3. 1089 self.assertRaises(KeyError, d.__delitem__, o) 1090 self.assertRaises(KeyError, d.__getitem__, o) 1091 1092 # If a key isn't of a weakly referencable type, __getitem__ and 1093 # __setitem__ raise TypeError. __delitem__ should too. 1094 self.assertRaises(TypeError, d.__delitem__, 13) 1095 self.assertRaises(TypeError, d.__getitem__, 13) 1096 self.assertRaises(TypeError, d.__setitem__, 13, 13) 1097 1098 def test_weak_keyed_cascading_deletes(self): 1099 # SF bug 742860. For some reason, before 2.3 __delitem__ iterated 1100 # over the keys via self.data.iterkeys(). If things vanished from 1101 # the dict during this (or got added), that caused a RuntimeError. 1102 1103 d = weakref.WeakKeyDictionary() 1104 mutate = False 1105 1106 class C(object): 1107 def __init__(self, i): 1108 self.value = i 1109 def __hash__(self): 1110 return hash(self.value) 1111 def __eq__(self, other): 1112 if mutate: 1113 # Side effect that mutates the dict, by removing the 1114 # last strong reference to a key. 1115 del objs[-1] 1116 return self.value == other.value 1117 1118 objs = [C(i) for i in range(4)] 1119 for o in objs: 1120 d[o] = o.value 1121 del o # now the only strong references to keys are in objs 1122 # Find the order in which iterkeys sees the keys. 1123 objs = d.keys() 1124 # Reverse it, so that the iteration implementation of __delitem__ 1125 # has to keep looping to find the first object we delete. 1126 objs.reverse() 1127 1128 # Turn on mutation in C.__eq__. The first time thru the loop, 1129 # under the iterkeys() business the first comparison will delete 1130 # the last item iterkeys() would see, and that causes a 1131 # RuntimeError: dictionary changed size during iteration 1132 # when the iterkeys() loop goes around to try comparing the next 1133 # key. After this was fixed, it just deletes the last object *our* 1134 # "for o in obj" loop would have gotten to. 1135 mutate = True 1136 count = 0 1137 for o in objs: 1138 count += 1 1139 del d[o] 1140 self.assertEqual(len(d), 0) 1141 self.assertEqual(count, 2) 1142 1143 from test import mapping_tests 1144 1145 class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): 1146 """Check that WeakValueDictionary conforms to the mapping protocol""" 1147 __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)} 1148 type2test = weakref.WeakValueDictionary 1149 def _reference(self): 1150 return self.__ref.copy() 1151 1152 class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): 1153 """Check that WeakKeyDictionary conforms to the mapping protocol""" 1154 __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3} 1155 type2test = weakref.WeakKeyDictionary 1156 def _reference(self): 1157 return self.__ref.copy() 1158 1159 libreftest = """ Doctest for examples in the library reference: weakref.rst 1160 1161 >>> import weakref 1162 >>> class Dict(dict): 1163 ... pass 1164 ... 1165 >>> obj = Dict(red=1, green=2, blue=3) # this object is weak referencable 1166 >>> r = weakref.ref(obj) 1167 >>> print r() is obj 1168 True 1169 1170 >>> import weakref 1171 >>> class Object: 1172 ... pass 1173 ... 1174 >>> o = Object() 1175 >>> r = weakref.ref(o) 1176 >>> o2 = r() 1177 >>> o is o2 1178 True 1179 >>> del o, o2 1180 >>> print r() 1181 None 1182 1183 >>> import weakref 1184 >>> class ExtendedRef(weakref.ref): 1185 ... def __init__(self, ob, callback=None, **annotations): 1186 ... super(ExtendedRef, self).__init__(ob, callback) 1187 ... self.__counter = 0 1188 ... for k, v in annotations.iteritems(): 1189 ... setattr(self, k, v) 1190 ... def __call__(self): 1191 ... '''Return a pair containing the referent and the number of 1192 ... times the reference has been called. 1193 ... ''' 1194 ... ob = super(ExtendedRef, self).__call__() 1195 ... if ob is not None: 1196 ... self.__counter += 1 1197 ... ob = (ob, self.__counter) 1198 ... return ob 1199 ... 1200 >>> class A: # not in docs from here, just testing the ExtendedRef 1201 ... pass 1202 ... 1203 >>> a = A() 1204 >>> r = ExtendedRef(a, foo=1, bar="baz") 1205 >>> r.foo 1206 1 1207 >>> r.bar 1208 'baz' 1209 >>> r()[1] 1210 1 1211 >>> r()[1] 1212 2 1213 >>> r()[0] is a 1214 True 1215 1216 1217 >>> import weakref 1218 >>> _id2obj_dict = weakref.WeakValueDictionary() 1219 >>> def remember(obj): 1220 ... oid = id(obj) 1221 ... _id2obj_dict[oid] = obj 1222 ... return oid 1223 ... 1224 >>> def id2obj(oid): 1225 ... return _id2obj_dict[oid] 1226 ... 1227 >>> a = A() # from here, just testing 1228 >>> a_id = remember(a) 1229 >>> id2obj(a_id) is a 1230 True 1231 >>> del a 1232 >>> try: 1233 ... id2obj(a_id) 1234 ... except KeyError: 1235 ... print 'OK' 1236 ... else: 1237 ... print 'WeakValueDictionary error' 1238 OK 1239 1240 """ 1241 1242 __test__ = {'libreftest' : libreftest} 1243 1244 def test_main(): 1245 test_support.run_unittest( 1246 ReferencesTestCase, 1247 MappingTestCase, 1248 WeakValueDictionaryTestCase, 1249 WeakKeyDictionaryTestCase, 1250 SubclassableWeakrefTestCase, 1251 ) 1252 test_support.run_doctest(sys.modules[__name__]) 1253 1254 1255 if __name__ == "__main__": 1256 test_main() 1257