1 /* 2 * A type which wraps a semaphore 3 * 4 * semaphore.c 5 * 6 * Copyright (c) 2006-2008, R Oudkerk 7 * Licensed to PSF under a Contributor Agreement. 8 */ 9 10 #include "multiprocessing.h" 11 12 enum { RECURSIVE_MUTEX, SEMAPHORE }; 13 14 typedef struct { 15 PyObject_HEAD 16 SEM_HANDLE handle; 17 long last_tid; 18 int count; 19 int maxvalue; 20 int kind; 21 char *name; 22 } SemLockObject; 23 24 #define ISMINE(o) (o->count > 0 && PyThread_get_thread_ident() == o->last_tid) 25 26 27 #ifdef MS_WINDOWS 28 29 /* 30 * Windows definitions 31 */ 32 33 #define SEM_FAILED NULL 34 35 #define SEM_CLEAR_ERROR() SetLastError(0) 36 #define SEM_GET_LAST_ERROR() GetLastError() 37 #define SEM_CREATE(name, val, max) CreateSemaphore(NULL, val, max, NULL) 38 #define SEM_CLOSE(sem) (CloseHandle(sem) ? 0 : -1) 39 #define SEM_GETVALUE(sem, pval) _GetSemaphoreValue(sem, pval) 40 #define SEM_UNLINK(name) 0 41 42 static int 43 _GetSemaphoreValue(HANDLE handle, long *value) 44 { 45 long previous; 46 47 switch (WaitForSingleObjectEx(handle, 0, FALSE)) { 48 case WAIT_OBJECT_0: 49 if (!ReleaseSemaphore(handle, 1, &previous)) 50 return MP_STANDARD_ERROR; 51 *value = previous + 1; 52 return 0; 53 case WAIT_TIMEOUT: 54 *value = 0; 55 return 0; 56 default: 57 return MP_STANDARD_ERROR; 58 } 59 } 60 61 static PyObject * 62 semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds) 63 { 64 int blocking = 1; 65 double timeout; 66 PyObject *timeout_obj = Py_None; 67 DWORD res, full_msecs, nhandles; 68 HANDLE handles[2], sigint_event; 69 70 static char *kwlist[] = {"block", "timeout", NULL}; 71 72 if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist, 73 &blocking, &timeout_obj)) 74 return NULL; 75 76 /* calculate timeout */ 77 if (!blocking) { 78 full_msecs = 0; 79 } else if (timeout_obj == Py_None) { 80 full_msecs = INFINITE; 81 } else { 82 timeout = PyFloat_AsDouble(timeout_obj); 83 if (PyErr_Occurred()) 84 return NULL; 85 timeout *= 1000.0; /* convert to millisecs */ 86 if (timeout < 0.0) { 87 timeout = 0.0; 88 } else if (timeout >= 0.5 * INFINITE) { /* 25 days */ 89 PyErr_SetString(PyExc_OverflowError, 90 "timeout is too large"); 91 return NULL; 92 } 93 full_msecs = (DWORD)(timeout + 0.5); 94 } 95 96 /* check whether we already own the lock */ 97 if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) { 98 ++self->count; 99 Py_RETURN_TRUE; 100 } 101 102 /* check whether we can acquire without releasing the GIL and blocking */ 103 if (WaitForSingleObjectEx(self->handle, 0, FALSE) == WAIT_OBJECT_0) { 104 self->last_tid = GetCurrentThreadId(); 105 ++self->count; 106 Py_RETURN_TRUE; 107 } 108 109 /* prepare list of handles */ 110 nhandles = 0; 111 handles[nhandles++] = self->handle; 112 if (_PyOS_IsMainThread()) { 113 sigint_event = _PyOS_SigintEvent(); 114 assert(sigint_event != NULL); 115 handles[nhandles++] = sigint_event; 116 } 117 else { 118 sigint_event = NULL; 119 } 120 121 /* do the wait */ 122 Py_BEGIN_ALLOW_THREADS 123 if (sigint_event != NULL) 124 ResetEvent(sigint_event); 125 res = WaitForMultipleObjectsEx(nhandles, handles, FALSE, full_msecs, FALSE); 126 Py_END_ALLOW_THREADS 127 128 /* handle result */ 129 switch (res) { 130 case WAIT_TIMEOUT: 131 Py_RETURN_FALSE; 132 case WAIT_OBJECT_0 + 0: 133 self->last_tid = GetCurrentThreadId(); 134 ++self->count; 135 Py_RETURN_TRUE; 136 case WAIT_OBJECT_0 + 1: 137 errno = EINTR; 138 return PyErr_SetFromErrno(PyExc_IOError); 139 case WAIT_FAILED: 140 return PyErr_SetFromWindowsErr(0); 141 default: 142 PyErr_Format(PyExc_RuntimeError, "WaitForSingleObject() or " 143 "WaitForMultipleObjects() gave unrecognized " 144 "value %d", res); 145 return NULL; 146 } 147 } 148 149 static PyObject * 150 semlock_release(SemLockObject *self, PyObject *args) 151 { 152 if (self->kind == RECURSIVE_MUTEX) { 153 if (!ISMINE(self)) { 154 PyErr_SetString(PyExc_AssertionError, "attempt to " 155 "release recursive lock not owned " 156 "by thread"); 157 return NULL; 158 } 159 if (self->count > 1) { 160 --self->count; 161 Py_RETURN_NONE; 162 } 163 assert(self->count == 1); 164 } 165 166 if (!ReleaseSemaphore(self->handle, 1, NULL)) { 167 if (GetLastError() == ERROR_TOO_MANY_POSTS) { 168 PyErr_SetString(PyExc_ValueError, "semaphore or lock " 169 "released too many times"); 170 return NULL; 171 } else { 172 return PyErr_SetFromWindowsErr(0); 173 } 174 } 175 176 --self->count; 177 Py_RETURN_NONE; 178 } 179 180 #else /* !MS_WINDOWS */ 181 182 /* 183 * Unix definitions 184 */ 185 186 #define SEM_CLEAR_ERROR() 187 #define SEM_GET_LAST_ERROR() 0 188 #define SEM_CREATE(name, val, max) sem_open(name, O_CREAT | O_EXCL, 0600, val) 189 #define SEM_CLOSE(sem) sem_close(sem) 190 #define SEM_GETVALUE(sem, pval) sem_getvalue(sem, pval) 191 #define SEM_UNLINK(name) sem_unlink(name) 192 193 /* OS X 10.4 defines SEM_FAILED as -1 instead of (sem_t *)-1; this gives 194 compiler warnings, and (potentially) undefined behaviour. */ 195 #ifdef __APPLE__ 196 # undef SEM_FAILED 197 # define SEM_FAILED ((sem_t *)-1) 198 #endif 199 200 #ifndef HAVE_SEM_UNLINK 201 # define sem_unlink(name) 0 202 #endif 203 204 #ifndef HAVE_SEM_TIMEDWAIT 205 # define sem_timedwait(sem,deadline) sem_timedwait_save(sem,deadline,_save) 206 207 static int 208 sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save) 209 { 210 int res; 211 unsigned long delay, difference; 212 struct timeval now, tvdeadline, tvdelay; 213 214 errno = 0; 215 tvdeadline.tv_sec = deadline->tv_sec; 216 tvdeadline.tv_usec = deadline->tv_nsec / 1000; 217 218 for (delay = 0 ; ; delay += 1000) { 219 /* poll */ 220 if (sem_trywait(sem) == 0) 221 return 0; 222 else if (errno != EAGAIN) 223 return MP_STANDARD_ERROR; 224 225 /* get current time */ 226 if (gettimeofday(&now, NULL) < 0) 227 return MP_STANDARD_ERROR; 228 229 /* check for timeout */ 230 if (tvdeadline.tv_sec < now.tv_sec || 231 (tvdeadline.tv_sec == now.tv_sec && 232 tvdeadline.tv_usec <= now.tv_usec)) { 233 errno = ETIMEDOUT; 234 return MP_STANDARD_ERROR; 235 } 236 237 /* calculate how much time is left */ 238 difference = (tvdeadline.tv_sec - now.tv_sec) * 1000000 + 239 (tvdeadline.tv_usec - now.tv_usec); 240 241 /* check delay not too long -- maximum is 20 msecs */ 242 if (delay > 20000) 243 delay = 20000; 244 if (delay > difference) 245 delay = difference; 246 247 /* sleep */ 248 tvdelay.tv_sec = delay / 1000000; 249 tvdelay.tv_usec = delay % 1000000; 250 if (select(0, NULL, NULL, NULL, &tvdelay) < 0) 251 return MP_STANDARD_ERROR; 252 253 /* check for signals */ 254 Py_BLOCK_THREADS 255 res = PyErr_CheckSignals(); 256 Py_UNBLOCK_THREADS 257 258 if (res) { 259 errno = EINTR; 260 return MP_EXCEPTION_HAS_BEEN_SET; 261 } 262 } 263 } 264 265 #endif /* !HAVE_SEM_TIMEDWAIT */ 266 267 static PyObject * 268 semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds) 269 { 270 int blocking = 1, res, err = 0; 271 double timeout; 272 PyObject *timeout_obj = Py_None; 273 struct timespec deadline = {0}; 274 struct timeval now; 275 long sec, nsec; 276 277 static char *kwlist[] = {"block", "timeout", NULL}; 278 279 if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist, 280 &blocking, &timeout_obj)) 281 return NULL; 282 283 if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) { 284 ++self->count; 285 Py_RETURN_TRUE; 286 } 287 288 if (timeout_obj != Py_None) { 289 timeout = PyFloat_AsDouble(timeout_obj); 290 if (PyErr_Occurred()) 291 return NULL; 292 if (timeout < 0.0) 293 timeout = 0.0; 294 295 if (gettimeofday(&now, NULL) < 0) { 296 PyErr_SetFromErrno(PyExc_OSError); 297 return NULL; 298 } 299 sec = (long) timeout; 300 nsec = (long) (1e9 * (timeout - sec) + 0.5); 301 deadline.tv_sec = now.tv_sec + sec; 302 deadline.tv_nsec = now.tv_usec * 1000 + nsec; 303 deadline.tv_sec += (deadline.tv_nsec / 1000000000); 304 deadline.tv_nsec %= 1000000000; 305 } 306 307 do { 308 Py_BEGIN_ALLOW_THREADS 309 if (blocking && timeout_obj == Py_None) 310 res = sem_wait(self->handle); 311 else if (!blocking) 312 res = sem_trywait(self->handle); 313 else 314 res = sem_timedwait(self->handle, &deadline); 315 Py_END_ALLOW_THREADS 316 err = errno; 317 if (res == MP_EXCEPTION_HAS_BEEN_SET) 318 break; 319 } while (res < 0 && errno == EINTR && !PyErr_CheckSignals()); 320 321 if (res < 0) { 322 errno = err; 323 if (errno == EAGAIN || errno == ETIMEDOUT) 324 Py_RETURN_FALSE; 325 else if (errno == EINTR) 326 return NULL; 327 else 328 return PyErr_SetFromErrno(PyExc_OSError); 329 } 330 331 ++self->count; 332 self->last_tid = PyThread_get_thread_ident(); 333 334 Py_RETURN_TRUE; 335 } 336 337 static PyObject * 338 semlock_release(SemLockObject *self, PyObject *args) 339 { 340 if (self->kind == RECURSIVE_MUTEX) { 341 if (!ISMINE(self)) { 342 PyErr_SetString(PyExc_AssertionError, "attempt to " 343 "release recursive lock not owned " 344 "by thread"); 345 return NULL; 346 } 347 if (self->count > 1) { 348 --self->count; 349 Py_RETURN_NONE; 350 } 351 assert(self->count == 1); 352 } else { 353 #ifdef HAVE_BROKEN_SEM_GETVALUE 354 /* We will only check properly the maxvalue == 1 case */ 355 if (self->maxvalue == 1) { 356 /* make sure that already locked */ 357 if (sem_trywait(self->handle) < 0) { 358 if (errno != EAGAIN) { 359 PyErr_SetFromErrno(PyExc_OSError); 360 return NULL; 361 } 362 /* it is already locked as expected */ 363 } else { 364 /* it was not locked so undo wait and raise */ 365 if (sem_post(self->handle) < 0) { 366 PyErr_SetFromErrno(PyExc_OSError); 367 return NULL; 368 } 369 PyErr_SetString(PyExc_ValueError, "semaphore " 370 "or lock released too many " 371 "times"); 372 return NULL; 373 } 374 } 375 #else 376 int sval; 377 378 /* This check is not an absolute guarantee that the semaphore 379 does not rise above maxvalue. */ 380 if (sem_getvalue(self->handle, &sval) < 0) { 381 return PyErr_SetFromErrno(PyExc_OSError); 382 } else if (sval >= self->maxvalue) { 383 PyErr_SetString(PyExc_ValueError, "semaphore or lock " 384 "released too many times"); 385 return NULL; 386 } 387 #endif 388 } 389 390 if (sem_post(self->handle) < 0) 391 return PyErr_SetFromErrno(PyExc_OSError); 392 393 --self->count; 394 Py_RETURN_NONE; 395 } 396 397 #endif /* !MS_WINDOWS */ 398 399 /* 400 * All platforms 401 */ 402 403 static PyObject * 404 newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue, 405 char *name) 406 { 407 SemLockObject *self; 408 409 self = PyObject_New(SemLockObject, type); 410 if (!self) 411 return NULL; 412 self->handle = handle; 413 self->kind = kind; 414 self->count = 0; 415 self->last_tid = 0; 416 self->maxvalue = maxvalue; 417 self->name = name; 418 return (PyObject*)self; 419 } 420 421 static PyObject * 422 semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds) 423 { 424 SEM_HANDLE handle = SEM_FAILED; 425 int kind, maxvalue, value, unlink; 426 PyObject *result; 427 char *name, *name_copy = NULL; 428 static char *kwlist[] = {"kind", "value", "maxvalue", "name", "unlink", 429 NULL}; 430 431 if (!PyArg_ParseTupleAndKeywords(args, kwds, "iiisi", kwlist, 432 &kind, &value, &maxvalue, &name, &unlink)) 433 return NULL; 434 435 if (kind != RECURSIVE_MUTEX && kind != SEMAPHORE) { 436 PyErr_SetString(PyExc_ValueError, "unrecognized kind"); 437 return NULL; 438 } 439 440 if (!unlink) { 441 name_copy = PyMem_Malloc(strlen(name) + 1); 442 if (name_copy == NULL) 443 goto failure; 444 strcpy(name_copy, name); 445 } 446 447 SEM_CLEAR_ERROR(); 448 handle = SEM_CREATE(name, value, maxvalue); 449 /* On Windows we should fail if GetLastError()==ERROR_ALREADY_EXISTS */ 450 if (handle == SEM_FAILED || SEM_GET_LAST_ERROR() != 0) 451 goto failure; 452 453 if (unlink && SEM_UNLINK(name) < 0) 454 goto failure; 455 456 result = newsemlockobject(type, handle, kind, maxvalue, name_copy); 457 if (!result) 458 goto failure; 459 460 return result; 461 462 failure: 463 if (handle != SEM_FAILED) 464 SEM_CLOSE(handle); 465 PyMem_Free(name_copy); 466 _PyMp_SetError(NULL, MP_STANDARD_ERROR); 467 return NULL; 468 } 469 470 static PyObject * 471 semlock_rebuild(PyTypeObject *type, PyObject *args) 472 { 473 SEM_HANDLE handle; 474 int kind, maxvalue; 475 char *name, *name_copy = NULL; 476 477 if (!PyArg_ParseTuple(args, F_SEM_HANDLE "iiz", 478 &handle, &kind, &maxvalue, &name)) 479 return NULL; 480 481 if (name != NULL) { 482 name_copy = PyMem_Malloc(strlen(name) + 1); 483 if (name_copy == NULL) 484 return PyErr_NoMemory(); 485 strcpy(name_copy, name); 486 } 487 488 #ifndef MS_WINDOWS 489 if (name != NULL) { 490 handle = sem_open(name, 0); 491 if (handle == SEM_FAILED) { 492 PyMem_Free(name_copy); 493 return PyErr_SetFromErrno(PyExc_OSError); 494 } 495 } 496 #endif 497 498 return newsemlockobject(type, handle, kind, maxvalue, name_copy); 499 } 500 501 static void 502 semlock_dealloc(SemLockObject* self) 503 { 504 if (self->handle != SEM_FAILED) 505 SEM_CLOSE(self->handle); 506 PyMem_Free(self->name); 507 PyObject_Del(self); 508 } 509 510 static PyObject * 511 semlock_count(SemLockObject *self) 512 { 513 return PyLong_FromLong((long)self->count); 514 } 515 516 static PyObject * 517 semlock_ismine(SemLockObject *self) 518 { 519 /* only makes sense for a lock */ 520 return PyBool_FromLong(ISMINE(self)); 521 } 522 523 static PyObject * 524 semlock_getvalue(SemLockObject *self) 525 { 526 #ifdef HAVE_BROKEN_SEM_GETVALUE 527 PyErr_SetNone(PyExc_NotImplementedError); 528 return NULL; 529 #else 530 int sval; 531 if (SEM_GETVALUE(self->handle, &sval) < 0) 532 return _PyMp_SetError(NULL, MP_STANDARD_ERROR); 533 /* some posix implementations use negative numbers to indicate 534 the number of waiting threads */ 535 if (sval < 0) 536 sval = 0; 537 return PyLong_FromLong((long)sval); 538 #endif 539 } 540 541 static PyObject * 542 semlock_iszero(SemLockObject *self) 543 { 544 #ifdef HAVE_BROKEN_SEM_GETVALUE 545 if (sem_trywait(self->handle) < 0) { 546 if (errno == EAGAIN) 547 Py_RETURN_TRUE; 548 return _PyMp_SetError(NULL, MP_STANDARD_ERROR); 549 } else { 550 if (sem_post(self->handle) < 0) 551 return _PyMp_SetError(NULL, MP_STANDARD_ERROR); 552 Py_RETURN_FALSE; 553 } 554 #else 555 int sval; 556 if (SEM_GETVALUE(self->handle, &sval) < 0) 557 return _PyMp_SetError(NULL, MP_STANDARD_ERROR); 558 return PyBool_FromLong((long)sval == 0); 559 #endif 560 } 561 562 static PyObject * 563 semlock_afterfork(SemLockObject *self) 564 { 565 self->count = 0; 566 Py_RETURN_NONE; 567 } 568 569 /* 570 * Semaphore methods 571 */ 572 573 static PyMethodDef semlock_methods[] = { 574 {"acquire", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS, 575 "acquire the semaphore/lock"}, 576 {"release", (PyCFunction)semlock_release, METH_NOARGS, 577 "release the semaphore/lock"}, 578 {"__enter__", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS, 579 "enter the semaphore/lock"}, 580 {"__exit__", (PyCFunction)semlock_release, METH_VARARGS, 581 "exit the semaphore/lock"}, 582 {"_count", (PyCFunction)semlock_count, METH_NOARGS, 583 "num of `acquire()`s minus num of `release()`s for this process"}, 584 {"_is_mine", (PyCFunction)semlock_ismine, METH_NOARGS, 585 "whether the lock is owned by this thread"}, 586 {"_get_value", (PyCFunction)semlock_getvalue, METH_NOARGS, 587 "get the value of the semaphore"}, 588 {"_is_zero", (PyCFunction)semlock_iszero, METH_NOARGS, 589 "returns whether semaphore has value zero"}, 590 {"_rebuild", (PyCFunction)semlock_rebuild, METH_VARARGS | METH_CLASS, 591 ""}, 592 {"_after_fork", (PyCFunction)semlock_afterfork, METH_NOARGS, 593 "rezero the net acquisition count after fork()"}, 594 {NULL} 595 }; 596 597 /* 598 * Member table 599 */ 600 601 static PyMemberDef semlock_members[] = { 602 {"handle", T_SEM_HANDLE, offsetof(SemLockObject, handle), READONLY, 603 ""}, 604 {"kind", T_INT, offsetof(SemLockObject, kind), READONLY, 605 ""}, 606 {"maxvalue", T_INT, offsetof(SemLockObject, maxvalue), READONLY, 607 ""}, 608 {"name", T_STRING, offsetof(SemLockObject, name), READONLY, 609 ""}, 610 {NULL} 611 }; 612 613 /* 614 * Semaphore type 615 */ 616 617 PyTypeObject _PyMp_SemLockType = { 618 PyVarObject_HEAD_INIT(NULL, 0) 619 /* tp_name */ "_multiprocessing.SemLock", 620 /* tp_basicsize */ sizeof(SemLockObject), 621 /* tp_itemsize */ 0, 622 /* tp_dealloc */ (destructor)semlock_dealloc, 623 /* tp_print */ 0, 624 /* tp_getattr */ 0, 625 /* tp_setattr */ 0, 626 /* tp_reserved */ 0, 627 /* tp_repr */ 0, 628 /* tp_as_number */ 0, 629 /* tp_as_sequence */ 0, 630 /* tp_as_mapping */ 0, 631 /* tp_hash */ 0, 632 /* tp_call */ 0, 633 /* tp_str */ 0, 634 /* tp_getattro */ 0, 635 /* tp_setattro */ 0, 636 /* tp_as_buffer */ 0, 637 /* tp_flags */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, 638 /* tp_doc */ "Semaphore/Mutex type", 639 /* tp_traverse */ 0, 640 /* tp_clear */ 0, 641 /* tp_richcompare */ 0, 642 /* tp_weaklistoffset */ 0, 643 /* tp_iter */ 0, 644 /* tp_iternext */ 0, 645 /* tp_methods */ semlock_methods, 646 /* tp_members */ semlock_members, 647 /* tp_getset */ 0, 648 /* tp_base */ 0, 649 /* tp_dict */ 0, 650 /* tp_descr_get */ 0, 651 /* tp_descr_set */ 0, 652 /* tp_dictoffset */ 0, 653 /* tp_init */ 0, 654 /* tp_alloc */ 0, 655 /* tp_new */ semlock_new, 656 }; 657 658 /* 659 * Function to unlink semaphore names 660 */ 661 662 PyObject * 663 _PyMp_sem_unlink(PyObject *ignore, PyObject *args) 664 { 665 char *name; 666 667 if (!PyArg_ParseTuple(args, "s", &name)) 668 return NULL; 669 670 if (SEM_UNLINK(name) < 0) { 671 _PyMp_SetError(NULL, MP_STANDARD_ERROR); 672 return NULL; 673 } 674 675 Py_RETURN_NONE; 676 } 677