# Defines classes that provide synchronization objects.  Note that use of
# this module requires that your Python support threads.
#
#    condition(lock=None)       # a POSIX-like condition-variable object
#    barrier(n)                 # an n-thread barrier
#    event()                    # an event object
#    semaphore(count=1)         # a semaphore object, with initial count
#    mrsw()                     # a multiple-reader single-writer lock
#
# CONDITIONS
#
# A condition object is created via
#   import this_module
#   your_condition_object = this_module.condition(lock=None)
#
# As explained below, a condition object has a lock associated with it,
# used in the protocol to protect condition data.  You can specify a
# lock to use in the constructor, else the constructor will allocate
# an anonymous lock for you.  Specifying a lock explicitly can be useful
# when more than one condition keys off the same set of shared data.
#
# Methods:
#   .acquire()
#      acquire the lock associated with the condition
#   .release()
#      release the lock associated with the condition
#   .wait()
#      block the thread until such time as some other thread does a
#      .signal or .broadcast on the same condition.  The lock associated
#      with the condition is released while the thread is blocked, and
#      is re-acquired before .wait returns.  The lock associated with
#      the condition MUST be in the acquired state at the time
#      .wait is invoked.
#   .signal()
#      wake up exactly one thread (if any) that previously did a .wait
#      on the condition; that thread will awaken with the lock associated
#      with the condition in the acquired state.  If no threads are
#      .wait'ing, this is a nop.  If more than one thread is .wait'ing on
#      the condition, any of them may be awakened.
#   .broadcast()
#      wake up all threads (if any) that are .wait'ing on the condition;
#      the threads are woken up serially, each with the lock in the
#      acquired state, so should .release() as soon as possible.  If no
#      threads are .wait'ing, this is a nop.
#
#      Note that if a thread does a .wait *while* a signal/broadcast is
#      in progress, it's guaranteed to block until a subsequent
#      signal/broadcast.
#
#      Secret feature:  `broadcast' actually takes an integer argument,
#      and will wake up exactly that many waiting threads (or the total
#      number waiting, if that's less).  The default, -1, means "all of
#      them", and 0 is a nop.  Use of this is dubious, though, and
#      probably won't be supported if this form of condition is
#      reimplemented in C.
#
# DIFFERENCES FROM POSIX
#
# + A separate mutex is not needed to guard condition data.  Instead, a
#   condition object can (must) be .acquire'ed and .release'ed directly.
#   This eliminates a common error in using POSIX conditions.
#
# + Because of implementation difficulties, a POSIX `signal' wakes up
#   _at least_ one .wait'ing thread.  Race conditions make it difficult
#   to stop that.  This implementation guarantees to wake up only one,
#   but you probably shouldn't rely on that.
#
# PROTOCOL
#
# Condition objects are used to block threads until "some condition" is
# true.  E.g., a thread may wish to wait until a producer pumps out data
# for it to consume, or a server may wish to wait until someone requests
# its services, or perhaps a whole bunch of threads want to wait until a
# preceding pass over the data is complete.  Early models for conditions
# relied on some other thread figuring out when a blocked thread's
# condition was true, and made the other thread responsible both for
# waking up the blocked thread and guaranteeing that it woke up with all
# data in a correct state.  This proved to be very delicate in practice,
# and gave conditions a bad name in some circles.
#
# The POSIX model addresses these problems by making a thread responsible
# for ensuring that its own state is correct when it wakes, and relies
# on a rigid protocol to make this easy; so long as you stick to the
# protocol, POSIX conditions are easy to "get right":
#
#  A) The thread that's waiting for some arbitrarily-complex condition
#     (ACC) to become true does:
#
#     condition.acquire()
#     while not (code to evaluate the ACC):
#           condition.wait()
#           # That blocks the thread, *and* releases the lock.  When a
#           # condition.signal() happens, it will wake up some thread that
#           # did a .wait, *and* acquire the lock again before .wait
#           # returns.
#           #
#           # Because the lock is acquired at this point, the state used
#           # in evaluating the ACC is frozen, so it's safe to go back &
#           # reevaluate the ACC.
#
#     # At this point, ACC is true, and the thread has the condition
#     # locked.
#     # So code here can safely muck with the shared state that
#     # went into evaluating the ACC -- if it wants to.
#     # When done mucking with the shared state, do
#     condition.release()
#
#  B) Threads that are mucking with shared state that may affect the
#     ACC do:
#
#     condition.acquire()
#     # muck with shared state
#     condition.release()
#     if it's possible that ACC is true now:
#         condition.signal() # or .broadcast()
#
#     Note:  You may prefer to put the "if" clause before the release().
#     That's fine, but do note that anyone waiting on the signal will
#     stay blocked until the release() is done (since acquiring the
#     condition is part of what .wait() does before it returns).
#
# TRICK OF THE TRADE
#
# With simpler forms of conditions, it can be impossible to know when
# a thread that's supposed to do a .wait has actually done it.  But
But 124 # because this form of condition releases a lock as _part_ of doing a 125 # wait, the state of that lock can be used to guarantee it. 126 # 127 # E.g., suppose thread A spawns thread B and later wants to wait for B to 128 # complete: 129 # 130 # In A: In B: 131 # 132 # B_done = condition() ... do work ... 133 # B_done.acquire() B_done.acquire(); B_done.release() 134 # spawn B B_done.signal() 135 # ... some time later ... ... and B exits ... 136 # B_done.wait() 137 # 138 # Because B_done was in the acquire'd state at the time B was spawned, 139 # B's attempt to acquire B_done can't succeed until A has done its 140 # B_done.wait() (which releases B_done). So B's B_done.signal() is 141 # guaranteed to be seen by the .wait(). Without the lock trick, B 142 # may signal before A .waits, and then A would wait forever. 143 # 144 # BARRIERS 145 # 146 # A barrier object is created via 147 # import this_module 148 # your_barrier = this_module.barrier(num_threads) 149 # 150 # Methods: 151 # .enter() 152 # the thread blocks until num_threads threads in all have done 153 # .enter(). Then the num_threads threads that .enter'ed resume, 154 # and the barrier resets to capture the next num_threads threads 155 # that .enter it. 156 # 157 # EVENTS 158 # 159 # An event object is created via 160 # import this_module 161 # your_event = this_module.event() 162 # 163 # An event has two states, `posted' and `cleared'. An event is 164 # created in the cleared state. 165 # 166 # Methods: 167 # 168 # .post() 169 # Put the event in the posted state, and resume all threads 170 # .wait'ing on the event (if any). 171 # 172 # .clear() 173 # Put the event in the cleared state. 174 # 175 # .is_posted() 176 # Returns 0 if the event is in the cleared state, or 1 if the event 177 # is in the posted state. 178 # 179 # .wait() 180 # If the event is in the posted state, returns immediately. 
#      If the event is in the cleared state, blocks the calling thread
#      until the event is .post'ed by another thread.
#
# Note that an event, once posted, remains posted until explicitly
# cleared.  Relative to conditions, this is both the strength & weakness
# of events.  It's a strength because the .post'ing thread doesn't have to
# worry about whether the threads it's trying to communicate with have
# already done a .wait (a condition .signal is seen only by threads that
# do a .wait _prior_ to the .signal; a .signal does not persist).  But
# it's a weakness because .clear'ing an event is error-prone:  it's easy
# to mistakenly .clear an event before all the threads you intended to
# see the event get around to .wait'ing on it.  But so long as you don't
# need to .clear an event, events are easy to use safely.
#
# SEMAPHORES
#
# A semaphore object is created via
#   import this_module
#   your_semaphore = this_module.semaphore(count=1)
#
# A semaphore has an integer count associated with it.  The initial value
# of the count is specified by the optional argument (which defaults to
# 1, and must be at least 1) passed to the semaphore constructor.
#
# Methods:
#
#   .p()
#      If the semaphore's count is greater than 0, decrements the count
#      by 1 and returns.
#      Else if the semaphore's count is 0, blocks the calling thread
#      until a subsequent .v() increases the count.  When that happens,
#      the count will be decremented by 1 and the calling thread resumed.
#
#   .v()
#      Increments the semaphore's count by 1, and wakes up a thread (if
#      any) blocked by a .p().  It's a (detected) error for a .v() to
#      increase the semaphore's count to a value larger than the initial
#      count.
#
# MULTIPLE-READER SINGLE-WRITER LOCKS
#
# A mrsw lock is created via
#   import this_module
#   your_mrsw_lock = this_module.mrsw()
#
# This kind of lock is often useful with complex shared data structures.
# The object lets any number of "readers" proceed, so long as no thread
# wishes to "write".  When one or more threads declare their intention
# to "write" (e.g., to update a shared structure), all current readers
# are allowed to finish, and then a writer gets exclusive access; all
# other readers & writers are blocked until the current writer completes.
# Finally, if some thread is waiting to write and another is waiting to
# read, the writer takes precedence.
#
# Methods:
#
#   .read_in()
#      If no thread is writing or waiting to write, returns immediately.
#      Else blocks until no thread is writing or waiting to write.  So
#      long as some thread has completed a .read_in but not a .read_out,
#      writers are blocked.
#
#   .read_out()
#      Use sometime after a .read_in to declare that the thread is done
#      reading.  When all threads complete reading, a writer can proceed.
#
#   .write_in()
#      If no thread is writing (has completed a .write_in, but hasn't yet
#      done a .write_out) or reading (similarly), returns immediately.
#      Else blocks the calling thread, and threads waiting to read, until
#      the current writer completes writing or all the current readers
#      complete reading; if then more than one thread is waiting to
#      write, one of them is allowed to proceed, but which one is not
#      specified.
#
#   .write_out()
#      Use sometime after a .write_in to declare that the thread is done
#      writing.  Then if some other thread is waiting to write, it's
#      allowed to proceed.  Else all threads (if any) waiting to read are
#      allowed to proceed.
#
#   .write_to_read()
#      Use instead of a .write_out to declare that the thread is done
#      writing but wants to continue reading without other writers
#      intervening.  If there are other threads waiting to write, they
#      are allowed to proceed only if the current thread calls
#      .read_out; threads waiting to read are only allowed to proceed
#      if there are no threads waiting to write.  (This is a
#      weakness of the interface!)

