#
# Module which supports allocation of memory from an mmap
#
# multiprocessing/heap.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

import bisect
import mmap
import tempfile
import os
import sys
import threading
import itertools

import _multiprocessing
from multiprocessing.util import Finalize, info
from multiprocessing.forking import assert_spawning

__all__ = ['BufferWrapper']

#
# Inheritable class which wraps an mmap, and from which blocks can be allocated
#

if sys.platform == 'win32':

    from _multiprocessing import win32

    class Arena(object):

        _counter = itertools.count()

        def __init__(self, size):
            self.size = size
            self.name = 'pym-%d-%d' % (os.getpid(), Arena._counter.next())
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
            assert win32.GetLastError() == 0, 'tagname already in use'
            self._state = (self.size, self.name)

        def __getstate__(self):
            assert_spawning(self)
            return self._state

        def __setstate__(self, state):
            self.size, self.name = self._state = state
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
            assert win32.GetLastError() == win32.ERROR_ALREADY_EXISTS

else:

    class Arena(object):

        def __init__(self, size):
            self.buffer = mmap.mmap(-1, size)
            self.size = size
            self.name = None

#
# Class allowing allocation of chunks of memory from arenas
#

class Heap(object):

    _alignment = 8

    def __init__(self, size=mmap.PAGESIZE):
        self._lastpid = os.getpid()
        self._lock = threading.Lock()
        self._size = size
        self._lengths = []
        self._len_to_seq = {}
        self._start_to_block = {}
        self._stop_to_block = {}
        self._allocated_blocks = set()
        self._arenas = []
        # list of pending blocks to free - see free() comment below
        self._pending_free_blocks = []

    @staticmethod
    def _roundup(n, alignment):
        # alignment must be a power of 2
        mask = alignment - 1
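        # e.g. _roundup(13, 8): mask == 7, so (13 + 7) & ~7 == 20 & ~7 == 16;
        # the bit trick works only because a power-of-2 alignment makes mask
        # a run of low one-bits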
        return (n + mask) & ~mask

    def _malloc(self, size):
        # returns a large enough block -- it might be much larger
        i = bisect.bisect_left(self._lengths, size)
        if i == len(self._lengths):
            length = self._roundup(max(self._size, size), mmap.PAGESIZE)
            self._size *= 2
            info('allocating a new mmap of length %d', length)
            arena = Arena(length)
            self._arenas.append(arena)
            return (arena, 0, length)
        else:
            length = self._lengths[i]
            seq = self._len_to_seq[length]
            block = seq.pop()
            if not seq:
                del self._len_to_seq[length], self._lengths[i]

        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]
        return block

    def _free(self, block):
        # free location and try to merge with neighbours
        (arena, start, stop) = block

        try:
            prev_block = self._stop_to_block[(arena, start)]
        except KeyError:
            pass
        else:
            start, _ = self._absorb(prev_block)

        try:
            next_block = self._start_to_block[(arena, stop)]
        except KeyError:
            pass
        else:
            _, stop = self._absorb(next_block)

        block = (arena, start, stop)
        length = stop - start

        try:
            self._len_to_seq[length].append(block)
        except KeyError:
            self._len_to_seq[length] = [block]
            bisect.insort(self._lengths, length)

        self._start_to_block[(arena, start)] = block
        self._stop_to_block[(arena, stop)] = block

    def _absorb(self, block):
        # deregister this block so it can be merged with a neighbour
        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]

        length = stop - start
        seq = self._len_to_seq[length]
        seq.remove(block)
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

        return start, stop

    def _free_pending_blocks(self):
        # Free all the blocks in the pending list - called with the lock held.
        while True:
            try:
                block = self._pending_free_blocks.pop()
            except IndexError:
                break
            self._allocated_blocks.remove(block)
            self._free(block)

    def free(self, block):
        # free a block returned by malloc()
        # Since free() can be called asynchronously by the GC, it could happen
        # that it's called while self._lock is held: in that case,
        # self._lock.acquire() would deadlock (issue #12352). To avoid that, a
        # trylock is used instead, and if the lock can't be acquired
        # immediately, the block is added to a list of blocks to be freed
        # synchronously, sometime later from malloc() or free(), by calling
        # _free_pending_blocks() (appending and retrieving from a list is not
        # strictly thread-safe but under CPython it's atomic thanks to the GIL).
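        # the heap's bookkeeping belongs to the process that created it; a
        # forked child must not free blocks it inherited (malloc() instead
        # reinitializes the heap after a fork)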
        assert os.getpid() == self._lastpid
        if not self._lock.acquire(False):
            # can't acquire the lock right now, add the block to the list of
            # pending blocks to free
            self._pending_free_blocks.append(block)
        else:
            # we hold the lock
            try:
                self._free_pending_blocks()
                self._allocated_blocks.remove(block)
                self._free(block)
            finally:
                self._lock.release()

    def malloc(self, size):
        # return a block of the right size (possibly rounded up)
        assert 0 <= size < sys.maxint
        if os.getpid() != self._lastpid:
            self.__init__()                     # reinitialize after fork
        self._lock.acquire()
        self._free_pending_blocks()
        try:
            size = self._roundup(max(size, 1), self._alignment)
            (arena, start, stop) = self._malloc(size)
            new_stop = start + size
            if new_stop < stop:
                self._free((arena, new_stop, stop))
            block = (arena, start, new_stop)
            self._allocated_blocks.add(block)
            return block
        finally:
            self._lock.release()

#
# Class representing a chunk of an mmap -- can be inherited
#

class BufferWrapper(object):

    _heap = Heap()

    def __init__(self, size):
        assert 0 <= size < sys.maxint
        block = BufferWrapper._heap.malloc(size)
        self._state = (block, size)
        Finalize(self, BufferWrapper._heap.free, args=(block,))

    def get_address(self):
        (arena, start, stop), size = self._state
        address, length = _multiprocessing.address_of_buffer(arena.buffer)
        assert size <= length
        return address + start

    def get_size(self):
        return self._state[1]
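
#
# Minimal usage sketch (assumes Python 2 and the ctypes module; this mirrors
# roughly how multiprocessing.sharedctypes consumes BufferWrapper, and is not
# part of the module's public API)
#

if __name__ == '__main__':
    import ctypes

    # carve a block big enough for one double out of the shared heap
    wrapper = BufferWrapper(ctypes.sizeof(ctypes.c_double))

    # view the block as a ctypes double and write through it
    d = ctypes.c_double.from_address(wrapper.get_address())
    d.value = 1.5

    assert wrapper.get_size() == ctypes.sizeof(ctypes.c_double)
    print d.value    # 1.5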