Home | History | Annotate | Download | only in test
      1 import gc
      2 import sys
      3 import unittest
      4 import UserList
      5 import weakref
      6 import operator
      7 
      8 from test import test_support
      9 
     10 # Used in ReferencesTestCase.test_ref_created_during_del() .

     11 ref_from_del = None
     12 
     13 class C:
     14     def method(self):
     15         pass
     16 
     17 
     18 class Callable:
     19     bar = None
     20 
     21     def __call__(self, x):
     22         self.bar = x
     23 
     24 
     25 def create_function():
     26     def f(): pass
     27     return f
     28 
     29 def create_bound_method():
     30     return C().method
     31 
     32 def create_unbound_method():
     33     return C.method
     34 
     35 
     36 class TestBase(unittest.TestCase):
     37 
     38     def setUp(self):
     39         self.cbcalled = 0
     40 
     41     def callback(self, ref):
     42         self.cbcalled += 1
     43 
     44 
     45 class ReferencesTestCase(TestBase):
     46 
     47     def test_basic_ref(self):
     48         self.check_basic_ref(C)
     49         self.check_basic_ref(create_function)
     50         self.check_basic_ref(create_bound_method)
     51         self.check_basic_ref(create_unbound_method)
     52 
     53         # Just make sure the tp_repr handler doesn't raise an exception.

     54         # Live reference:

     55         o = C()
     56         wr = weakref.ref(o)
     57         repr(wr)
     58         # Dead reference:

     59         del o
     60         repr(wr)
     61 
     62     def test_basic_callback(self):
     63         self.check_basic_callback(C)
     64         self.check_basic_callback(create_function)
     65         self.check_basic_callback(create_bound_method)
     66         self.check_basic_callback(create_unbound_method)
     67 
     68     def test_multiple_callbacks(self):
     69         o = C()
     70         ref1 = weakref.ref(o, self.callback)
     71         ref2 = weakref.ref(o, self.callback)
     72         del o
     73         self.assertTrue(ref1() is None,
     74                      "expected reference to be invalidated")
     75         self.assertTrue(ref2() is None,
     76                      "expected reference to be invalidated")
     77         self.assertTrue(self.cbcalled == 2,
     78                      "callback not called the right number of times")
     79 
     80     def test_multiple_selfref_callbacks(self):
     81         # Make sure all references are invalidated before callbacks are called

     82         #

     83         # What's important here is that we're using the first

     84         # reference in the callback invoked on the second reference

     85         # (the most recently created ref is cleaned up first).  This

     86         # tests that all references to the object are invalidated

     87         # before any of the callbacks are invoked, so that we only

     88         # have one invocation of _weakref.c:cleanup_helper() active

     89         # for a particular object at a time.

     90         #

     91         def callback(object, self=self):
     92             self.ref()
     93         c = C()
     94         self.ref = weakref.ref(c, callback)
     95         ref1 = weakref.ref(c, callback)
     96         del c
     97 
     98     def test_proxy_ref(self):
     99         o = C()
    100         o.bar = 1
    101         ref1 = weakref.proxy(o, self.callback)
    102         ref2 = weakref.proxy(o, self.callback)
    103         del o
    104 
    105         def check(proxy):
    106             proxy.bar
    107 
    108         self.assertRaises(weakref.ReferenceError, check, ref1)
    109         self.assertRaises(weakref.ReferenceError, check, ref2)
    110         self.assertRaises(weakref.ReferenceError, bool, weakref.proxy(C()))
    111         self.assertTrue(self.cbcalled == 2)
    112 
    113     def check_basic_ref(self, factory):
    114         o = factory()
    115         ref = weakref.ref(o)
    116         self.assertTrue(ref() is not None,
    117                      "weak reference to live object should be live")
    118         o2 = ref()
    119         self.assertTrue(o is o2,
    120                      "<ref>() should return original object if live")
    121 
    122     def check_basic_callback(self, factory):
    123         self.cbcalled = 0
    124         o = factory()
    125         ref = weakref.ref(o, self.callback)
    126         del o
    127         self.assertTrue(self.cbcalled == 1,
    128                      "callback did not properly set 'cbcalled'")
    129         self.assertTrue(ref() is None,
    130                      "ref2 should be dead after deleting object reference")
    131 
    132     def test_ref_reuse(self):
    133         o = C()
    134         ref1 = weakref.ref(o)
    135         # create a proxy to make sure that there's an intervening creation

    136         # between these two; it should make no difference

    137         proxy = weakref.proxy(o)
    138         ref2 = weakref.ref(o)
    139         self.assertTrue(ref1 is ref2,
    140                      "reference object w/out callback should be re-used")
    141 
    142         o = C()
    143         proxy = weakref.proxy(o)
    144         ref1 = weakref.ref(o)
    145         ref2 = weakref.ref(o)
    146         self.assertTrue(ref1 is ref2,
    147                      "reference object w/out callback should be re-used")
    148         self.assertTrue(weakref.getweakrefcount(o) == 2,
    149                      "wrong weak ref count for object")
    150         del proxy
    151         self.assertTrue(weakref.getweakrefcount(o) == 1,
    152                      "wrong weak ref count for object after deleting proxy")
    153 
    154     def test_proxy_reuse(self):
    155         o = C()
    156         proxy1 = weakref.proxy(o)
    157         ref = weakref.ref(o)
    158         proxy2 = weakref.proxy(o)
    159         self.assertTrue(proxy1 is proxy2,
    160                      "proxy object w/out callback should have been re-used")
    161 
    162     def test_basic_proxy(self):
    163         o = C()
    164         self.check_proxy(o, weakref.proxy(o))
    165 
    166         L = UserList.UserList()
    167         p = weakref.proxy(L)
    168         self.assertFalse(p, "proxy for empty UserList should be false")
    169         p.append(12)
    170         self.assertEqual(len(L), 1)
    171         self.assertTrue(p, "proxy for non-empty UserList should be true")
    172         with test_support.check_py3k_warnings():
    173             p[:] = [2, 3]
    174         self.assertEqual(len(L), 2)
    175         self.assertEqual(len(p), 2)
    176         self.assertIn(3, p, "proxy didn't support __contains__() properly")
    177         p[1] = 5
    178         self.assertEqual(L[1], 5)
    179         self.assertEqual(p[1], 5)
    180         L2 = UserList.UserList(L)
    181         p2 = weakref.proxy(L2)
    182         self.assertEqual(p, p2)
    183         ## self.assertEqual(repr(L2), repr(p2))

    184         L3 = UserList.UserList(range(10))
    185         p3 = weakref.proxy(L3)
    186         with test_support.check_py3k_warnings():
    187             self.assertEqual(L3[:], p3[:])
    188             self.assertEqual(L3[5:], p3[5:])
    189             self.assertEqual(L3[:5], p3[:5])
    190             self.assertEqual(L3[2:5], p3[2:5])
    191 
    192     def test_proxy_unicode(self):
    193         # See bug 5037

    194         class C(object):
    195             def __str__(self):
    196                 return "string"
    197             def __unicode__(self):
    198                 return u"unicode"
    199         instance = C()
    200         self.assertIn("__unicode__", dir(weakref.proxy(instance)))
    201         self.assertEqual(unicode(weakref.proxy(instance)), u"unicode")
    202 
    203     def test_proxy_index(self):
    204         class C:
    205             def __index__(self):
    206                 return 10
    207         o = C()
    208         p = weakref.proxy(o)
    209         self.assertEqual(operator.index(p), 10)
    210 
    211     def test_proxy_div(self):
    212         class C:
    213             def __floordiv__(self, other):
    214                 return 42
    215             def __ifloordiv__(self, other):
    216                 return 21
    217         o = C()
    218         p = weakref.proxy(o)
    219         self.assertEqual(p // 5, 42)
    220         p //= 5
    221         self.assertEqual(p, 21)
    222 
    223     # The PyWeakref_* C API is documented as allowing either NULL or

    224     # None as the value for the callback, where either means "no

    225     # callback".  The "no callback" ref and proxy objects are supposed

    226     # to be shared so long as they exist by all callers so long as

    227     # they are active.  In Python 2.3.3 and earlier, this guarantee

    228     # was not honored, and was broken in different ways for

    229     # PyWeakref_NewRef() and PyWeakref_NewProxy().  (Two tests.)

    230 
    231     def test_shared_ref_without_callback(self):
    232         self.check_shared_without_callback(weakref.ref)
    233 
    234     def test_shared_proxy_without_callback(self):
    235         self.check_shared_without_callback(weakref.proxy)
    236 
    237     def check_shared_without_callback(self, makeref):
    238         o = Object(1)
    239         p1 = makeref(o, None)
    240         p2 = makeref(o, None)
    241         self.assertTrue(p1 is p2, "both callbacks were None in the C API")
    242         del p1, p2
    243         p1 = makeref(o)
    244         p2 = makeref(o, None)
    245         self.assertTrue(p1 is p2, "callbacks were NULL, None in the C API")
    246         del p1, p2
    247         p1 = makeref(o)
    248         p2 = makeref(o)
    249         self.assertTrue(p1 is p2, "both callbacks were NULL in the C API")
    250         del p1, p2
    251         p1 = makeref(o, None)
    252         p2 = makeref(o)
    253         self.assertTrue(p1 is p2, "callbacks were None, NULL in the C API")
    254 
    255     def test_callable_proxy(self):
    256         o = Callable()
    257         ref1 = weakref.proxy(o)
    258 
    259         self.check_proxy(o, ref1)
    260 
    261         self.assertTrue(type(ref1) is weakref.CallableProxyType,
    262                      "proxy is not of callable type")
    263         ref1('twinkies!')
    264         self.assertTrue(o.bar == 'twinkies!',
    265                      "call through proxy not passed through to original")
    266         ref1(x='Splat.')
    267         self.assertTrue(o.bar == 'Splat.',
    268                      "call through proxy not passed through to original")
    269 
    270         # expect due to too few args

    271         self.assertRaises(TypeError, ref1)
    272 
    273         # expect due to too many args

    274         self.assertRaises(TypeError, ref1, 1, 2, 3)
    275 
    276     def check_proxy(self, o, proxy):
    277         o.foo = 1
    278         self.assertTrue(proxy.foo == 1,
    279                      "proxy does not reflect attribute addition")
    280         o.foo = 2
    281         self.assertTrue(proxy.foo == 2,
    282                      "proxy does not reflect attribute modification")
    283         del o.foo
    284         self.assertTrue(not hasattr(proxy, 'foo'),
    285                      "proxy does not reflect attribute removal")
    286 
    287         proxy.foo = 1
    288         self.assertTrue(o.foo == 1,
    289                      "object does not reflect attribute addition via proxy")
    290         proxy.foo = 2
    291         self.assertTrue(
    292             o.foo == 2,
    293             "object does not reflect attribute modification via proxy")
    294         del proxy.foo
    295         self.assertTrue(not hasattr(o, 'foo'),
    296                      "object does not reflect attribute removal via proxy")
    297 
    298     def test_proxy_deletion(self):
    299         # Test clearing of SF bug #762891

    300         class Foo:
    301             result = None
    302             def __delitem__(self, accessor):
    303                 self.result = accessor
    304         g = Foo()
    305         f = weakref.proxy(g)
    306         del f[0]
    307         self.assertEqual(f.result, 0)
    308 
    309     def test_proxy_bool(self):
    310         # Test clearing of SF bug #1170766

    311         class List(list): pass
    312         lyst = List()
    313         self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))
    314 
    315     def test_getweakrefcount(self):
    316         o = C()
    317         ref1 = weakref.ref(o)
    318         ref2 = weakref.ref(o, self.callback)
    319         self.assertTrue(weakref.getweakrefcount(o) == 2,
    320                      "got wrong number of weak reference objects")
    321 
    322         proxy1 = weakref.proxy(o)
    323         proxy2 = weakref.proxy(o, self.callback)
    324         self.assertTrue(weakref.getweakrefcount(o) == 4,
    325                      "got wrong number of weak reference objects")
    326 
    327         del ref1, ref2, proxy1, proxy2
    328         self.assertTrue(weakref.getweakrefcount(o) == 0,
    329                      "weak reference objects not unlinked from"
    330                      " referent when discarded.")
    331 
    332         # assumes ints do not support weakrefs

    333         self.assertTrue(weakref.getweakrefcount(1) == 0,
    334                      "got wrong number of weak reference objects for int")
    335 
    336     def test_getweakrefs(self):
    337         o = C()
    338         ref1 = weakref.ref(o, self.callback)
    339         ref2 = weakref.ref(o, self.callback)
    340         del ref1
    341         self.assertTrue(weakref.getweakrefs(o) == [ref2],
    342                      "list of refs does not match")
    343 
    344         o = C()
    345         ref1 = weakref.ref(o, self.callback)
    346         ref2 = weakref.ref(o, self.callback)
    347         del ref2
    348         self.assertTrue(weakref.getweakrefs(o) == [ref1],
    349                      "list of refs does not match")
    350 
    351         del ref1
    352         self.assertTrue(weakref.getweakrefs(o) == [],
    353                      "list of refs not cleared")
    354 
    355         # assumes ints do not support weakrefs

    356         self.assertTrue(weakref.getweakrefs(1) == [],
    357                      "list of refs does not match for int")
    358 
    359     def test_newstyle_number_ops(self):
    360         class F(float):
    361             pass
    362         f = F(2.0)
    363         p = weakref.proxy(f)
    364         self.assertTrue(p + 1.0 == 3.0)
    365         self.assertTrue(1.0 + p == 3.0)  # this used to SEGV

    366 
    367     def test_callbacks_protected(self):
    368         # Callbacks protected from already-set exceptions?

    369         # Regression test for SF bug #478534.

    370         class BogusError(Exception):
    371             pass
    372         data = {}
    373         def remove(k):
    374             del data[k]
    375         def encapsulate():
    376             f = lambda : ()
    377             data[weakref.ref(f, remove)] = None
    378             raise BogusError
    379         try:
    380             encapsulate()
    381         except BogusError:
    382             pass
    383         else:
    384             self.fail("exception not properly restored")
    385         try:
    386             encapsulate()
    387         except BogusError:
    388             pass
    389         else:
    390             self.fail("exception not properly restored")
    391 
    392     def test_sf_bug_840829(self):
    393         # "weakref callbacks and gc corrupt memory"

    394         # subtype_dealloc erroneously exposed a new-style instance

    395         # already in the process of getting deallocated to gc,

    396         # causing double-deallocation if the instance had a weakref

    397         # callback that triggered gc.

    398         # If the bug exists, there probably won't be an obvious symptom

    399         # in a release build.  In a debug build, a segfault will occur

    400         # when the second attempt to remove the instance from the "list

    401         # of all objects" occurs.

    402 
    403         import gc
    404 
    405         class C(object):
    406             pass
    407 
    408         c = C()
    409         wr = weakref.ref(c, lambda ignore: gc.collect())
    410         del c
    411 
    412         # There endeth the first part.  It gets worse.

    413         del wr
    414 
    415         c1 = C()
    416         c1.i = C()
    417         wr = weakref.ref(c1.i, lambda ignore: gc.collect())
    418 
    419         c2 = C()
    420         c2.c1 = c1
    421         del c1  # still alive because c2 points to it

    422 
    423         # Now when subtype_dealloc gets called on c2, it's not enough just

    424         # that c2 is immune from gc while the weakref callbacks associated

    425         # with c2 execute (there are none in this 2nd half of the test, btw).

    426         # subtype_dealloc goes on to call the base classes' deallocs too,

    427         # so any gc triggered by weakref callbacks associated with anything

    428         # torn down by a base class dealloc can also trigger double

    429         # deallocation of c2.

    430         del c2
    431 
    432     def test_callback_in_cycle_1(self):
    433         import gc
    434 
    435         class J(object):
    436             pass
    437 
    438         class II(object):
    439             def acallback(self, ignore):
    440                 self.J
    441 
    442         I = II()
    443         I.J = J
    444         I.wr = weakref.ref(J, I.acallback)
    445 
    446         # Now J and II are each in a self-cycle (as all new-style class

    447         # objects are, since their __mro__ points back to them).  I holds

    448         # both a weak reference (I.wr) and a strong reference (I.J) to class

    449         # J.  I is also in a cycle (I.wr points to a weakref that references

    450         # I.acallback).  When we del these three, they all become trash, but

    451         # the cycles prevent any of them from getting cleaned up immediately.

    452         # Instead they have to wait for cyclic gc to deduce that they're

    453         # trash.

    454         #

    455         # gc used to call tp_clear on all of them, and the order in which

    456         # it does that is pretty accidental.  The exact order in which we

    457         # built up these things manages to provoke gc into running tp_clear

    458         # in just the right order (I last).  Calling tp_clear on II leaves

    459         # behind an insane class object (its __mro__ becomes NULL).  Calling

    460         # tp_clear on J breaks its self-cycle, but J doesn't get deleted

    461         # just then because of the strong reference from I.J.  Calling

    462         # tp_clear on I starts to clear I's __dict__, and just happens to

    463         # clear I.J first -- I.wr is still intact.  That removes the last

    464         # reference to J, which triggers the weakref callback.  The callback

    465         # tries to do "self.J", and instances of new-style classes look up

    466         # attributes ("J") in the class dict first.  The class (II) wants to

    467         # search II.__mro__, but that's NULL.   The result was a segfault in

    468         # a release build, and an assert failure in a debug build.

    469         del I, J, II
    470         gc.collect()
    471 
    472     def test_callback_in_cycle_2(self):
    473         import gc
    474 
    475         # This is just like test_callback_in_cycle_1, except that II is an

    476         # old-style class.  The symptom is different then:  an instance of an

    477         # old-style class looks in its own __dict__ first.  'J' happens to

    478         # get cleared from I.__dict__ before 'wr', and 'J' was never in II's

    479         # __dict__, so the attribute isn't found.  The difference is that

    480         # the old-style II doesn't have a NULL __mro__ (it doesn't have any

    481         # __mro__), so no segfault occurs.  Instead it got:

    482         #    test_callback_in_cycle_2 (__main__.ReferencesTestCase) ...

    483         #    Exception exceptions.AttributeError:

    484         #   "II instance has no attribute 'J'" in <bound method II.acallback

    485         #       of <?.II instance at 0x00B9B4B8>> ignored

    486 
    487         class J(object):
    488             pass
    489 
    490         class II:
    491             def acallback(self, ignore):
    492                 self.J
    493 
    494         I = II()
    495         I.J = J
    496         I.wr = weakref.ref(J, I.acallback)
    497 
    498         del I, J, II
    499         gc.collect()
    500 
    501     def test_callback_in_cycle_3(self):
    502         import gc
    503 
    504         # This one broke the first patch that fixed the last two.  In this

    505         # case, the objects reachable from the callback aren't also reachable

    506         # from the object (c1) *triggering* the callback:  you can get to

    507         # c1 from c2, but not vice-versa.  The result was that c2's __dict__

    508         # got tp_clear'ed by the time the c2.cb callback got invoked.

    509 
    510         class C:
    511             def cb(self, ignore):
    512                 self.me
    513                 self.c1
    514                 self.wr
    515 
    516         c1, c2 = C(), C()
    517 
    518         c2.me = c2
    519         c2.c1 = c1
    520         c2.wr = weakref.ref(c1, c2.cb)
    521 
    522         del c1, c2
    523         gc.collect()
    524 
    525     def test_callback_in_cycle_4(self):
    526         import gc
    527 
    528         # Like test_callback_in_cycle_3, except c2 and c1 have different

    529         # classes.  c2's class (C) isn't reachable from c1 then, so protecting

    530         # objects reachable from the dying object (c1) isn't enough to stop

    531         # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.

    532         # The result was a segfault (C.__mro__ was NULL when the callback

    533         # tried to look up self.me).

    534 
    535         class C(object):
    536             def cb(self, ignore):
    537                 self.me
    538                 self.c1
    539                 self.wr
    540 
    541         class D:
    542             pass
    543 
    544         c1, c2 = D(), C()
    545 
    546         c2.me = c2
    547         c2.c1 = c1
    548         c2.wr = weakref.ref(c1, c2.cb)
    549 
    550         del c1, c2, C, D
    551         gc.collect()
    552 
    553     def test_callback_in_cycle_resurrection(self):
    554         import gc
    555 
    556         # Do something nasty in a weakref callback:  resurrect objects

    557         # from dead cycles.  For this to be attempted, the weakref and

    558         # its callback must also be part of the cyclic trash (else the

    559         # objects reachable via the callback couldn't be in cyclic trash

    560         # to begin with -- the callback would act like an external root).

    561         # But gc clears trash weakrefs with callbacks early now, which

    562         # disables the callbacks, so the callbacks shouldn't get called

    563         # at all (and so nothing actually gets resurrected).

    564 
    565         alist = []
    566         class C(object):
    567             def __init__(self, value):
    568                 self.attribute = value
    569 
    570             def acallback(self, ignore):
    571                 alist.append(self.c)
    572 
    573         c1, c2 = C(1), C(2)
    574         c1.c = c2
    575         c2.c = c1
    576         c1.wr = weakref.ref(c2, c1.acallback)
    577         c2.wr = weakref.ref(c1, c2.acallback)
    578 
    579         def C_went_away(ignore):
    580             alist.append("C went away")
    581         wr = weakref.ref(C, C_went_away)
    582 
    583         del c1, c2, C   # make them all trash

    584         self.assertEqual(alist, [])  # del isn't enough to reclaim anything

    585 
    586         gc.collect()
    587         # c1.wr and c2.wr were part of the cyclic trash, so should have

    588         # been cleared without their callbacks executing.  OTOH, the weakref

    589         # to C is bound to a function local (wr), and wasn't trash, so that

    590         # callback should have been invoked when C went away.

    591         self.assertEqual(alist, ["C went away"])
    592         # The remaining weakref should be dead now (its callback ran).

    593         self.assertEqual(wr(), None)
    594 
    595         del alist[:]
    596         gc.collect()
    597         self.assertEqual(alist, [])
    598 
    599     def test_callbacks_on_callback(self):
    600         import gc
    601 
    602         # Set up weakref callbacks *on* weakref callbacks.

    603         alist = []
    604         def safe_callback(ignore):
    605             alist.append("safe_callback called")
    606 
    607         class C(object):
    608             def cb(self, ignore):
    609                 alist.append("cb called")
    610 
    611         c, d = C(), C()
    612         c.other = d
    613         d.other = c
    614         callback = c.cb
    615         c.wr = weakref.ref(d, callback)     # this won't trigger

    616         d.wr = weakref.ref(callback, d.cb)  # ditto

    617         external_wr = weakref.ref(callback, safe_callback)  # but this will

    618         self.assertTrue(external_wr() is callback)
    619 
    620         # The weakrefs attached to c and d should get cleared, so that

    621         # C.cb is never called.  But external_wr isn't part of the cyclic

    622         # trash, and no cyclic trash is reachable from it, so safe_callback

    623         # should get invoked when the bound method object callback (c.cb)

    624         # -- which is itself a callback, and also part of the cyclic trash --

    625         # gets reclaimed at the end of gc.

    626 
    627         del callback, c, d, C
    628         self.assertEqual(alist, [])  # del isn't enough to clean up cycles

    629         gc.collect()
    630         self.assertEqual(alist, ["safe_callback called"])
    631         self.assertEqual(external_wr(), None)
    632 
    633         del alist[:]
    634         gc.collect()
    635         self.assertEqual(alist, [])
    636 
    637     def test_gc_during_ref_creation(self):
    638         self.check_gc_during_creation(weakref.ref)
    639 
    640     def test_gc_during_proxy_creation(self):
    641         self.check_gc_during_creation(weakref.proxy)
    642 
    643     def check_gc_during_creation(self, makeref):
    644         thresholds = gc.get_threshold()
    645         gc.set_threshold(1, 1, 1)
    646         gc.collect()
    647         class A:
    648             pass
    649 
    650         def callback(*args):
    651             pass
    652 
    653         referenced = A()
    654 
    655         a = A()
    656         a.a = a
    657         a.wr = makeref(referenced)
    658 
    659         try:
    660             # now make sure the object and the ref get labeled as

    661             # cyclic trash:

    662             a = A()
    663             weakref.ref(referenced, callback)
    664 
    665         finally:
    666             gc.set_threshold(*thresholds)
    667 
    668     def test_ref_created_during_del(self):
    669         # Bug #1377858

    670         # A weakref created in an object's __del__() would crash the

    671         # interpreter when the weakref was cleaned up since it would refer to

    672         # non-existent memory.  This test should not segfault the interpreter.

    673         class Target(object):
    674             def __del__(self):
    675                 global ref_from_del
    676                 ref_from_del = weakref.ref(self)
    677 
    678         w = Target()
    679 
    680     def test_init(self):
    681         # Issue 3634

    682         # <weakref to class>.__init__() doesn't check errors correctly

    683         r = weakref.ref(Exception)
    684         self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0)
    685         # No exception should be raised here

    686         gc.collect()
    687 
    688     def test_classes(self):
    689         # Check that both old-style classes and new-style classes

    690         # are weakrefable.

    691         class A(object):
    692             pass
    693         class B:
    694             pass
    695         l = []
    696         weakref.ref(int)
    697         a = weakref.ref(A, l.append)
    698         A = None
    699         gc.collect()
    700         self.assertEqual(a(), None)
    701         self.assertEqual(l, [a])
    702         b = weakref.ref(B, l.append)
    703         B = None
    704         gc.collect()
    705         self.assertEqual(b(), None)
    706         self.assertEqual(l, [a, b])
    707 
    708 
    709 class SubclassableWeakrefTestCase(TestBase):
    710 
    711     def test_subclass_refs(self):
    712         class MyRef(weakref.ref):
    713             def __init__(self, ob, callback=None, value=42):
    714                 self.value = value
    715                 super(MyRef, self).__init__(ob, callback)
    716             def __call__(self):
    717                 self.called = True
    718                 return super(MyRef, self).__call__()
    719         o = Object("foo")
    720         mr = MyRef(o, value=24)
    721         self.assertTrue(mr() is o)
    722         self.assertTrue(mr.called)
    723         self.assertEqual(mr.value, 24)
    724         del o
    725         self.assertTrue(mr() is None)
    726         self.assertTrue(mr.called)
    727 
    728     def test_subclass_refs_dont_replace_standard_refs(self):
    729         class MyRef(weakref.ref):
    730             pass
    731         o = Object(42)
    732         r1 = MyRef(o)
    733         r2 = weakref.ref(o)
    734         self.assertTrue(r1 is not r2)
    735         self.assertEqual(weakref.getweakrefs(o), [r2, r1])
    736         self.assertEqual(weakref.getweakrefcount(o), 2)
    737         r3 = MyRef(o)
    738         self.assertEqual(weakref.getweakrefcount(o), 3)
    739         refs = weakref.getweakrefs(o)
    740         self.assertEqual(len(refs), 3)
    741         self.assertTrue(r2 is refs[0])
    742         self.assertIn(r1, refs[1:])
    743         self.assertIn(r3, refs[1:])
    744 
    745     def test_subclass_refs_dont_conflate_callbacks(self):
    746         class MyRef(weakref.ref):
    747             pass
    748         o = Object(42)
    749         r1 = MyRef(o, id)
    750         r2 = MyRef(o, str)
    751         self.assertTrue(r1 is not r2)
    752         refs = weakref.getweakrefs(o)
    753         self.assertIn(r1, refs)
    754         self.assertIn(r2, refs)
    755 
    756     def test_subclass_refs_with_slots(self):
    757         class MyRef(weakref.ref):
    758             __slots__ = "slot1", "slot2"
    759             def __new__(type, ob, callback, slot1, slot2):
    760                 return weakref.ref.__new__(type, ob, callback)
    761             def __init__(self, ob, callback, slot1, slot2):
    762                 self.slot1 = slot1
    763                 self.slot2 = slot2
    764             def meth(self):
    765                 return self.slot1 + self.slot2
    766         o = Object(42)
    767         r = MyRef(o, None, "abc", "def")
    768         self.assertEqual(r.slot1, "abc")
    769         self.assertEqual(r.slot2, "def")
    770         self.assertEqual(r.meth(), "abcdef")
    771         self.assertFalse(hasattr(r, "__dict__"))
    772 
    773     def test_subclass_refs_with_cycle(self):
    774         # Bug #3110

    775         # An instance of a weakref subclass can have attributes.

    776         # If such a weakref holds the only strong reference to the object,

    777         # deleting the weakref will delete the object. In this case,

    778         # the callback must not be called, because the ref object is

    779         # being deleted.

    780         class MyRef(weakref.ref):
    781             pass
    782 
    783         # Use a local callback, for "regrtest -R::"

    784         # to detect refcounting problems

    785         def callback(w):
    786             self.cbcalled += 1
    787 
    788         o = C()
    789         r1 = MyRef(o, callback)
    790         r1.o = o
    791         del o
    792 
    793         del r1 # Used to crash here

    794 
    795         self.assertEqual(self.cbcalled, 0)
    796 
    797         # Same test, with two weakrefs to the same object

    798         # (since code paths are different)

    799         o = C()
    800         r1 = MyRef(o, callback)
    801         r2 = MyRef(o, callback)
    802         r1.r = r2
    803         r2.o = o
    804         del o
    805         del r2
    806 
    807         del r1 # Used to crash here

    808 
    809         self.assertEqual(self.cbcalled, 0)
    810 
    811 
    812 class Object:
    813     def __init__(self, arg):
    814         self.arg = arg
    815     def __repr__(self):
    816         return "<Object %r>" % self.arg
    817 
    818 
    819 class MappingTestCase(TestBase):
    820 
    821     COUNT = 10
    822 
    823     def test_weak_values(self):
    824         #

    825         #  This exercises d.copy(), d.items(), d[], del d[], len(d).

    826         #

    827         dict, objects = self.make_weak_valued_dict()
    828         for o in objects:
    829             self.assertTrue(weakref.getweakrefcount(o) == 1,
    830                          "wrong number of weak references to %r!" % o)
    831             self.assertTrue(o is dict[o.arg],
    832                          "wrong object returned by weak dict!")
    833         items1 = dict.items()
    834         items2 = dict.copy().items()
    835         items1.sort()
    836         items2.sort()
    837         self.assertTrue(items1 == items2,
    838                      "cloning of weak-valued dictionary did not work!")
    839         del items1, items2
    840         self.assertTrue(len(dict) == self.COUNT)
    841         del objects[0]
    842         self.assertTrue(len(dict) == (self.COUNT - 1),
    843                      "deleting object did not cause dictionary update")
    844         del objects, o
    845         self.assertTrue(len(dict) == 0,
    846                      "deleting the values did not clear the dictionary")
    847         # regression on SF bug #447152:

    848         dict = weakref.WeakValueDictionary()
    849         self.assertRaises(KeyError, dict.__getitem__, 1)
    850         dict[2] = C()
    851         self.assertRaises(KeyError, dict.__getitem__, 2)
    852 
    853     def test_weak_keys(self):
    854         #

    855         #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],

    856         #  len(d), in d.

    857         #

    858         dict, objects = self.make_weak_keyed_dict()
    859         for o in objects:
    860             self.assertTrue(weakref.getweakrefcount(o) == 1,
    861                          "wrong number of weak references to %r!" % o)
    862             self.assertTrue(o.arg is dict[o],
    863                          "wrong object returned by weak dict!")
    864         items1 = dict.items()
    865         items2 = dict.copy().items()
    866         self.assertTrue(set(items1) == set(items2),
    867                      "cloning of weak-keyed dictionary did not work!")
    868         del items1, items2
    869         self.assertTrue(len(dict) == self.COUNT)
    870         del objects[0]
    871         self.assertTrue(len(dict) == (self.COUNT - 1),
    872                      "deleting object did not cause dictionary update")
    873         del objects, o
    874         self.assertTrue(len(dict) == 0,
    875                      "deleting the keys did not clear the dictionary")
    876         o = Object(42)
    877         dict[o] = "What is the meaning of the universe?"
    878         self.assertIn(o, dict)
    879         self.assertNotIn(34, dict)
    880 
    881     def test_weak_keyed_iters(self):
    882         dict, objects = self.make_weak_keyed_dict()
    883         self.check_iters(dict)
    884 
    885         # Test keyrefs()

    886         refs = dict.keyrefs()
    887         self.assertEqual(len(refs), len(objects))
    888         objects2 = list(objects)
    889         for wr in refs:
    890             ob = wr()
    891             self.assertIn(ob, dict)
    892             self.assertEqual(ob.arg, dict[ob])
    893             objects2.remove(ob)
    894         self.assertEqual(len(objects2), 0)
    895 
    896         # Test iterkeyrefs()

    897         objects2 = list(objects)
    898         self.assertEqual(len(list(dict.iterkeyrefs())), len(objects))
    899         for wr in dict.iterkeyrefs():
    900             ob = wr()
    901             self.assertIn(ob, dict)
    902             self.assertEqual(ob.arg, dict[ob])
    903             objects2.remove(ob)
    904         self.assertEqual(len(objects2), 0)
    905 
    906     def test_weak_valued_iters(self):
    907         dict, objects = self.make_weak_valued_dict()
    908         self.check_iters(dict)
    909 
    910         # Test valuerefs()

    911         refs = dict.valuerefs()
    912         self.assertEqual(len(refs), len(objects))
    913         objects2 = list(objects)
    914         for wr in refs:
    915             ob = wr()
    916             self.assertEqual(ob, dict[ob.arg])
    917             self.assertEqual(ob.arg, dict[ob.arg].arg)
    918             objects2.remove(ob)
    919         self.assertEqual(len(objects2), 0)
    920 
    921         # Test itervaluerefs()

    922         objects2 = list(objects)
    923         self.assertEqual(len(list(dict.itervaluerefs())), len(objects))
    924         for wr in dict.itervaluerefs():
    925             ob = wr()
    926             self.assertEqual(ob, dict[ob.arg])
    927             self.assertEqual(ob.arg, dict[ob.arg].arg)
    928             objects2.remove(ob)
    929         self.assertEqual(len(objects2), 0)
    930 
    931     def check_iters(self, dict):
    932         # item iterator:

    933         items = dict.items()
    934         for item in dict.iteritems():
    935             items.remove(item)
    936         self.assertTrue(len(items) == 0, "iteritems() did not touch all items")
    937 
    938         # key iterator, via __iter__():

    939         keys = dict.keys()
    940         for k in dict:
    941             keys.remove(k)
    942         self.assertTrue(len(keys) == 0, "__iter__() did not touch all keys")
    943 
    944         # key iterator, via iterkeys():

    945         keys = dict.keys()
    946         for k in dict.iterkeys():
    947             keys.remove(k)
    948         self.assertTrue(len(keys) == 0, "iterkeys() did not touch all keys")
    949 
    950         # value iterator:

    951         values = dict.values()
    952         for v in dict.itervalues():
    953             values.remove(v)
    954         self.assertTrue(len(values) == 0,
    955                      "itervalues() did not touch all values")
    956 
    957     def test_make_weak_keyed_dict_from_dict(self):
    958         o = Object(3)
    959         dict = weakref.WeakKeyDictionary({o:364})
    960         self.assertTrue(dict[o] == 364)
    961 
    962     def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
    963         o = Object(3)
    964         dict = weakref.WeakKeyDictionary({o:364})
    965         dict2 = weakref.WeakKeyDictionary(dict)
    966         self.assertTrue(dict[o] == 364)
    967 
    968     def make_weak_keyed_dict(self):
    969         dict = weakref.WeakKeyDictionary()
    970         objects = map(Object, range(self.COUNT))
    971         for o in objects:
    972             dict[o] = o.arg
    973         return dict, objects
    974 
    975     def make_weak_valued_dict(self):
    976         dict = weakref.WeakValueDictionary()
    977         objects = map(Object, range(self.COUNT))
    978         for o in objects:
    979             dict[o.arg] = o
    980         return dict, objects
    981 
    982     def check_popitem(self, klass, key1, value1, key2, value2):
    983         weakdict = klass()
    984         weakdict[key1] = value1
    985         weakdict[key2] = value2
    986         self.assertTrue(len(weakdict) == 2)
    987         k, v = weakdict.popitem()
    988         self.assertTrue(len(weakdict) == 1)
    989         if k is key1:
    990             self.assertTrue(v is value1)
    991         else:
    992             self.assertTrue(v is value2)
    993         k, v = weakdict.popitem()
    994         self.assertTrue(len(weakdict) == 0)
    995         if k is key1:
    996             self.assertTrue(v is value1)
    997         else:
    998             self.assertTrue(v is value2)
    999 
   1000     def test_weak_valued_dict_popitem(self):
   1001         self.check_popitem(weakref.WeakValueDictionary,
   1002                            "key1", C(), "key2", C())
   1003 
   1004     def test_weak_keyed_dict_popitem(self):
   1005         self.check_popitem(weakref.WeakKeyDictionary,
   1006                            C(), "value 1", C(), "value 2")
   1007 
   1008     def check_setdefault(self, klass, key, value1, value2):
   1009         self.assertTrue(value1 is not value2,
   1010                      "invalid test"
   1011                      " -- value parameters must be distinct objects")
   1012         weakdict = klass()
   1013         o = weakdict.setdefault(key, value1)
   1014         self.assertIs(o, value1)
   1015         self.assertIn(key, weakdict)
   1016         self.assertIs(weakdict.get(key), value1)
   1017         self.assertIs(weakdict[key], value1)
   1018 
   1019         o = weakdict.setdefault(key, value2)
   1020         self.assertIs(o, value1)
   1021         self.assertIn(key, weakdict)
   1022         self.assertIs(weakdict.get(key), value1)
   1023         self.assertIs(weakdict[key], value1)
   1024 
   1025     def test_weak_valued_dict_setdefault(self):
   1026         self.check_setdefault(weakref.WeakValueDictionary,
   1027                               "key", C(), C())
   1028 
   1029     def test_weak_keyed_dict_setdefault(self):
   1030         self.check_setdefault(weakref.WeakKeyDictionary,
   1031                               C(), "value 1", "value 2")
   1032 
   1033     def check_update(self, klass, dict):
   1034         #

   1035         #  This exercises d.update(), len(d), d.keys(), in d,

   1036         #  d.get(), d[].

   1037         #

   1038         weakdict = klass()
   1039         weakdict.update(dict)
   1040         self.assertEqual(len(weakdict), len(dict))
   1041         for k in weakdict.keys():
   1042             self.assertIn(k, dict,
   1043                          "mysterious new key appeared in weak dict")
   1044             v = dict.get(k)
   1045             self.assertIs(v, weakdict[k])
   1046             self.assertIs(v, weakdict.get(k))
   1047         for k in dict.keys():
   1048             self.assertIn(k, weakdict,
   1049                          "original key disappeared in weak dict")
   1050             v = dict[k]
   1051             self.assertIs(v, weakdict[k])
   1052             self.assertIs(v, weakdict.get(k))
   1053 
   1054     def test_weak_valued_dict_update(self):
   1055         self.check_update(weakref.WeakValueDictionary,
   1056                           {1: C(), 'a': C(), C(): C()})
   1057 
   1058     def test_weak_keyed_dict_update(self):
   1059         self.check_update(weakref.WeakKeyDictionary,
   1060                           {C(): 1, C(): 2, C(): 3})
   1061 
   1062     def test_weak_keyed_delitem(self):
   1063         d = weakref.WeakKeyDictionary()
   1064         o1 = Object('1')
   1065         o2 = Object('2')
   1066         d[o1] = 'something'
   1067         d[o2] = 'something'
   1068         self.assertTrue(len(d) == 2)
   1069         del d[o1]
   1070         self.assertTrue(len(d) == 1)
   1071         self.assertTrue(d.keys() == [o2])
   1072 
   1073     def test_weak_valued_delitem(self):
   1074         d = weakref.WeakValueDictionary()
   1075         o1 = Object('1')
   1076         o2 = Object('2')
   1077         d['something'] = o1
   1078         d['something else'] = o2
   1079         self.assertTrue(len(d) == 2)
   1080         del d['something']
   1081         self.assertTrue(len(d) == 1)
   1082         self.assertTrue(d.items() == [('something else', o2)])
   1083 
   1084     def test_weak_keyed_bad_delitem(self):
   1085         d = weakref.WeakKeyDictionary()
   1086         o = Object('1')
   1087         # An attempt to delete an object that isn't there should raise

   1088         # KeyError.  It didn't before 2.3.

   1089         self.assertRaises(KeyError, d.__delitem__, o)
   1090         self.assertRaises(KeyError, d.__getitem__, o)
   1091 
   1092         # If a key isn't of a weakly referencable type, __getitem__ and

   1093         # __setitem__ raise TypeError.  __delitem__ should too.

   1094         self.assertRaises(TypeError, d.__delitem__,  13)
   1095         self.assertRaises(TypeError, d.__getitem__,  13)
   1096         self.assertRaises(TypeError, d.__setitem__,  13, 13)
   1097 
   1098     def test_weak_keyed_cascading_deletes(self):
   1099         # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated

   1100         # over the keys via self.data.iterkeys().  If things vanished from

   1101         # the dict during this (or got added), that caused a RuntimeError.

   1102 
   1103         d = weakref.WeakKeyDictionary()
   1104         mutate = False
   1105 
   1106         class C(object):
   1107             def __init__(self, i):
   1108                 self.value = i
   1109             def __hash__(self):
   1110                 return hash(self.value)
   1111             def __eq__(self, other):
   1112                 if mutate:
   1113                     # Side effect that mutates the dict, by removing the

   1114                     # last strong reference to a key.

   1115                     del objs[-1]
   1116                 return self.value == other.value
   1117 
   1118         objs = [C(i) for i in range(4)]
   1119         for o in objs:
   1120             d[o] = o.value
   1121         del o   # now the only strong references to keys are in objs

   1122         # Find the order in which iterkeys sees the keys.

   1123         objs = d.keys()
   1124         # Reverse it, so that the iteration implementation of __delitem__

   1125         # has to keep looping to find the first object we delete.

   1126         objs.reverse()
   1127 
   1128         # Turn on mutation in C.__eq__.  The first time thru the loop,

   1129         # under the iterkeys() business the first comparison will delete

   1130         # the last item iterkeys() would see, and that causes a

   1131         #     RuntimeError: dictionary changed size during iteration

   1132         # when the iterkeys() loop goes around to try comparing the next

   1133         # key.  After this was fixed, it just deletes the last object *our*

   1134         # "for o in obj" loop would have gotten to.

   1135         mutate = True
   1136         count = 0
   1137         for o in objs:
   1138             count += 1
   1139             del d[o]
   1140         self.assertEqual(len(d), 0)
   1141         self.assertEqual(count, 2)
   1142 
   1143 from test import mapping_tests
   1144 
   1145 class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
   1146     """Check that WeakValueDictionary conforms to the mapping protocol"""
   1147     __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)}
   1148     type2test = weakref.WeakValueDictionary
   1149     def _reference(self):
   1150         return self.__ref.copy()
   1151 
   1152 class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
   1153     """Check that WeakKeyDictionary conforms to the mapping protocol"""
   1154     __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3}
   1155     type2test = weakref.WeakKeyDictionary
   1156     def _reference(self):
   1157         return self.__ref.copy()
   1158 
   1159 libreftest = """ Doctest for examples in the library reference: weakref.rst
   1160 
   1161 >>> import weakref
   1162 >>> class Dict(dict):
   1163 ...     pass
   1164 ...
   1165 >>> obj = Dict(red=1, green=2, blue=3)   # this object is weak referencable
   1166 >>> r = weakref.ref(obj)
   1167 >>> print r() is obj
   1168 True
   1169 
   1170 >>> import weakref
   1171 >>> class Object:
   1172 ...     pass
   1173 ...
   1174 >>> o = Object()
   1175 >>> r = weakref.ref(o)
   1176 >>> o2 = r()
   1177 >>> o is o2
   1178 True
   1179 >>> del o, o2
   1180 >>> print r()
   1181 None
   1182 
   1183 >>> import weakref
   1184 >>> class ExtendedRef(weakref.ref):
   1185 ...     def __init__(self, ob, callback=None, **annotations):
   1186 ...         super(ExtendedRef, self).__init__(ob, callback)
   1187 ...         self.__counter = 0
   1188 ...         for k, v in annotations.iteritems():
   1189 ...             setattr(self, k, v)
   1190 ...     def __call__(self):
   1191 ...         '''Return a pair containing the referent and the number of
   1192 ...         times the reference has been called.
   1193 ...         '''
   1194 ...         ob = super(ExtendedRef, self).__call__()
   1195 ...         if ob is not None:
   1196 ...             self.__counter += 1
   1197 ...             ob = (ob, self.__counter)
   1198 ...         return ob
   1199 ...
   1200 >>> class A:   # not in docs from here, just testing the ExtendedRef
   1201 ...     pass
   1202 ...
   1203 >>> a = A()
   1204 >>> r = ExtendedRef(a, foo=1, bar="baz")
   1205 >>> r.foo
   1206 1
   1207 >>> r.bar
   1208 'baz'
   1209 >>> r()[1]
   1210 1
   1211 >>> r()[1]
   1212 2
   1213 >>> r()[0] is a
   1214 True
   1215 
   1216 
   1217 >>> import weakref
   1218 >>> _id2obj_dict = weakref.WeakValueDictionary()
   1219 >>> def remember(obj):
   1220 ...     oid = id(obj)
   1221 ...     _id2obj_dict[oid] = obj
   1222 ...     return oid
   1223 ...
   1224 >>> def id2obj(oid):
   1225 ...     return _id2obj_dict[oid]
   1226 ...
   1227 >>> a = A()             # from here, just testing
   1228 >>> a_id = remember(a)
   1229 >>> id2obj(a_id) is a
   1230 True
   1231 >>> del a
   1232 >>> try:
   1233 ...     id2obj(a_id)
   1234 ... except KeyError:
   1235 ...     print 'OK'
   1236 ... else:
   1237 ...     print 'WeakValueDictionary error'
   1238 OK
   1239 
   1240 """
   1241 
   1242 __test__ = {'libreftest' : libreftest}
   1243 
   1244 def test_main():
   1245     test_support.run_unittest(
   1246         ReferencesTestCase,
   1247         MappingTestCase,
   1248         WeakValueDictionaryTestCase,
   1249         WeakKeyDictionaryTestCase,
   1250         SubclassableWeakrefTestCase,
   1251         )
   1252     test_support.run_doctest(sys.modules[__name__])
   1253 
   1254 
   1255 if __name__ == "__main__":
   1256     test_main()
   1257