try:
    import thread
except ImportError:
    # Python 3 renamed the low-level threading module to _thread.
    import _thread as thread


class condition:
    """Condition variable whose state is guarded by an associated lock.

    lock -- optional object with .acquire() and .release() methods, used
            as the condition's lock; if omitted, a new lock is allocated.
            NOTE: .wait() additionally calls lock.locked().

    Raises TypeError if lock lacks .acquire or .release.
    """

    def __init__(self, lock=None):
        # the lock actually used by .acquire() and .release()
        if lock is None:
            self.mutex = thread.allocate_lock()
        else:
            if hasattr(lock, 'acquire') and \
               hasattr(lock, 'release'):
                self.mutex = lock
            else:
                raise TypeError('condition constructor requires '
                                'a lock argument')

        # lock used to block threads until a signal; it starts out held,
        # so waiters block on it until .broadcast() releases it
        self.checkout = thread.allocate_lock()
        self.checkout.acquire()

        # internal critical-section lock, & the data it protects
        self.idlock = thread.allocate_lock()
        self.id = 0          # generation; bumped when a broadcast finds
                             # pending waiters, reset to 0 once idle
        self.waiting = 0     # num waiters subject to current release
        self.pending = 0     # num waiters awaiting next signal
        self.torelease = 0   # num waiters to release
        self.releasing = 0   # 1 iff release is in progress

    def acquire(self):
        """Acquire the lock associated with the condition."""
        self.mutex.acquire()

    def release(self):
        """Release the lock associated with the condition."""
        self.mutex.release()

    def wait(self):
        """Release the lock, block until signalled, then re-acquire the lock.

        The condition's lock must be held on entry.  Raises ValueError if
        it is not; this only checks that *some* thread holds the lock,
        not that the caller does.
        """
        mutex, checkout, idlock = self.mutex, self.checkout, self.idlock
        if not mutex.locked():
            raise ValueError(
                "condition must be .acquire'd when .wait() invoked")

        # Register for the next signal.  myid is the generation current
        # now, so a broadcast already in progress cannot wake this thread.
        idlock.acquire()
        myid = self.id
        self.pending = self.pending + 1
        idlock.release()

        mutex.release()

        # Only a waiter from an older generation may proceed; any other
        # thread that grabs checkout hands it back and tries again.
        while 1:
            checkout.acquire(); idlock.acquire()
            if myid < self.id:
                break
            checkout.release(); idlock.release()

        self.waiting = self.waiting - 1
        self.torelease = self.torelease - 1
        if self.torelease:
            # more threads to wake: pass checkout along the chain
            checkout.release()
        else:
            # this release is finished; checkout stays held
            self.releasing = 0
            if self.waiting == self.pending == 0:
                self.id = 0
        idlock.release()
        mutex.acquire()

    def signal(self):
        """Wake up one more waiting thread, if there is one."""
        self.broadcast(1)

    def broadcast(self, num=-1):
        """Wake up waiting threads.

        num -- number of waiters to wake; -1 (the default) means all of
               them, 0 is a nop.  Raises ValueError if num < -1.
        """
        if num < -1:
            raise ValueError('.broadcast called with num %r' % (num,))
        if num == 0:
            return
        self.idlock.acquire()
        if self.pending:
            # waiters registered since the last broadcast become eligible:
            # bumping id makes their `myid < self.id` test succeed
            self.waiting = self.waiting + self.pending
            self.pending = 0
            self.id = self.id + 1
        if num == -1:
            self.torelease = self.waiting
        else:
            self.torelease = min(self.waiting,
                                 self.torelease + num)
        if self.torelease and not self.releasing:
            # start a release chain; each woken waiter passes checkout on
            self.releasing = 1
            self.checkout.release()
        self.idlock.release()


