Home | History | Annotate | Download | only in multiprocessing
      1 #
      2 # Module which supports allocation of ctypes objects from shared memory
      3 #
      4 # multiprocessing/sharedctypes.py
      5 #
      6 # Copyright (c) 2006-2008, R Oudkerk
      7 # All rights reserved.
      8 #
      9 # Redistribution and use in source and binary forms, with or without
     10 # modification, are permitted provided that the following conditions
     11 # are met:
     12 #
     13 # 1. Redistributions of source code must retain the above copyright
     14 #    notice, this list of conditions and the following disclaimer.
     15 # 2. Redistributions in binary form must reproduce the above copyright
     16 #    notice, this list of conditions and the following disclaimer in the
     17 #    documentation and/or other materials provided with the distribution.
     18 # 3. Neither the name of author nor the names of any contributors may be
     19 #    used to endorse or promote products derived from this software
     20 #    without specific prior written permission.
     21 #
     22 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
     23 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     24 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     25 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
     26 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     27 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
     28 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     29 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
     31 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
     32 # SUCH DAMAGE.
     33 #
     34 
     35 import sys
     36 import ctypes
     37 import weakref
     38 
     39 from multiprocessing import heap, RLock
     40 from multiprocessing.forking import assert_spawning, ForkingPickler
     41 
     42 __all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
     43 
     44 #
     45 #
     46 #
     47 
     48 typecode_to_type = {
     49     'c': ctypes.c_char,  'u': ctypes.c_wchar,
     50     'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
     51     'h': ctypes.c_short, 'H': ctypes.c_ushort,
     52     'i': ctypes.c_int,   'I': ctypes.c_uint,
     53     'l': ctypes.c_long,  'L': ctypes.c_ulong,
     54     'f': ctypes.c_float, 'd': ctypes.c_double
     55     }
     56 
     57 #
     58 #
     59 #
     60 
     61 def _new_value(type_):
     62     size = ctypes.sizeof(type_)
     63     wrapper = heap.BufferWrapper(size)
     64     return rebuild_ctype(type_, wrapper, None)
     65 
     66 def RawValue(typecode_or_type, *args):
     67     '''
     68     Returns a ctypes object allocated from shared memory
     69     '''
     70     type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
     71     obj = _new_value(type_)
     72     ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
     73     obj.__init__(*args)
     74     return obj
     75 
     76 def RawArray(typecode_or_type, size_or_initializer):
     77     '''
     78     Returns a ctypes array allocated from shared memory
     79     '''
     80     type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
     81     if isinstance(size_or_initializer, (int, long)):
     82         type_ = type_ * size_or_initializer
     83         obj = _new_value(type_)
     84         ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
     85         return obj
     86     else:
     87         type_ = type_ * len(size_or_initializer)
     88         result = _new_value(type_)
     89         result.__init__(*size_or_initializer)
     90         return result
     91 
     92 def Value(typecode_or_type, *args, **kwds):
     93     '''
     94     Return a synchronization wrapper for a Value
     95     '''
     96     lock = kwds.pop('lock', None)
     97     if kwds:
     98         raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys())
     99     obj = RawValue(typecode_or_type, *args)
    100     if lock is False:
    101         return obj
    102     if lock in (True, None):
    103         lock = RLock()
    104     if not hasattr(lock, 'acquire'):
    105         raise AttributeError("'%r' has no method 'acquire'" % lock)
    106     return synchronized(obj, lock)
    107 
    108 def Array(typecode_or_type, size_or_initializer, **kwds):
    109     '''
    110     Return a synchronization wrapper for a RawArray
    111     '''
    112     lock = kwds.pop('lock', None)
    113     if kwds:
    114         raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys())
    115     obj = RawArray(typecode_or_type, size_or_initializer)
    116     if lock is False:
    117         return obj
    118     if lock in (True, None):
    119         lock = RLock()
    120     if not hasattr(lock, 'acquire'):
    121         raise AttributeError("'%r' has no method 'acquire'" % lock)
    122     return synchronized(obj, lock)
    123 
    124 def copy(obj):
    125     new_obj = _new_value(type(obj))
    126     ctypes.pointer(new_obj)[0] = obj
    127     return new_obj
    128 
    129 def synchronized(obj, lock=None):
    130     assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
    131 
    132     if isinstance(obj, ctypes._SimpleCData):
    133         return Synchronized(obj, lock)
    134     elif isinstance(obj, ctypes.Array):
    135         if obj._type_ is ctypes.c_char:
    136             return SynchronizedString(obj, lock)
    137         return SynchronizedArray(obj, lock)
    138     else:
    139         cls = type(obj)
    140         try:
    141             scls = class_cache[cls]
    142         except KeyError:
    143             names = [field[0] for field in cls._fields_]
    144             d = dict((name, make_property(name)) for name in names)
    145             classname = 'Synchronized' + cls.__name__
    146             scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
    147         return scls(obj, lock)
    148 
    149 #
    150 # Functions for pickling/unpickling
    151 #
    152 
    153 def reduce_ctype(obj):
    154     assert_spawning(obj)
    155     if isinstance(obj, ctypes.Array):
    156         return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
    157     else:
    158         return rebuild_ctype, (type(obj), obj._wrapper, None)
    159 
    160 def rebuild_ctype(type_, wrapper, length):
    161     if length is not None:
    162         type_ = type_ * length
    163     ForkingPickler.register(type_, reduce_ctype)
    164     obj = type_.from_address(wrapper.get_address())
    165     obj._wrapper = wrapper
    166     return obj
    167 
    168 #
    169 # Function to create properties
    170 #
    171 
    172 def make_property(name):
    173     try:
    174         return prop_cache[name]
    175     except KeyError:
    176         d = {}
    177         exec template % ((name,)*7) in d
    178         prop_cache[name] = d[name]
    179         return d[name]
    180 
    181 template = '''
    182 def get%s(self):
    183     self.acquire()
    184     try:
    185         return self._obj.%s
    186     finally:
    187         self.release()
    188 def set%s(self, value):
    189     self.acquire()
    190     try:
    191         self._obj.%s = value
    192     finally:
    193         self.release()
    194 %s = property(get%s, set%s)
    195 '''
    196 
    197 prop_cache = {}
    198 class_cache = weakref.WeakKeyDictionary()
    199 
    200 #
    201 # Synchronized wrappers
    202 #
    203 
    204 class SynchronizedBase(object):
    205 
    206     def __init__(self, obj, lock=None):
    207         self._obj = obj
    208         self._lock = lock or RLock()
    209         self.acquire = self._lock.acquire
    210         self.release = self._lock.release
    211 
    212     def __reduce__(self):
    213         assert_spawning(self)
    214         return synchronized, (self._obj, self._lock)
    215 
    216     def get_obj(self):
    217         return self._obj
    218 
    219     def get_lock(self):
    220         return self._lock
    221 
    222     def __repr__(self):
    223         return '<%s wrapper for %s>' % (type(self).__name__, self._obj)
    224 
    225 
