1 import gc 2 import sys 3 import unittest 4 import UserList 5 import weakref 6 import operator 7 8 from test import test_support 9 10 # Used in ReferencesTestCase.test_ref_created_during_del() . 11 ref_from_del = None 12 13 class C: 14 def method(self): 15 pass 16 17 18 class Callable: 19 bar = None 20 21 def __call__(self, x): 22 self.bar = x 23 24 25 def create_function(): 26 def f(): pass 27 return f 28 29 def create_bound_method(): 30 return C().method 31 32 def create_unbound_method(): 33 return C.method 34 35 36 class Object: 37 def __init__(self, arg): 38 self.arg = arg 39 def __repr__(self): 40 return "<Object %r>" % self.arg 41 def __eq__(self, other): 42 if isinstance(other, Object): 43 return self.arg == other.arg 44 return NotImplemented 45 def __ne__(self, other): 46 if isinstance(other, Object): 47 return self.arg != other.arg 48 return NotImplemented 49 def __hash__(self): 50 return hash(self.arg) 51 52 class RefCycle: 53 def __init__(self): 54 self.cycle = self 55 56 57 class TestBase(unittest.TestCase): 58 59 def setUp(self): 60 self.cbcalled = 0 61 62 def callback(self, ref): 63 self.cbcalled += 1 64 65 66 class ReferencesTestCase(TestBase): 67 68 def test_basic_ref(self): 69 self.check_basic_ref(C) 70 self.check_basic_ref(create_function) 71 self.check_basic_ref(create_bound_method) 72 self.check_basic_ref(create_unbound_method) 73 74 # Just make sure the tp_repr handler doesn't raise an exception. 75 # Live reference: 76 o = C() 77 wr = weakref.ref(o) 78 repr(wr) 79 # Dead reference: 80 del o 81 repr(wr) 82 83 def test_basic_callback(self): 84 self.check_basic_callback(C) 85 self.check_basic_callback(create_function) 86 self.check_basic_callback(create_bound_method) 87 self.check_basic_callback(create_unbound_method) 88 89 def test_multiple_callbacks(self): 90 o = C() 91 ref1 = weakref.ref(o, self.callback) 92 ref2 = weakref.ref(o, self.callback) 93 del o 94 self.assertTrue(ref1() is None, 95 "expected reference to be invalidated") 96 self.assertTrue(ref2() is None, 97 "expected reference to be invalidated") 98 self.assertTrue(self.cbcalled == 2, 99 "callback not called the right number of times") 100 101 def test_multiple_selfref_callbacks(self): 102 # Make sure all references are invalidated before callbacks are called 103 # 104 # What's important here is that we're using the first 105 # reference in the callback invoked on the second reference 106 # (the most recently created ref is cleaned up first). This 107 # tests that all references to the object are invalidated 108 # before any of the callbacks are invoked, so that we only 109 # have one invocation of _weakref.c:cleanup_helper() active 110 # for a particular object at a time. 111 # 112 def callback(object, self=self): 113 self.ref() 114 c = C() 115 self.ref = weakref.ref(c, callback) 116 ref1 = weakref.ref(c, callback) 117 del c 118 119 def test_proxy_ref(self): 120 o = C() 121 o.bar = 1 122 ref1 = weakref.proxy(o, self.callback) 123 ref2 = weakref.proxy(o, self.callback) 124 del o 125 126 def check(proxy): 127 proxy.bar 128 129 self.assertRaises(weakref.ReferenceError, check, ref1) 130 self.assertRaises(weakref.ReferenceError, check, ref2) 131 self.assertRaises(weakref.ReferenceError, bool, weakref.proxy(C())) 132 self.assertTrue(self.cbcalled == 2) 133 134 def check_basic_ref(self, factory): 135 o = factory() 136 ref = weakref.ref(o) 137 self.assertTrue(ref() is not None, 138 "weak reference to live object should be live") 139 o2 = ref() 140 self.assertTrue(o is o2, 141 "<ref>() should return original object if live") 142 143 def check_basic_callback(self, factory): 144 self.cbcalled = 0 145 o = factory() 146 ref = weakref.ref(o, self.callback) 147 del o 148 self.assertTrue(self.cbcalled == 1, 149 "callback did not properly set 'cbcalled'") 150 self.assertTrue(ref() is None, 151 "ref2 should be dead after deleting object reference") 152 153 def test_ref_reuse(self): 154 o = C() 155 ref1 = weakref.ref(o) 156 # create a proxy to make sure that there's an intervening creation 157 # between these two; it should make no difference 158 proxy = weakref.proxy(o) 159 ref2 = weakref.ref(o) 160 self.assertTrue(ref1 is ref2, 161 "reference object w/out callback should be re-used") 162 163 o = C() 164 proxy = weakref.proxy(o) 165 ref1 = weakref.ref(o) 166 ref2 = weakref.ref(o) 167 self.assertTrue(ref1 is ref2, 168 "reference object w/out callback should be re-used") 169 self.assertTrue(weakref.getweakrefcount(o) == 2, 170 "wrong weak ref count for object") 171 del proxy 172 self.assertTrue(weakref.getweakrefcount(o) == 1, 173 "wrong weak ref count for object after deleting proxy") 174 175 def test_proxy_reuse(self): 176 o = C() 177 proxy1 = weakref.proxy(o) 178 ref = weakref.ref(o) 179 proxy2 = weakref.proxy(o) 180 self.assertTrue(proxy1 is proxy2, 181 "proxy object w/out callback should have been re-used") 182 183 def test_basic_proxy(self): 184 o = C() 185 self.check_proxy(o, weakref.proxy(o)) 186 187 L = UserList.UserList() 188 p = weakref.proxy(L) 189 self.assertFalse(p, "proxy for empty UserList should be false") 190 p.append(12) 191 self.assertEqual(len(L), 1) 192 self.assertTrue(p, "proxy for non-empty UserList should be true") 193 with test_support.check_py3k_warnings(): 194 p[:] = [2, 3] 195 self.assertEqual(len(L), 2) 196 self.assertEqual(len(p), 2) 197 self.assertIn(3, p, "proxy didn't support __contains__() properly") 198 p[1] = 5 199 self.assertEqual(L[1], 5) 200 self.assertEqual(p[1], 5) 201 L2 = UserList.UserList(L) 202 p2 = weakref.proxy(L2) 203 self.assertEqual(p, p2) 204 ## self.assertEqual(repr(L2), repr(p2)) 205 L3 = UserList.UserList(range(10)) 206 p3 = weakref.proxy(L3) 207 with test_support.check_py3k_warnings(): 208 self.assertEqual(L3[:], p3[:]) 209 self.assertEqual(L3[5:], p3[5:]) 210 self.assertEqual(L3[:5], p3[:5]) 211 self.assertEqual(L3[2:5], p3[2:5]) 212 213 def test_proxy_unicode(self): 214 # See bug 5037 215 class C(object): 216 def __str__(self): 217 return "string" 218 def __unicode__(self): 219 return u"unicode" 220 instance = C() 221 self.assertIn("__unicode__", dir(weakref.proxy(instance))) 222 self.assertEqual(unicode(weakref.proxy(instance)), u"unicode") 223 224 def test_proxy_index(self): 225 class C: 226 def __index__(self): 227 return 10 228 o = C() 229 p = weakref.proxy(o) 230 self.assertEqual(operator.index(p), 10) 231 232 def test_proxy_div(self): 233 class C: 234 def __floordiv__(self, other): 235 return 42 236 def __ifloordiv__(self, other): 237 return 21 238 o = C() 239 p = weakref.proxy(o) 240 self.assertEqual(p // 5, 42) 241 p //= 5 242 self.assertEqual(p, 21) 243 244 # The PyWeakref_* C API is documented as allowing either NULL or 245 # None as the value for the callback, where either means "no 246 # callback". The "no callback" ref and proxy objects are supposed 247 # to be shared so long as they exist by all callers so long as 248 # they are active. In Python 2.3.3 and earlier, this guarantee 249 # was not honored, and was broken in different ways for 250 # PyWeakref_NewRef() and PyWeakref_NewProxy(). (Two tests.) 251 252 def test_shared_ref_without_callback(self): 253 self.check_shared_without_callback(weakref.ref) 254 255 def test_shared_proxy_without_callback(self): 256 self.check_shared_without_callback(weakref.proxy) 257 258 def check_shared_without_callback(self, makeref): 259 o = Object(1) 260 p1 = makeref(o, None) 261 p2 = makeref(o, None) 262 self.assertTrue(p1 is p2, "both callbacks were None in the C API") 263 del p1, p2 264 p1 = makeref(o) 265 p2 = makeref(o, None) 266 self.assertTrue(p1 is p2, "callbacks were NULL, None in the C API") 267 del p1, p2 268 p1 = makeref(o) 269 p2 = makeref(o) 270 self.assertTrue(p1 is p2, "both callbacks were NULL in the C API") 271 del p1, p2 272 p1 = makeref(o, None) 273 p2 = makeref(o) 274 self.assertTrue(p1 is p2, "callbacks were None, NULL in the C API") 275 276 def test_callable_proxy(self): 277 o = Callable() 278 ref1 = weakref.proxy(o) 279 280 self.check_proxy(o, ref1) 281 282 self.assertTrue(type(ref1) is weakref.CallableProxyType, 283 "proxy is not of callable type") 284 ref1('twinkies!') 285 self.assertTrue(o.bar == 'twinkies!', 286 "call through proxy not passed through to original") 287 ref1(x='Splat.') 288 self.assertTrue(o.bar == 'Splat.', 289 "call through proxy not passed through to original") 290 291 # expect due to too few args 292 self.assertRaises(TypeError, ref1) 293 294 # expect due to too many args 295 self.assertRaises(TypeError, ref1, 1, 2, 3) 296 297 def check_proxy(self, o, proxy): 298 o.foo = 1 299 self.assertTrue(proxy.foo == 1, 300 "proxy does not reflect attribute addition") 301 o.foo = 2 302 self.assertTrue(proxy.foo == 2, 303 "proxy does not reflect attribute modification") 304 del o.foo 305 self.assertTrue(not hasattr(proxy, 'foo'), 306 "proxy does not reflect attribute removal") 307 308 proxy.foo = 1 309 self.assertTrue(o.foo == 1, 310 "object does not reflect attribute addition via proxy") 311 proxy.foo = 2 312 self.assertTrue( 313 o.foo == 2, 314 "object does not reflect attribute modification via proxy") 315 del proxy.foo 316 self.assertTrue(not hasattr(o, 'foo'), 317 "object does not reflect attribute removal via proxy") 318 319 def test_proxy_deletion(self): 320 # Test clearing of SF bug #762891 321 class Foo: 322 result = None 323 def __delitem__(self, accessor): 324 self.result = accessor 325 g = Foo() 326 f = weakref.proxy(g) 327 del f[0] 328 self.assertEqual(f.result, 0) 329 330 def test_proxy_bool(self): 331 # Test clearing of SF bug #1170766 332 class List(list): pass 333 lyst = List() 334 self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst)) 335 336 def test_getweakrefcount(self): 337 o = C() 338 ref1 = weakref.ref(o) 339 ref2 = weakref.ref(o, self.callback) 340 self.assertTrue(weakref.getweakrefcount(o) == 2, 341 "got wrong number of weak reference objects") 342 343 proxy1 = weakref.proxy(o) 344 proxy2 = weakref.proxy(o, self.callback) 345 self.assertTrue(weakref.getweakrefcount(o) == 4, 346 "got wrong number of weak reference objects") 347 348 del ref1, ref2, proxy1, proxy2 349 self.assertTrue(weakref.getweakrefcount(o) == 0, 350 "weak reference objects not unlinked from" 351 " referent when discarded.") 352 353 # assumes ints do not support weakrefs 354 self.assertTrue(weakref.getweakrefcount(1) == 0, 355 "got wrong number of weak reference objects for int") 356 357 def test_getweakrefs(self): 358 o = C() 359 ref1 = weakref.ref(o, self.callback) 360 ref2 = weakref.ref(o, self.callback) 361 del ref1 362 self.assertTrue(weakref.getweakrefs(o) == [ref2], 363 "list of refs does not match") 364 365 o = C() 366 ref1 = weakref.ref(o, self.callback) 367 ref2 = weakref.ref(o, self.callback) 368 del ref2 369 self.assertTrue(weakref.getweakrefs(o) == [ref1], 370 "list of refs does not match") 371 372 del ref1 373 self.assertTrue(weakref.getweakrefs(o) == [], 374 "list of refs not cleared") 375 376 # assumes ints do not support weakrefs 377 self.assertTrue(weakref.getweakrefs(1) == [], 378 "list of refs does not match for int") 379 380 def test_newstyle_number_ops(self): 381 class F(float): 382 pass 383 f = F(2.0) 384 p = weakref.proxy(f) 385 self.assertTrue(p + 1.0 == 3.0) 386 self.assertTrue(1.0 + p == 3.0) # this used to SEGV 387 388 def test_callbacks_protected(self): 389 # Callbacks protected from already-set exceptions? 390 # Regression test for SF bug #478534. 391 class BogusError(Exception): 392 pass 393 data = {} 394 def remove(k): 395 del data[k] 396 def encapsulate(): 397 f = lambda : () 398 data[weakref.ref(f, remove)] = None 399 raise BogusError 400 try: 401 encapsulate() 402 except BogusError: 403 pass 404 else: 405 self.fail("exception not properly restored") 406 try: 407 encapsulate() 408 except BogusError: 409 pass 410 else: 411 self.fail("exception not properly restored") 412 413 def test_sf_bug_840829(self): 414 # "weakref callbacks and gc corrupt memory" 415 # subtype_dealloc erroneously exposed a new-style instance 416 # already in the process of getting deallocated to gc, 417 # causing double-deallocation if the instance had a weakref 418 # callback that triggered gc. 419 # If the bug exists, there probably won't be an obvious symptom 420 # in a release build. In a debug build, a segfault will occur 421 # when the second attempt to remove the instance from the "list 422 # of all objects" occurs. 423 424 import gc 425 426 class C(object): 427 pass 428 429 c = C() 430 wr = weakref.ref(c, lambda ignore: gc.collect()) 431 del c 432 433 # There endeth the first part. It gets worse. 434 del wr 435 436 c1 = C() 437 c1.i = C() 438 wr = weakref.ref(c1.i, lambda ignore: gc.collect()) 439 440 c2 = C() 441 c2.c1 = c1 442 del c1 # still alive because c2 points to it 443 444 # Now when subtype_dealloc gets called on c2, it's not enough just 445 # that c2 is immune from gc while the weakref callbacks associated 446 # with c2 execute (there are none in this 2nd half of the test, btw). 447 # subtype_dealloc goes on to call the base classes' deallocs too, 448 # so any gc triggered by weakref callbacks associated with anything 449 # torn down by a base class dealloc can also trigger double 450 # deallocation of c2. 451 del c2 452 453 def test_callback_in_cycle_1(self): 454 import gc 455 456 class J(object): 457 pass 458 459 class II(object): 460 def acallback(self, ignore): 461 self.J 462 463 I = II() 464 I.J = J 465 I.wr = weakref.ref(J, I.acallback) 466 467 # Now J and II are each in a self-cycle (as all new-style class 468 # objects are, since their __mro__ points back to them). I holds 469 # both a weak reference (I.wr) and a strong reference (I.J) to class 470 # J. I is also in a cycle (I.wr points to a weakref that references 471 # I.acallback). When we del these three, they all become trash, but 472 # the cycles prevent any of them from getting cleaned up immediately. 473 # Instead they have to wait for cyclic gc to deduce that they're 474 # trash. 475 # 476 # gc used to call tp_clear on all of them, and the order in which 477 # it does that is pretty accidental. The exact order in which we 478 # built up these things manages to provoke gc into running tp_clear 479 # in just the right order (I last). Calling tp_clear on II leaves 480 # behind an insane class object (its __mro__ becomes NULL). Calling 481 # tp_clear on J breaks its self-cycle, but J doesn't get deleted 482 # just then because of the strong reference from I.J. Calling 483 # tp_clear on I starts to clear I's __dict__, and just happens to 484 # clear I.J first -- I.wr is still intact. That removes the last 485 # reference to J, which triggers the weakref callback. The callback 486 # tries to do "self.J", and instances of new-style classes look up 487 # attributes ("J") in the class dict first. The class (II) wants to 488 # search II.__mro__, but that's NULL. The result was a segfault in 489 # a release build, and an assert failure in a debug build. 490 del I, J, II 491 gc.collect() 492 493 def test_callback_in_cycle_2(self): 494 import gc 495 496 # This is just like test_callback_in_cycle_1, except that II is an 497 # old-style class. The symptom is different then: an instance of an 498 # old-style class looks in its own __dict__ first. 'J' happens to 499 # get cleared from I.__dict__ before 'wr', and 'J' was never in II's 500 # __dict__, so the attribute isn't found. The difference is that 501 # the old-style II doesn't have a NULL __mro__ (it doesn't have any 502 # __mro__), so no segfault occurs. Instead it got: 503 # test_callback_in_cycle_2 (__main__.ReferencesTestCase) ... 504 # Exception exceptions.AttributeError: 505 # "II instance has no attribute 'J'" in <bound method II.acallback 506 # of <?.II instance at 0x00B9B4B8>> ignored 507 508 class J(object): 509 pass 510 511 class II: 512 def acallback(self, ignore): 513 self.J 514 515 I = II() 516 I.J = J 517 I.wr = weakref.ref(J, I.acallback) 518 519 del I, J, II 520 gc.collect() 521 522 def test_callback_in_cycle_3(self): 523 import gc 524 525 # This one broke the first patch that fixed the last two. In this 526 # case, the objects reachable from the callback aren't also reachable 527 # from the object (c1) *triggering* the callback: you can get to 528 # c1 from c2, but not vice-versa. The result was that c2's __dict__ 529 # got tp_clear'ed by the time the c2.cb callback got invoked. 530 531 class C: 532 def cb(self, ignore): 533 self.me 534 self.c1 535 self.wr 536 537 c1, c2 = C(), C() 538 539 c2.me = c2 540 c2.c1 = c1 541 c2.wr = weakref.ref(c1, c2.cb) 542 543 del c1, c2 544 gc.collect() 545 546 def test_callback_in_cycle_4(self): 547 import gc 548 549 # Like test_callback_in_cycle_3, except c2 and c1 have different 550 # classes. c2's class (C) isn't reachable from c1 then, so protecting 551 # objects reachable from the dying object (c1) isn't enough to stop 552 # c2's class (C) from getting tp_clear'ed before c2.cb is invoked. 553 # The result was a segfault (C.__mro__ was NULL when the callback 554 # tried to look up self.me). 555 556 class C(object): 557 def cb(self, ignore): 558 self.me 559 self.c1 560 self.wr 561 562 class D: 563 pass 564 565 c1, c2 = D(), C() 566 567 c2.me = c2 568 c2.c1 = c1 569 c2.wr = weakref.ref(c1, c2.cb) 570 571 del c1, c2, C, D 572 gc.collect() 573 574 def test_callback_in_cycle_resurrection(self): 575 import gc 576 577 # Do something nasty in a weakref callback: resurrect objects 578 # from dead cycles. For this to be attempted, the weakref and 579 # its callback must also be part of the cyclic trash (else the 580 # objects reachable via the callback couldn't be in cyclic trash 581 # to begin with -- the callback would act like an external root). 582 # But gc clears trash weakrefs with callbacks early now, which 583 # disables the callbacks, so the callbacks shouldn't get called 584 # at all (and so nothing actually gets resurrected). 585 586 alist = [] 587 class C(object): 588 def __init__(self, value): 589 self.attribute = value 590 591 def acallback(self, ignore): 592 alist.append(self.c) 593 594 c1, c2 = C(1), C(2) 595 c1.c = c2 596 c2.c = c1 597 c1.wr = weakref.ref(c2, c1.acallback) 598 c2.wr = weakref.ref(c1, c2.acallback) 599 600 def C_went_away(ignore): 601 alist.append("C went away") 602 wr = weakref.ref(C, C_went_away) 603 604 del c1, c2, C # make them all trash 605 self.assertEqual(alist, []) # del isn't enough to reclaim anything 606 607 gc.collect() 608 # c1.wr and c2.wr were part of the cyclic trash, so should have 609 # been cleared without their callbacks executing. OTOH, the weakref 610 # to C is bound to a function local (wr), and wasn't trash, so that 611 # callback should have been invoked when C went away. 612 self.assertEqual(alist, ["C went away"]) 613 # The remaining weakref should be dead now (its callback ran). 614 self.assertEqual(wr(), None) 615 616 del alist[:] 617 gc.collect() 618 self.assertEqual(alist, []) 619 620 def test_callbacks_on_callback(self): 621 import gc 622 623 # Set up weakref callbacks *on* weakref callbacks. 624 alist = [] 625 def safe_callback(ignore): 626 alist.append("safe_callback called") 627 628 class C(object): 629 def cb(self, ignore): 630 alist.append("cb called") 631 632 c, d = C(), C() 633 c.other = d 634 d.other = c 635 callback = c.cb 636 c.wr = weakref.ref(d, callback) # this won't trigger 637 d.wr = weakref.ref(callback, d.cb) # ditto 638 external_wr = weakref.ref(callback, safe_callback) # but this will 639 self.assertTrue(external_wr() is callback) 640 641 # The weakrefs attached to c and d should get cleared, so that 642 # C.cb is never called. But external_wr isn't part of the cyclic 643 # trash, and no cyclic trash is reachable from it, so safe_callback 644 # should get invoked when the bound method object callback (c.cb) 645 # -- which is itself a callback, and also part of the cyclic trash -- 646 # gets reclaimed at the end of gc. 647 648 del callback, c, d, C 649 self.assertEqual(alist, []) # del isn't enough to clean up cycles 650 gc.collect() 651 self.assertEqual(alist, ["safe_callback called"]) 652 self.assertEqual(external_wr(), None) 653 654 del alist[:] 655 gc.collect() 656 self.assertEqual(alist, []) 657 658 def test_gc_during_ref_creation(self): 659 self.check_gc_during_creation(weakref.ref) 660 661 def test_gc_during_proxy_creation(self): 662 self.check_gc_during_creation(weakref.proxy) 663 664 def check_gc_during_creation(self, makeref): 665 thresholds = gc.get_threshold() 666 gc.set_threshold(1, 1, 1) 667 gc.collect() 668 class A: 669 pass 670 671 def callback(*args): 672 pass 673 674 referenced = A() 675 676 a = A() 677 a.a = a 678 a.wr = makeref(referenced) 679 680 try: 681 # now make sure the object and the ref get labeled as 682 # cyclic trash: 683 a = A() 684 weakref.ref(referenced, callback) 685 686 finally: 687 gc.set_threshold(*thresholds) 688 689 def test_ref_created_during_del(self): 690 # Bug #1377858 691 # A weakref created in an object's __del__() would crash the 692 # interpreter when the weakref was cleaned up since it would refer to 693 # non-existent memory. This test should not segfault the interpreter. 694 class Target(object): 695 def __del__(self): 696 global ref_from_del 697 ref_from_del = weakref.ref(self) 698 699 w = Target() 700 701 def test_init(self): 702 # Issue 3634 703 # <weakref to class>.__init__() doesn't check errors correctly 704 r = weakref.ref(Exception) 705 self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0) 706 # No exception should be raised here 707 gc.collect() 708 709 def test_classes(self): 710 # Check that both old-style classes and new-style classes 711 # are weakrefable. 712 class A(object): 713 pass 714 class B: 715 pass 716 l = [] 717 weakref.ref(int) 718 a = weakref.ref(A, l.append) 719 A = None 720 gc.collect() 721 self.assertEqual(a(), None) 722 self.assertEqual(l, [a]) 723 b = weakref.ref(B, l.append) 724 B = None 725 gc.collect() 726 self.assertEqual(b(), None) 727 self.assertEqual(l, [a, b]) 728 729 def test_equality(self): 730 # Alive weakrefs defer equality testing to their underlying object. 731 x = Object(1) 732 y = Object(1) 733 z = Object(2) 734 a = weakref.ref(x) 735 b = weakref.ref(y) 736 c = weakref.ref(z) 737 d = weakref.ref(x) 738 # Note how we directly test the operators here, to stress both 739 # __eq__ and __ne__. 740 self.assertTrue(a == b) 741 self.assertFalse(a != b) 742 self.assertFalse(a == c) 743 self.assertTrue(a != c) 744 self.assertTrue(a == d) 745 self.assertFalse(a != d) 746 del x, y, z 747 gc.collect() 748 for r in a, b, c: 749 # Sanity check 750 self.assertIs(r(), None) 751 # Dead weakrefs compare by identity: whether `a` and `d` are the 752 # same weakref object is an implementation detail, since they pointed 753 # to the same original object and didn't have a callback. 754 # (see issue #16453). 755 self.assertFalse(a == b) 756 self.assertTrue(a != b) 757 self.assertFalse(a == c) 758 self.assertTrue(a != c) 759 self.assertEqual(a == d, a is d) 760 self.assertEqual(a != d, a is not d) 761 762 def test_hashing(self): 763 # Alive weakrefs hash the same as the underlying object 764 x = Object(42) 765 y = Object(42) 766 a = weakref.ref(x) 767 b = weakref.ref(y) 768 self.assertEqual(hash(a), hash(42)) 769 del x, y 770 gc.collect() 771 # Dead weakrefs: 772 # - retain their hash is they were hashed when alive; 773 # - otherwise, cannot be hashed. 774 self.assertEqual(hash(a), hash(42)) 775 self.assertRaises(TypeError, hash, b) 776 777 def test_trashcan_16602(self): 778 # Issue #16602: when a weakref's target was part of a long 779 # deallocation chain, the trashcan mechanism could delay clearing 780 # of the weakref and make the target object visible from outside 781 # code even though its refcount had dropped to 0. A crash ensued. 782 class C(object): 783 def __init__(self, parent): 784 if not parent: 785 return 786 wself = weakref.ref(self) 787 def cb(wparent): 788 o = wself() 789 self.wparent = weakref.ref(parent, cb) 790 791 d = weakref.WeakKeyDictionary() 792 root = c = C(None) 793 for n in range(100): 794 d[c] = c = C(c) 795 del root 796 gc.collect() 797 798 799 class SubclassableWeakrefTestCase(TestBase): 800 801 def test_subclass_refs(self): 802 class MyRef(weakref.ref): 803 def __init__(self, ob, callback=None, value=42): 804 self.value = value 805 super(MyRef, self).__init__(ob, callback) 806 def __call__(self): 807 self.called = True 808 return super(MyRef, self).__call__() 809 o = Object("foo") 810 mr = MyRef(o, value=24) 811 self.assertTrue(mr() is o) 812 self.assertTrue(mr.called) 813 self.assertEqual(mr.value, 24) 814 del o 815 self.assertTrue(mr() is None) 816 self.assertTrue(mr.called) 817 818 def test_subclass_refs_dont_replace_standard_refs(self): 819 class MyRef(weakref.ref): 820 pass 821 o = Object(42) 822 r1 = MyRef(o) 823 r2 = weakref.ref(o) 824 self.assertTrue(r1 is not r2) 825 self.assertEqual(weakref.getweakrefs(o), [r2, r1]) 826 self.assertEqual(weakref.getweakrefcount(o), 2) 827 r3 = MyRef(o) 828 self.assertEqual(weakref.getweakrefcount(o), 3) 829 refs = weakref.getweakrefs(o) 830 self.assertEqual(len(refs), 3) 831 self.assertTrue(r2 is refs[0]) 832 self.assertIn(r1, refs[1:]) 833 self.assertIn(r3, refs[1:]) 834 835 def test_subclass_refs_dont_conflate_callbacks(self): 836 class MyRef(weakref.ref): 837 pass 838 o = Object(42) 839 r1 = MyRef(o, id) 840 r2 = MyRef(o, str) 841 self.assertTrue(r1 is not r2) 842 refs = weakref.getweakrefs(o) 843 self.assertIn(r1, refs) 844 self.assertIn(r2, refs) 845 846 def test_subclass_refs_with_slots(self): 847 class MyRef(weakref.ref): 848 __slots__ = "slot1", "slot2" 849 def __new__(type, ob, callback, slot1, slot2): 850 return weakref.ref.__new__(type, ob, callback) 851 def __init__(self, ob, callback, slot1, slot2): 852 self.slot1 = slot1 853 self.slot2 = slot2 854 def meth(self): 855 return self.slot1 + self.slot2 856 o = Object(42) 857 r = MyRef(o, None, "abc", "def") 858 self.assertEqual(r.slot1, "abc") 859 self.assertEqual(r.slot2, "def") 860 self.assertEqual(r.meth(), "abcdef") 861 self.assertFalse(hasattr(r, "__dict__")) 862 863 def test_subclass_refs_with_cycle(self): 864 # Bug #3110 865 # An instance of a weakref subclass can have attributes. 866 # If such a weakref holds the only strong reference to the object, 867 # deleting the weakref will delete the object. In this case, 868 # the callback must not be called, because the ref object is 869 # being deleted. 870 class MyRef(weakref.ref): 871 pass 872 873 # Use a local callback, for "regrtest -R::" 874 # to detect refcounting problems 875 def callback(w): 876 self.cbcalled += 1 877 878 o = C() 879 r1 = MyRef(o, callback) 880 r1.o = o 881 del o 882 883 del r1 # Used to crash here 884 885 self.assertEqual(self.cbcalled, 0) 886 887 # Same test, with two weakrefs to the same object 888 # (since code paths are different) 889 o = C() 890 r1 = MyRef(o, callback) 891 r2 = MyRef(o, callback) 892 r1.r = r2 893 r2.o = o 894 del o 895 del r2 896 897 del r1 # Used to crash here 898 899 self.assertEqual(self.cbcalled, 0) 900 901 902 class MappingTestCase(TestBase): 903 904 COUNT = 10 905 906 def check_len_cycles(self, dict_type, cons): 907 N = 20 908 items = [RefCycle() for i in range(N)] 909 dct = dict_type(cons(o) for o in items) 910 # Keep an iterator alive 911 it = dct.iteritems() 912 try: 913 next(it) 914 except StopIteration: 915 pass 916 del items 917 gc.collect() 918 n1 = len(dct) 919 del it 920 gc.collect() 921 n2 = len(dct) 922 # one item may be kept alive inside the iterator 923 self.assertIn(n1, (0, 1)) 924 self.assertEqual(n2, 0) 925 926 def test_weak_keyed_len_cycles(self): 927 self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1)) 928 929 def test_weak_valued_len_cycles(self): 930 self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k)) 931 932 def check_len_race(self, dict_type, cons): 933 # Extended sanity checks for len() in the face of cyclic collection 934 self.addCleanup(gc.set_threshold, *gc.get_threshold()) 935 for th in range(1, 100): 936 N = 20 937 gc.collect(0) 938 gc.set_threshold(th, th, th) 939 items = [RefCycle() for i in range(N)] 940 dct = dict_type(cons(o) for o in items) 941 del items 942 # All items will be collected at next garbage collection pass 943 it = dct.iteritems() 944 try: 945 next(it) 946 except StopIteration: 947 pass 948 n1 = len(dct) 949 del it 950 n2 = len(dct) 951 self.assertGreaterEqual(n1, 0) 952 self.assertLessEqual(n1, N) 953 self.assertGreaterEqual(n2, 0) 954 self.assertLessEqual(n2, n1) 955 956 def test_weak_keyed_len_race(self): 957 self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1)) 958 959 def test_weak_valued_len_race(self): 960 self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k)) 961 962 def test_weak_values(self): 963 # 964 # This exercises d.copy(), d.items(), d[], del d[], len(d). 965 # 966 dict, objects = self.make_weak_valued_dict() 967 for o in objects: 968 self.assertTrue(weakref.getweakrefcount(o) == 1, 969 "wrong number of weak references to %r!" % o) 970 self.assertTrue(o is dict[o.arg], 971 "wrong object returned by weak dict!") 972 items1 = dict.items() 973 items2 = dict.copy().items() 974 items1.sort() 975 items2.sort() 976 self.assertTrue(items1 == items2, 977 "cloning of weak-valued dictionary did not work!") 978 del items1, items2 979 self.assertTrue(len(dict) == self.COUNT) 980 del objects[0] 981 self.assertTrue(len(dict) == (self.COUNT - 1), 982 "deleting object did not cause dictionary update") 983 del objects, o 984 self.assertTrue(len(dict) == 0, 985 "deleting the values did not clear the dictionary") 986 # regression on SF bug #447152: 987 dict = weakref.WeakValueDictionary() 988 self.assertRaises(KeyError, dict.__getitem__, 1) 989 dict[2] = C() 990 self.assertRaises(KeyError, dict.__getitem__, 2) 991 992 def test_weak_keys(self): 993 # 994 # This exercises d.copy(), d.items(), d[] = v, d[], del d[], 995 # len(d), in d. 996 # 997 dict, objects = self.make_weak_keyed_dict() 998 for o in objects: 999 self.assertTrue(weakref.getweakrefcount(o) == 1, 1000 "wrong number of weak references to %r!" % o) 1001 self.assertTrue(o.arg is dict[o], 1002 "wrong object returned by weak dict!") 1003 items1 = dict.items() 1004 items2 = dict.copy().items() 1005 self.assertTrue(set(items1) == set(items2), 1006 "cloning of weak-keyed dictionary did not work!") 1007 del items1, items2 1008 self.assertTrue(len(dict) == self.COUNT) 1009 del objects[0] 1010 self.assertTrue(len(dict) == (self.COUNT - 1), 1011 "deleting object did not cause dictionary update") 1012 del objects, o 1013 self.assertTrue(len(dict) == 0, 1014 "deleting the keys did not clear the dictionary") 1015 o = Object(42) 1016 dict[o] = "What is the meaning of the universe?" 1017 self.assertIn(o, dict) 1018 self.assertNotIn(34, dict) 1019 1020 def test_weak_keyed_iters(self): 1021 dict, objects = self.make_weak_keyed_dict() 1022 self.check_iters(dict) 1023 1024 # Test keyrefs() 1025 refs = dict.keyrefs() 1026 self.assertEqual(len(refs), len(objects)) 1027 objects2 = list(objects) 1028 for wr in refs: 1029 ob = wr() 1030 self.assertIn(ob, dict) 1031 self.assertEqual(ob.arg, dict[ob]) 1032 objects2.remove(ob) 1033 self.assertEqual(len(objects2), 0) 1034 1035 # Test iterkeyrefs() 1036 objects2 = list(objects) 1037 self.assertEqual(len(list(dict.iterkeyrefs())), len(objects)) 1038 for wr in dict.iterkeyrefs(): 1039 ob = wr() 1040 self.assertIn(ob, dict) 1041 self.assertEqual(ob.arg, dict[ob]) 1042 objects2.remove(ob) 1043 self.assertEqual(len(objects2), 0) 1044 1045 def test_weak_valued_iters(self): 1046 dict, objects = self.make_weak_valued_dict() 1047 self.check_iters(dict) 1048 1049 # Test valuerefs() 1050 refs = dict.valuerefs() 1051 self.assertEqual(len(refs), len(objects)) 1052 objects2 = list(objects) 1053 for wr in refs: 1054 ob = wr() 1055 self.assertEqual(ob, dict[ob.arg]) 1056 self.assertEqual(ob.arg, dict[ob.arg].arg) 1057 objects2.remove(ob) 1058 self.assertEqual(len(objects2), 0) 1059 1060 # Test itervaluerefs() 1061 objects2 = list(objects) 1062 self.assertEqual(len(list(dict.itervaluerefs())), len(objects)) 1063 for wr in dict.itervaluerefs(): 1064 ob = wr() 1065 self.assertEqual(ob, dict[ob.arg]) 1066 self.assertEqual(ob.arg, dict[ob.arg].arg) 1067 objects2.remove(ob) 1068 self.assertEqual(len(objects2), 0) 1069 1070 def check_iters(self, dict): 1071 # item iterator: 1072 items = dict.items() 1073 for item in dict.iteritems(): 1074 items.remove(item) 1075 self.assertTrue(len(items) == 0, "iteritems() did not touch all items") 1076 1077 # key iterator, via __iter__(): 1078 keys = dict.keys() 1079 for k in dict: 1080 keys.remove(k) 1081 self.assertTrue(len(keys) == 0, "__iter__() did not touch all keys") 1082 1083 # key iterator, via iterkeys(): 1084 keys = dict.keys() 1085 for k in dict.iterkeys(): 1086 keys.remove(k) 1087 self.assertTrue(len(keys) == 0, "iterkeys() did not touch all keys") 1088 1089 # value iterator: 1090 values = dict.values() 1091 for v in dict.itervalues(): 1092 values.remove(v) 1093 self.assertTrue(len(values) == 0, 1094 "itervalues() did not touch all values") 1095 1096 def test_make_weak_keyed_dict_from_dict(self): 1097 o = Object(3) 1098 dict = weakref.WeakKeyDictionary({o:364}) 1099 self.assertTrue(dict[o] == 364) 1100 1101 def test_make_weak_keyed_dict_from_weak_keyed_dict(self): 1102 o = Object(3) 1103 dict = weakref.WeakKeyDictionary({o:364}) 1104 dict2 = weakref.WeakKeyDictionary(dict) 1105 self.assertTrue(dict[o] == 364) 1106 1107 def make_weak_keyed_dict(self): 1108 dict = weakref.WeakKeyDictionary() 1109 objects = map(Object, range(self.COUNT)) 1110 for o in objects: 1111 dict[o] = o.arg 1112 return dict, objects 1113 1114 def make_weak_valued_dict(self): 1115 dict = weakref.WeakValueDictionary() 1116 objects = map(Object, range(self.COUNT)) 1117 for o in objects: 1118 dict[o.arg] = o 1119 return dict, objects 1120 1121 def check_popitem(self, klass, key1, value1, key2, value2): 1122 weakdict = klass() 1123 weakdict[key1] = value1 1124 weakdict[key2] = value2 1125 self.assertTrue(len(weakdict) == 2) 1126 k, v = weakdict.popitem() 1127 self.assertTrue(len(weakdict) == 1) 1128 if k is key1: 1129 self.assertTrue(v is value1) 1130 else: 1131 self.assertTrue(v is value2) 1132 k, v = weakdict.popitem() 1133 self.assertTrue(len(weakdict) == 0) 1134 if k is key1: 1135 self.assertTrue(v is value1) 1136 else: 1137 self.assertTrue(v is value2) 1138 1139 def test_weak_valued_dict_popitem(self): 1140 self.check_popitem(weakref.WeakValueDictionary, 1141 "key1", C(), "key2", C()) 1142 1143 def test_weak_keyed_dict_popitem(self): 1144 self.check_popitem(weakref.WeakKeyDictionary, 1145 C(), "value 1", C(), "value 2") 1146 1147 def check_setdefault(self, klass, key, value1, value2): 1148 self.assertTrue(value1 is not value2, 1149 "invalid test" 1150 " -- value parameters must be distinct objects") 1151 weakdict = klass() 1152 o = weakdict.setdefault(key, value1) 1153 self.assertIs(o, value1) 1154 self.assertIn(key, weakdict) 1155 self.assertIs(weakdict.get(key), value1) 1156 self.assertIs(weakdict[key], value1) 1157 1158 o = weakdict.setdefault(key, value2) 1159 self.assertIs(o, value1) 1160 self.assertIn(key, weakdict) 1161 self.assertIs(weakdict.get(key), value1) 1162 self.assertIs(weakdict[key], value1) 1163 1164 def test_weak_valued_dict_setdefault(self): 1165 self.check_setdefault(weakref.WeakValueDictionary, 1166 "key", C(), C()) 1167 1168 def test_weak_keyed_dict_setdefault(self): 1169 self.check_setdefault(weakref.WeakKeyDictionary, 1170 C(), "value 1", "value 2") 1171 1172 def check_update(self, klass, dict): 1173 # 1174 # This exercises d.update(), len(d), d.keys(), in d, 1175 # d.get(), d[]. 1176 # 1177 weakdict = klass() 1178 weakdict.update(dict) 1179 self.assertEqual(len(weakdict), len(dict)) 1180 for k in weakdict.keys(): 1181 self.assertIn(k, dict, 1182 "mysterious new key appeared in weak dict") 1183 v = dict.get(k) 1184 self.assertIs(v, weakdict[k]) 1185 self.assertIs(v, weakdict.get(k)) 1186 for k in dict.keys(): 1187 self.assertIn(k, weakdict, 1188 "original key disappeared in weak dict") 1189 v = dict[k] 1190 self.assertIs(v, weakdict[k]) 1191 self.assertIs(v, weakdict.get(k)) 1192 1193 def test_weak_valued_dict_update(self): 1194 self.check_update(weakref.WeakValueDictionary, 1195 {1: C(), 'a': C(), C(): C()}) 1196 1197 def test_weak_keyed_dict_update(self): 1198 self.check_update(weakref.WeakKeyDictionary, 1199 {C(): 1, C(): 2, C(): 3}) 1200 1201 def test_weak_keyed_delitem(self): 1202 d = weakref.WeakKeyDictionary() 1203 o1 = Object('1') 1204 o2 = Object('2') 1205 d[o1] = 'something' 1206 d[o2] = 'something' 1207 self.assertTrue(len(d) == 2) 1208 del d[o1] 1209 self.assertTrue(len(d) == 1) 1210 self.assertTrue(d.keys() == [o2]) 1211 1212 def test_weak_valued_delitem(self): 1213 d = weakref.WeakValueDictionary() 1214 o1 = Object('1') 1215 o2 = Object('2') 1216 d['something'] = o1 1217 d['something else'] = o2 1218 self.assertTrue(len(d) == 2) 1219 del d['something'] 1220 self.assertTrue(len(d) == 1) 1221 self.assertTrue(d.items() == [('something else', o2)]) 1222 1223 def test_weak_keyed_bad_delitem(self): 1224 d = weakref.WeakKeyDictionary() 1225 o = Object('1') 1226 # An attempt to delete an object that isn't there should raise 1227 # KeyError. It didn't before 2.3. 1228 self.assertRaises(KeyError, d.__delitem__, o) 1229 self.assertRaises(KeyError, d.__getitem__, o) 1230 1231 # If a key isn't of a weakly referencable type, __getitem__ and 1232 # __setitem__ raise TypeError. __delitem__ should too. 1233 self.assertRaises(TypeError, d.__delitem__, 13) 1234 self.assertRaises(TypeError, d.__getitem__, 13) 1235 self.assertRaises(TypeError, d.__setitem__, 13, 13) 1236 1237 def test_weak_keyed_cascading_deletes(self): 1238 # SF bug 742860. For some reason, before 2.3 __delitem__ iterated 1239 # over the keys via self.data.iterkeys(). If things vanished from 1240 # the dict during this (or got added), that caused a RuntimeError. 1241 1242 d = weakref.WeakKeyDictionary() 1243 mutate = False 1244 1245 class C(object): 1246 def __init__(self, i): 1247 self.value = i 1248 def __hash__(self): 1249 return hash(self.value) 1250 def __eq__(self, other): 1251 if mutate: 1252 # Side effect that mutates the dict, by removing the 1253 # last strong reference to a key. 1254 del objs[-1] 1255 return self.value == other.value 1256 1257 objs = [C(i) for i in range(4)] 1258 for o in objs: 1259 d[o] = o.value 1260 del o # now the only strong references to keys are in objs 1261 # Find the order in which iterkeys sees the keys. 1262 objs = d.keys() 1263 # Reverse it, so that the iteration implementation of __delitem__ 1264 # has to keep looping to find the first object we delete. 1265 objs.reverse() 1266 1267 # Turn on mutation in C.__eq__. The first time thru the loop, 1268 # under the iterkeys() business the first comparison will delete 1269 # the last item iterkeys() would see, and that causes a 1270 # RuntimeError: dictionary changed size during iteration 1271 # when the iterkeys() loop goes around to try comparing the next 1272 # key. After this was fixed, it just deletes the last object *our* 1273 # "for o in obj" loop would have gotten to. 1274 mutate = True 1275 count = 0 1276 for o in objs: 1277 count += 1 1278 del d[o] 1279 self.assertEqual(len(d), 0) 1280 self.assertEqual(count, 2) 1281 1282 from test import mapping_tests 1283 1284 class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): 1285 """Check that WeakValueDictionary conforms to the mapping protocol""" 1286 __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)} 1287 type2test = weakref.WeakValueDictionary 1288 def _reference(self): 1289 return self.__ref.copy() 1290 1291 class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): 1292 """Check that WeakKeyDictionary conforms to the mapping protocol""" 1293 __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3} 1294 type2test = weakref.WeakKeyDictionary 1295 def _reference(self): 1296 return self.__ref.copy() 1297 1298 libreftest = """ Doctest for examples in the library reference: weakref.rst 1299 1300 >>> import weakref 1301 >>> class Dict(dict): 1302 ... pass 1303 ... 1304 >>> obj = Dict(red=1, green=2, blue=3) # this object is weak referencable 1305 >>> r = weakref.ref(obj) 1306 >>> print r() is obj 1307 True 1308 1309 >>> import weakref 1310 >>> class Object: 1311 ... pass 1312 ... 1313 >>> o = Object() 1314 >>> r = weakref.ref(o) 1315 >>> o2 = r() 1316 >>> o is o2 1317 True 1318 >>> del o, o2 1319 >>> print r() 1320 None 1321 1322 >>> import weakref 1323 >>> class ExtendedRef(weakref.ref): 1324 ... def __init__(self, ob, callback=None, **annotations): 1325 ... super(ExtendedRef, self).__init__(ob, callback) 1326 ... self.__counter = 0 1327 ... for k, v in annotations.iteritems(): 1328 ... setattr(self, k, v) 1329 ... def __call__(self): 1330 ... '''Return a pair containing the referent and the number of 1331 ... times the reference has been called. 1332 ... ''' 1333 ... ob = super(ExtendedRef, self).__call__() 1334 ... if ob is not None: 1335 ... self.__counter += 1 1336 ... ob = (ob, self.__counter) 1337 ... return ob 1338 ... 1339 >>> class A: # not in docs from here, just testing the ExtendedRef 1340 ... pass 1341 ... 1342 >>> a = A() 1343 >>> r = ExtendedRef(a, foo=1, bar="baz") 1344 >>> r.foo 1345 1 1346 >>> r.bar 1347 'baz' 1348 >>> r()[1] 1349 1 1350 >>> r()[1] 1351 2 1352 >>> r()[0] is a 1353 True 1354 1355 1356 >>> import weakref 1357 >>> _id2obj_dict = weakref.WeakValueDictionary() 1358 >>> def remember(obj): 1359 ... oid = id(obj) 1360 ... _id2obj_dict[oid] = obj 1361 ... return oid 1362 ... 1363 >>> def id2obj(oid): 1364 ... return _id2obj_dict[oid] 1365 ... 1366 >>> a = A() # from here, just testing 1367 >>> a_id = remember(a) 1368 >>> id2obj(a_id) is a 1369 True 1370 >>> del a 1371 >>> try: 1372 ... id2obj(a_id) 1373 ... except KeyError: 1374 ... print 'OK' 1375 ... else: 1376 ... print 'WeakValueDictionary error' 1377 OK 1378 1379 """ 1380 1381 __test__ = {'libreftest' : libreftest} 1382 1383 def test_main(): 1384 test_support.run_unittest( 1385 ReferencesTestCase, 1386 MappingTestCase, 1387 WeakValueDictionaryTestCase, 1388 WeakKeyDictionaryTestCase, 1389 SubclassableWeakrefTestCase, 1390 ) 1391 test_support.run_doctest(sys.modules[__name__]) 1392 1393 1394 if __name__ == "__main__": 1395 test_main() 1396