class barrier:
    """Reusable rendezvous point for n threads.

    n -- number of threads that must call .enter() before any of them
         resumes; raises ValueError if n < 1.
    """

    def __init__(self, n):
        if n < 1:
            # with n <= 0 the countdown in .enter() never reaches zero,
            # so every caller would block forever
            raise ValueError('barrier size %r; must be >= 1' % (n,))
        self.n = n
        self.togo = n
        self.full = condition()

    def enter(self):
        """Block until n threads in all have entered, then reset."""
        full = self.full
        full.acquire()
        self.togo = self.togo - 1
        if self.togo:
            full.wait()
        else:
            # last arrival: re-arm for the next round and free the others
            self.togo = self.n
            full.broadcast()
        full.release()


class event:
    """Event with two states, posted (1) and cleared (0); starts cleared."""

    def __init__(self):
        self.state = 0
        self.posted = condition()

    def post(self):
        """Set the posted state and wake every thread blocked in .wait()."""
        self.posted.acquire()
        self.state = 1
        self.posted.broadcast()
        self.posted.release()

    def clear(self):
        """Set the cleared state."""
        self.posted.acquire()
        self.state = 0
        self.posted.release()

    def is_posted(self):
        """Return 1 if the event is posted, 0 if it is cleared."""
        self.posted.acquire()
        answer = self.state
        self.posted.release()
        return answer

    def wait(self):
        """Return at once if posted, else block until the next .post()."""
        self.posted.acquire()
        # `if`, not `while`: being woken by a .post() is enough, even if
        # the event was cleared again before this thread resumed
        if not self.state:
            self.posted.wait()
        self.posted.release()