class Synchronized(SynchronizedBase):
    # Wrapper that synchronized() returns for simple ctypes values.
    # `value` is a property from make_property(): it gets and sets
    # self._obj.value between self.acquire() and self.release().
    value = make_property('value')
    228 
    229 
    230 class SynchronizedArray(SynchronizedBase):
    231 
    232     def __len__(self):
    233         return len(self._obj)
    234 
    235     def __getitem__(self, i):
    236         self.acquire()
    237         try:
    238             return self._obj[i]
    239         finally:
    240             self.release()
    241 
    242     def __setitem__(self, i, value):
    243         self.acquire()
    244         try:
    245             self._obj[i] = value
    246         finally:
    247             self.release()
    248 
    249     def __getslice__(self, start, stop):
    250         self.acquire()
    251         try:
    252             return self._obj[start:stop]
    253         finally:
    254             self.release()
    255 
    256     def __setslice__(self, start, stop, values):
    257         self.acquire()
    258         try:
    259             self._obj[start:stop] = values
    260         finally:
    261             self.release()
    262 
    263 
class SynchronizedString(SynchronizedArray):
    # Wrapper that synchronized() returns for arrays whose element type
    # is ctypes.c_char.  `value` and `raw` are properties from
    # make_property(): each gets and sets the same-named attribute of
    # self._obj between self.acquire() and self.release().
    value = make_property('value')
    raw = make_property('raw')
    267