class semaphore:
    """Counting semaphore whose count may never exceed its initial value.

    count -- initial (and maximum) count; raises ValueError if count <= 0.
    """

    def __init__(self, count=1):
        if count <= 0:
            raise ValueError('semaphore count %d; must be >= 1' % count)
        self.count = count
        self.maxcount = count
        self.nonzero = condition()

    def p(self):
        """Decrement the count, blocking while it is 0."""
        self.nonzero.acquire()
        while self.count == 0:
            self.nonzero.wait()
        self.count = self.count - 1
        self.nonzero.release()

    def v(self):
        """Increment the count and wake one thread blocked in .p().

        Raises ValueError if the count is already at its initial value.
        The internal lock is released on every path, so a detected misuse
        does not leave the semaphore permanently locked.
        """
        self.nonzero.acquire()
        try:
            if self.count == self.maxcount:
                raise ValueError('.v() tried to raise semaphore count above '
                                 'initial value %r' % self.maxcount)
            self.count = self.count + 1
            self.nonzero.signal()
        finally:
            self.nonzero.release()


class mrsw:
    """Multiple-reader single-writer lock; waiting writers beat new readers.

    The misuse checks in .read_out(), .write_out() and .write_to_read()
    raise ValueError, and release the internal lock before doing so.
    """

    def __init__(self):
        # critical-section lock & the data it protects
        self.rwOK = thread.allocate_lock()
        self.nr = 0       # number readers actively reading (not just waiting)
        self.nw = 0       # number writers either waiting to write or writing
        self.writing = 0  # 1 iff some thread is writing

        # conditions; both share rwOK as their lock
        self.readOK = condition(self.rwOK)   # OK to unblock readers
        self.writeOK = condition(self.rwOK)  # OK to unblock writers

    def read_in(self):
        """Start reading; block while any thread is writing or waiting to."""
        self.rwOK.acquire()
        while self.nw:
            self.readOK.wait()
        self.nr = self.nr + 1
        self.rwOK.release()

    def read_out(self):
        """Finish reading; the last reader out lets one writer proceed."""
        self.rwOK.acquire()
        try:
            if self.nr <= 0:
                raise ValueError(
                    '.read_out() invoked without an active reader')
            self.nr = self.nr - 1
            if self.nr == 0:
                self.writeOK.signal()
        finally:
            self.rwOK.release()

    def write_in(self):
        """Start writing; block while another thread writes or reads."""
        self.rwOK.acquire()
        # counting this writer now makes new readers block in read_in
        self.nw = self.nw + 1
        while self.writing or self.nr:
            self.writeOK.wait()
        self.writing = 1
        self.rwOK.release()

    def write_out(self):
        """Finish writing; wake one waiting writer, else all waiting readers."""
        self.rwOK.acquire()
        try:
            if not self.writing:
                raise ValueError(
                    '.write_out() invoked without an active writer')
            self.writing = 0
            self.nw = self.nw - 1
            if self.nw:
                self.writeOK.signal()
            else:
                self.readOK.broadcast()
        finally:
            self.rwOK.release()

    def write_to_read(self):
        """Turn the current writer into a reader with no writer in between."""
        self.rwOK.acquire()
        try:
            if not self.writing:
                raise ValueError(
                    '.write_to_read() invoked without an active writer')
            self.writing = 0
            self.nw = self.nw - 1
            self.nr = self.nr + 1
            if not self.nw:
                self.readOK.broadcast()
        finally:
            self.rwOK.release()

# The rest of the file is a test case, that runs a number of parallelized
quicksorts in parallel. If it works, you'll get about 600 lines of 490 # tracing output, with a line like 491 # test passed! 209 threads created in all 492 # as the last line. The content and order of preceding lines will 493 # vary across runs. 494 495 def _new_thread(func, *args): 496 global TID 497 tid.acquire(); id = TID = TID+1; tid.release() 498 io.acquire(); alive.append(id); \ 499 print 'starting thread', id, '--', len(alive), 'alive'; \ 500 io.release() 501 thread.start_new_thread( func, (id,) + args ) 502 503 def _qsort(tid, a, l, r, finished): 504 # sort a[l:r]; post finished when done 505 io.acquire(); print 'thread', tid, 'qsort', l, r; io.release() 506 if r-l > 1: 507 pivot = a[l] 508 j = l+1 # make a[l:j] <= pivot, and a[j:r] > pivot 509 for i in range(j, r): 510 if a[i] <= pivot: 511 a[j], a[i] = a[i], a[j] 512 j = j + 1 513 a[l], a[j-1] = a[j-1], pivot 514 515 l_subarray_sorted = event() 516 r_subarray_sorted = event() 517 _new_thread(_qsort, a, l, j-1, l_subarray_sorted) 518 _new_thread(_qsort, a, j, r, r_subarray_sorted) 519 l_subarray_sorted.wait() 520 r_subarray_sorted.wait() 521 522 io.acquire(); print 'thread', tid, 'qsort done'; \ 523 alive.remove(tid); io.release() 524 finished.post() 525 526 def _randarray(tid, a, finished): 527 io.acquire(); print 'thread', tid, 'randomizing array'; \ 528 io.release() 529 for i in range(1, len(a)): 530 wh.acquire(); j = randint(0,i); wh.release() 531 a[i], a[j] = a[j], a[i] 532 io.acquire(); print 'thread', tid, 'randomizing done'; \ 533 alive.remove(tid); io.release() 534 finished.post() 535 536 def _check_sort(a): 537 if a != range(len(a)): 538 raise ValueError, ('a not sorted', a) 539 540 def _run_one_sort(tid, a, bar, done): 541 # randomize a, and quicksort it 542 # for variety, all the threads running this enter a barrier 543 # at the end, and post `done' after the barrier exits 544 io.acquire(); print 'thread', tid, 'randomizing', a; \ 545 io.release() 546 finished = event() 547 
_new_thread(_randarray, a, finished) 548 finished.wait() 549 550 io.acquire(); print 'thread', tid, 'sorting', a; io.release() 551 finished.clear() 552 _new_thread(_qsort, a, 0, len(a), finished) 553 finished.wait() 554 _check_sort(a) 555 556 io.acquire(); print 'thread', tid, 'entering barrier'; \ 557 io.release() 558 bar.enter() 559 io.acquire(); print 'thread', tid, 'leaving barrier'; \ 560 io.release() 561 io.acquire(); alive.remove(tid); io.release() 562 bar.enter() # make sure they've all removed themselves from alive 563 ## before 'done' is posted 564 bar.enter() # just to be cruel 565 done.post() 566 567 def test(): 568 global TID, tid, io, wh, randint, alive 569 import random 570 randint = random.randint 571 572 TID = 0 # thread ID (1, 2, ...) 573 tid = thread.allocate_lock() # for changing TID 574 io = thread.allocate_lock() # for printing, and 'alive' 575 wh = thread.allocate_lock() # for calls to random 576 alive = [] # IDs of active threads 577 578 NSORTS = 5 579 arrays = [] 580 for i in range(NSORTS): 581 arrays.append( range( (i+1)*10 ) ) 582 583 bar = barrier(NSORTS) 584 finished = event() 585 for i in range(NSORTS): 586 _new_thread(_run_one_sort, arrays[i], bar, finished) 587 finished.wait() 588 589 print 'all threads done, and checking results ...' 590 if alive: 591 raise ValueError, ('threads still alive at end', alive) 592 for i in range(NSORTS): 593 a = arrays[i] 594 if len(a) != (i+1)*10: 595 raise ValueError, ('length of array', i, 'screwed up') 596 _check_sort(a) 597 598 print 'test passed!', TID, 'threads created in all' 599 600 if __name__ == '__main__': 601 test() 602 603 # end of module 604