Home | History | Annotate | Download | only in _multiprocessing
      1 /*
      2  * A type which wraps a semaphore
      3  *
      4  * semaphore.c
      5  *
      6  * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
      7  */
      8 
      9 #include "multiprocessing.h"
     10 
     11 enum { RECURSIVE_MUTEX, SEMAPHORE };
     12 
     13 typedef struct {
     14     PyObject_HEAD
     15     SEM_HANDLE handle;
     16     long last_tid;
     17     int count;
     18     int maxvalue;
     19     int kind;
     20 } SemLockObject;
     21 
     22 #define ISMINE(o) (o->count > 0 && PyThread_get_thread_ident() == o->last_tid)
     23 
     24 
     25 #ifdef MS_WINDOWS
     26 
     27 /*
     28  * Windows definitions
     29  */
     30 
     31 #define SEM_FAILED NULL
     32 
     33 #define SEM_CLEAR_ERROR() SetLastError(0)
     34 #define SEM_GET_LAST_ERROR() GetLastError()
     35 #define SEM_CREATE(name, val, max) CreateSemaphore(NULL, val, max, NULL)
     36 #define SEM_CLOSE(sem) (CloseHandle(sem) ? 0 : -1)
     37 #define SEM_GETVALUE(sem, pval) _GetSemaphoreValue(sem, pval)
     38 #define SEM_UNLINK(name) 0
     39 
     40 static int
     41 _GetSemaphoreValue(HANDLE handle, long *value)
     42 {
     43     long previous;
     44 
     45     switch (WaitForSingleObject(handle, 0)) {
     46     case WAIT_OBJECT_0:
     47         if (!ReleaseSemaphore(handle, 1, &previous))
     48             return MP_STANDARD_ERROR;
     49         *value = previous + 1;
     50         return 0;
     51     case WAIT_TIMEOUT:
     52         *value = 0;
     53         return 0;
     54     default:
     55         return MP_STANDARD_ERROR;
     56     }
     57 }
     58 
     59 static PyObject *
     60 semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds)
     61 {
     62     int blocking = 1;
     63     double timeout;
     64     PyObject *timeout_obj = Py_None;
     65     DWORD res, full_msecs, msecs, start, ticks;
     66 
     67     static char *kwlist[] = {"block", "timeout", NULL};
     68 
     69     if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist,
     70                                      &blocking, &timeout_obj))
     71         return NULL;
     72 
     73     /* calculate timeout */
     74     if (!blocking) {
     75         full_msecs = 0;
     76     } else if (timeout_obj == Py_None) {
     77         full_msecs = INFINITE;
     78     } else {
     79         timeout = PyFloat_AsDouble(timeout_obj);
     80         if (PyErr_Occurred())
     81             return NULL;
     82         timeout *= 1000.0;      /* convert to millisecs */
     83         if (timeout < 0.0) {
     84             timeout = 0.0;
     85         } else if (timeout >= 0.5 * INFINITE) { /* 25 days */
     86             PyErr_SetString(PyExc_OverflowError,
     87                             "timeout is too large");
     88             return NULL;
     89         }
     90         full_msecs = (DWORD)(timeout + 0.5);
     91     }
     92 
     93     /* check whether we already own the lock */
     94     if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
     95         ++self->count;
     96         Py_RETURN_TRUE;
     97     }
     98 
     99     /* check whether we can acquire without blocking */
    100     if (WaitForSingleObject(self->handle, 0) == WAIT_OBJECT_0) {
    101         self->last_tid = GetCurrentThreadId();
    102         ++self->count;
    103         Py_RETURN_TRUE;
    104     }
    105 
    106     msecs = full_msecs;
    107     start = GetTickCount();
    108 
    109     for ( ; ; ) {
    110         HANDLE handles[2] = {self->handle, sigint_event};
    111 
    112         /* do the wait */
    113         Py_BEGIN_ALLOW_THREADS
    114         ResetEvent(sigint_event);
    115         res = WaitForMultipleObjects(2, handles, FALSE, msecs);
    116         Py_END_ALLOW_THREADS
    117 
    118         /* handle result */
    119         if (res != WAIT_OBJECT_0 + 1)
    120             break;
    121 
    122         /* got SIGINT so give signal handler a chance to run */
    123         Sleep(1);
    124 
    125         /* if this is main thread let KeyboardInterrupt be raised */
    126         if (PyErr_CheckSignals())
    127             return NULL;
    128 
    129         /* recalculate timeout */
    130         if (msecs != INFINITE) {
    131             ticks = GetTickCount();
    132             if ((DWORD)(ticks - start) >= full_msecs)
    133                 Py_RETURN_FALSE;
    134             msecs = full_msecs - (ticks - start);
    135         }
    136     }
    137 
    138     /* handle result */
    139     switch (res) {
    140     case WAIT_TIMEOUT:
    141         Py_RETURN_FALSE;
    142     case WAIT_OBJECT_0:
    143         self->last_tid = GetCurrentThreadId();
    144         ++self->count;
    145         Py_RETURN_TRUE;
    146     case WAIT_FAILED:
    147         return PyErr_SetFromWindowsErr(0);
    148     default:
    149         PyErr_Format(PyExc_RuntimeError, "WaitForSingleObject() or "
    150                      "WaitForMultipleObjects() gave unrecognized "
    151                      "value %d", res);
    152         return NULL;
    153     }
    154 }
    155 
    156 static PyObject *
    157 semlock_release(SemLockObject *self, PyObject *args)
    158 {
    159     if (self->kind == RECURSIVE_MUTEX) {
    160         if (!ISMINE(self)) {
    161             PyErr_SetString(PyExc_AssertionError, "attempt to "
    162                             "release recursive lock not owned "
    163                             "by thread");
    164             return NULL;
    165         }
    166         if (self->count > 1) {
    167             --self->count;
    168             Py_RETURN_NONE;
    169         }
    170         assert(self->count == 1);
    171     }
    172 
    173     if (!ReleaseSemaphore(self->handle, 1, NULL)) {
    174         if (GetLastError() == ERROR_TOO_MANY_POSTS) {
    175             PyErr_SetString(PyExc_ValueError, "semaphore or lock "
    176                             "released too many times");
    177             return NULL;
    178         } else {
    179             return PyErr_SetFromWindowsErr(0);
    180         }
    181     }
    182 
    183     --self->count;
    184     Py_RETURN_NONE;
    185 }
    186 
    187 #else /* !MS_WINDOWS */
    188 
    189 /*
    190  * Unix definitions
    191  */
    192 
    193 #define SEM_CLEAR_ERROR()
    194 #define SEM_GET_LAST_ERROR() 0
    195 #define SEM_CREATE(name, val, max) sem_open(name, O_CREAT | O_EXCL, 0600, val)
    196 #define SEM_CLOSE(sem) sem_close(sem)
    197 #define SEM_GETVALUE(sem, pval) sem_getvalue(sem, pval)
    198 #define SEM_UNLINK(name) sem_unlink(name)
    199 
    200 /* OS X 10.4 defines SEM_FAILED as -1 instead of (sem_t *)-1;  this gives
    201    compiler warnings, and (potentially) undefined behaviour. */
    202 #ifdef __APPLE__
    203 #  undef SEM_FAILED
    204 #  define SEM_FAILED ((sem_t *)-1)
    205 #endif
    206 
    207 #ifndef HAVE_SEM_UNLINK
    208 #  define sem_unlink(name) 0
    209 #endif
    210 
    211 #ifndef HAVE_SEM_TIMEDWAIT
    212 #  define sem_timedwait(sem,deadline) sem_timedwait_save(sem,deadline,_save)
    213 
    214 int
    215 sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save)
    216 {
    217     int res;
    218     unsigned long delay, difference;
    219     struct timeval now, tvdeadline, tvdelay;
    220 
    221     errno = 0;
    222     tvdeadline.tv_sec = deadline->tv_sec;
    223     tvdeadline.tv_usec = deadline->tv_nsec / 1000;
    224 
    225     for (delay = 0 ; ; delay += 1000) {
    226         /* poll */
    227         if (sem_trywait(sem) == 0)
    228             return 0;
    229         else if (errno != EAGAIN)
    230             return MP_STANDARD_ERROR;
    231 
    232         /* get current time */
    233         if (gettimeofday(&now, NULL) < 0)
    234             return MP_STANDARD_ERROR;
    235 
    236         /* check for timeout */
    237         if (tvdeadline.tv_sec < now.tv_sec ||
    238             (tvdeadline.tv_sec == now.tv_sec &&
    239              tvdeadline.tv_usec <= now.tv_usec)) {
    240             errno = ETIMEDOUT;
    241             return MP_STANDARD_ERROR;
    242         }
    243 
    244         /* calculate how much time is left */
    245         difference = (tvdeadline.tv_sec - now.tv_sec) * 1000000 +
    246             (tvdeadline.tv_usec - now.tv_usec);
    247 
    248         /* check delay not too long -- maximum is 20 msecs */
    249         if (delay > 20000)
    250             delay = 20000;
    251         if (delay > difference)
    252             delay = difference;
    253 
    254         /* sleep */
    255         tvdelay.tv_sec = delay / 1000000;
    256         tvdelay.tv_usec = delay % 1000000;
    257         if (select(0, NULL, NULL, NULL, &tvdelay) < 0)
    258             return MP_STANDARD_ERROR;
    259 
    260         /* check for signals */
    261         Py_BLOCK_THREADS
    262         res = PyErr_CheckSignals();
    263         Py_UNBLOCK_THREADS
    264 
    265         if (res) {
    266             errno = EINTR;
    267             return MP_EXCEPTION_HAS_BEEN_SET;
    268         }
    269     }
    270 }
    271 
    272 #endif /* !HAVE_SEM_TIMEDWAIT */
    273 
    274 static PyObject *
    275 semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds)
    276 {
    277     int blocking = 1, res;
    278     double timeout;
    279     PyObject *timeout_obj = Py_None;
    280     struct timespec deadline = {0};
    281     struct timeval now;
    282     long sec, nsec;
    283 
    284     static char *kwlist[] = {"block", "timeout", NULL};
    285 
    286     if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist,
    287                                      &blocking, &timeout_obj))
    288         return NULL;
    289 
    290     if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
    291         ++self->count;
    292         Py_RETURN_TRUE;
    293     }
    294 
    295     if (timeout_obj != Py_None) {
    296         timeout = PyFloat_AsDouble(timeout_obj);
    297         if (PyErr_Occurred())
    298             return NULL;
    299         if (timeout < 0.0)
    300             timeout = 0.0;
    301 
    302         if (gettimeofday(&now, NULL) < 0) {
    303             PyErr_SetFromErrno(PyExc_OSError);
    304             return NULL;
    305         }
    306         sec = (long) timeout;
    307         nsec = (long) (1e9 * (timeout - sec) + 0.5);
    308         deadline.tv_sec = now.tv_sec + sec;
    309         deadline.tv_nsec = now.tv_usec * 1000 + nsec;
    310         deadline.tv_sec += (deadline.tv_nsec / 1000000000);
    311         deadline.tv_nsec %= 1000000000;
    312     }
    313 
    314     do {
    315         Py_BEGIN_ALLOW_THREADS
    316         if (blocking && timeout_obj == Py_None)
    317             res = sem_wait(self->handle);
    318         else if (!blocking)
    319             res = sem_trywait(self->handle);
    320         else
    321             res = sem_timedwait(self->handle, &deadline);
    322         Py_END_ALLOW_THREADS
    323         if (res == MP_EXCEPTION_HAS_BEEN_SET)
    324             break;
    325     } while (res < 0 && errno == EINTR && !PyErr_CheckSignals());
    326 
    327     if (res < 0) {
    328         if (errno == EAGAIN || errno == ETIMEDOUT)
    329             Py_RETURN_FALSE;
    330         else if (errno == EINTR)
    331             return NULL;
    332         else
    333             return PyErr_SetFromErrno(PyExc_OSError);
    334     }
    335 
    336     ++self->count;
    337     self->last_tid = PyThread_get_thread_ident();
    338 
    339     Py_RETURN_TRUE;
    340 }
    341 
    342 static PyObject *
    343 semlock_release(SemLockObject *self, PyObject *args)
    344 {
    345     if (self->kind == RECURSIVE_MUTEX) {
    346         if (!ISMINE(self)) {
    347             PyErr_SetString(PyExc_AssertionError, "attempt to "
    348                             "release recursive lock not owned "
    349                             "by thread");
    350             return NULL;
    351         }
    352         if (self->count > 1) {
    353             --self->count;
    354             Py_RETURN_NONE;
    355         }
    356         assert(self->count == 1);
    357     } else {
    358 #ifdef HAVE_BROKEN_SEM_GETVALUE
    359         /* We will only check properly the maxvalue == 1 case */
    360         if (self->maxvalue == 1) {
    361             /* make sure that already locked */
    362             if (sem_trywait(self->handle) < 0) {
    363                 if (errno != EAGAIN) {
    364                     PyErr_SetFromErrno(PyExc_OSError);
    365                     return NULL;
    366                 }
    367                 /* it is already locked as expected */
    368             } else {
    369                 /* it was not locked so undo wait and raise  */
    370                 if (sem_post(self->handle) < 0) {
    371                     PyErr_SetFromErrno(PyExc_OSError);
    372                     return NULL;
    373                 }
    374                 PyErr_SetString(PyExc_ValueError, "semaphore "
    375                                 "or lock released too many "
    376                                 "times");
    377                 return NULL;
    378             }
    379         }
    380 #else
    381         int sval;
    382 
    383         /* This check is not an absolute guarantee that the semaphore
    384            does not rise above maxvalue. */
    385         if (sem_getvalue(self->handle, &sval) < 0) {
    386             return PyErr_SetFromErrno(PyExc_OSError);
    387         } else if (sval >= self->maxvalue) {
    388             PyErr_SetString(PyExc_ValueError, "semaphore or lock "
    389                             "released too many times");
    390             return NULL;
    391         }
    392 #endif
    393     }
    394 
    395     if (sem_post(self->handle) < 0)
    396         return PyErr_SetFromErrno(PyExc_OSError);
    397 
    398     --self->count;
    399     Py_RETURN_NONE;
    400 }
    401 
    402 #endif /* !MS_WINDOWS */
    403 
    404 /*
    405  * All platforms
    406  */
    407 
    408 static PyObject *
    409 newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue)
    410 {
    411     SemLockObject *self;
    412 
    413     self = PyObject_New(SemLockObject, type);
    414     if (!self)
    415         return NULL;
    416     self->handle = handle;
    417     self->kind = kind;
    418     self->count = 0;
    419     self->last_tid = 0;
    420     self->maxvalue = maxvalue;
    421     return (PyObject*)self;
    422 }
    423 
    424 static PyObject *
    425 semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
    426 {
    427     char buffer[256];
    428     SEM_HANDLE handle = SEM_FAILED;
    429     int kind, maxvalue, value;
    430     PyObject *result;
    431     static char *kwlist[] = {"kind", "value", "maxvalue", NULL};
    432     int try = 0;
    433 
    434     if (!PyArg_ParseTupleAndKeywords(args, kwds, "iii", kwlist,
    435                                      &kind, &value, &maxvalue))
    436         return NULL;
    437 
    438     if (kind != RECURSIVE_MUTEX && kind != SEMAPHORE) {
    439         PyErr_SetString(PyExc_ValueError, "unrecognized kind");
    440         return NULL;
    441     }
    442 
    443     /* Create a semaphore with a unique name. The bytes returned by
    444      * _PyOS_URandom() are treated as unsigned long to ensure that the filename
    445      * is valid (no special characters). */
    446     do {
    447         unsigned long suffix;
    448         _PyOS_URandom((char *)&suffix, sizeof(suffix));
    449         PyOS_snprintf(buffer, sizeof(buffer), "/mp%ld-%lu", (long)getpid(),
    450                       suffix);
    451         SEM_CLEAR_ERROR();
    452         handle = SEM_CREATE(buffer, value, maxvalue);
    453     } while ((handle == SEM_FAILED) && (errno == EEXIST) && (++try < 100));
    454 
    455     /* On Windows we should fail if GetLastError()==ERROR_ALREADY_EXISTS */
    456     if (handle == SEM_FAILED || SEM_GET_LAST_ERROR() != 0)
    457         goto failure;
    458 
    459     if (SEM_UNLINK(buffer) < 0)
    460         goto failure;
    461 
    462     result = newsemlockobject(type, handle, kind, maxvalue);
    463     if (!result)
    464         goto failure;
    465 
    466     return result;
    467 
    468   failure:
    469     if (handle != SEM_FAILED)
    470         SEM_CLOSE(handle);
    471     mp_SetError(NULL, MP_STANDARD_ERROR);
    472     return NULL;
    473 }
    474 
    475 static PyObject *
    476 semlock_rebuild(PyTypeObject *type, PyObject *args)
    477 {
    478     SEM_HANDLE handle;
    479     int kind, maxvalue;
    480 
    481     if (!PyArg_ParseTuple(args, F_SEM_HANDLE "ii",
    482                           &handle, &kind, &maxvalue))
    483         return NULL;
    484 
    485     return newsemlockobject(type, handle, kind, maxvalue);
    486 }
    487 
    488 static void
    489 semlock_dealloc(SemLockObject* self)
    490 {
    491     if (self->handle != SEM_FAILED)
    492         SEM_CLOSE(self->handle);
    493     PyObject_Del(self);
    494 }
    495 
    496 static PyObject *
    497 semlock_count(SemLockObject *self)
    498 {
    499     return PyInt_FromLong((long)self->count);
    500 }
    501 
    502 static PyObject *
    503 semlock_ismine(SemLockObject *self)
    504 {
    505     /* only makes sense for a lock */
    506     return PyBool_FromLong(ISMINE(self));
    507 }
    508 
    509 static PyObject *
    510 semlock_getvalue(SemLockObject *self)
    511 {
    512 #ifdef HAVE_BROKEN_SEM_GETVALUE
    513     PyErr_SetNone(PyExc_NotImplementedError);
    514     return NULL;
    515 #else
    516     int sval;
    517     if (SEM_GETVALUE(self->handle, &sval) < 0)
    518         return mp_SetError(NULL, MP_STANDARD_ERROR);
    519     /* some posix implementations use negative numbers to indicate
    520        the number of waiting threads */
    521     if (sval < 0)
    522         sval = 0;
    523     return PyInt_FromLong((long)sval);
    524 #endif
    525 }
    526 
    527 static PyObject *
    528 semlock_iszero(SemLockObject *self)
    529 {
    530 #ifdef HAVE_BROKEN_SEM_GETVALUE
    531     if (sem_trywait(self->handle) < 0) {
    532         if (errno == EAGAIN)
    533             Py_RETURN_TRUE;
    534         return mp_SetError(NULL, MP_STANDARD_ERROR);
    535     } else {
    536         if (sem_post(self->handle) < 0)
    537             return mp_SetError(NULL, MP_STANDARD_ERROR);
    538         Py_RETURN_FALSE;
    539     }
    540 #else
    541     int sval;
    542     if (SEM_GETVALUE(self->handle, &sval) < 0)
    543         return mp_SetError(NULL, MP_STANDARD_ERROR);
    544     return PyBool_FromLong((long)sval == 0);
    545 #endif
    546 }
    547 
    548 static PyObject *
    549 semlock_afterfork(SemLockObject *self)
    550 {
    551     self->count = 0;
    552     Py_RETURN_NONE;
    553 }
    554 
    555 /*
    556  * Semaphore methods
    557  */
    558 
    559 static PyMethodDef semlock_methods[] = {
    560     {"acquire", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS,
    561      "acquire the semaphore/lock"},
    562     {"release", (PyCFunction)semlock_release, METH_NOARGS,
    563      "release the semaphore/lock"},
    564     {"__enter__", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS,
    565      "enter the semaphore/lock"},
    566     {"__exit__", (PyCFunction)semlock_release, METH_VARARGS,
    567      "exit the semaphore/lock"},
    568     {"_count", (PyCFunction)semlock_count, METH_NOARGS,
    569      "num of `acquire()`s minus num of `release()`s for this process"},
    570     {"_is_mine", (PyCFunction)semlock_ismine, METH_NOARGS,
    571      "whether the lock is owned by this thread"},
    572     {"_get_value", (PyCFunction)semlock_getvalue, METH_NOARGS,
    573      "get the value of the semaphore"},
    574     {"_is_zero", (PyCFunction)semlock_iszero, METH_NOARGS,
    575      "returns whether semaphore has value zero"},
    576     {"_rebuild", (PyCFunction)semlock_rebuild, METH_VARARGS | METH_CLASS,
    577      ""},
    578     {"_after_fork", (PyCFunction)semlock_afterfork, METH_NOARGS,
    579      "rezero the net acquisition count after fork()"},
    580     {NULL}
    581 };
    582 
    583 /*
    584  * Member table
    585  */
    586 
    587 static PyMemberDef semlock_members[] = {
    588     {"handle", T_SEM_HANDLE, offsetof(SemLockObject, handle), READONLY,
    589      ""},
    590     {"kind", T_INT, offsetof(SemLockObject, kind), READONLY,
    591      ""},
    592     {"maxvalue", T_INT, offsetof(SemLockObject, maxvalue), READONLY,
    593      ""},
    594     {NULL}
    595 };
    596 
    597 /*
    598  * Semaphore type
    599  */
    600 
    601 PyTypeObject SemLockType = {
    602     PyVarObject_HEAD_INIT(NULL, 0)
    603     /* tp_name           */ "_multiprocessing.SemLock",
    604     /* tp_basicsize      */ sizeof(SemLockObject),
    605     /* tp_itemsize       */ 0,
    606     /* tp_dealloc        */ (destructor)semlock_dealloc,
    607     /* tp_print          */ 0,
    608     /* tp_getattr        */ 0,
    609     /* tp_setattr        */ 0,
    610     /* tp_compare        */ 0,
    611     /* tp_repr           */ 0,
    612     /* tp_as_number      */ 0,
    613     /* tp_as_sequence    */ 0,
    614     /* tp_as_mapping     */ 0,
    615     /* tp_hash           */ 0,
    616     /* tp_call           */ 0,
    617     /* tp_str            */ 0,
    618     /* tp_getattro       */ 0,
    619     /* tp_setattro       */ 0,
    620     /* tp_as_buffer      */ 0,
    621     /* tp_flags          */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
    622     /* tp_doc            */ "Semaphore/Mutex type",
    623     /* tp_traverse       */ 0,
    624     /* tp_clear          */ 0,
    625     /* tp_richcompare    */ 0,
    626     /* tp_weaklistoffset */ 0,
    627     /* tp_iter           */ 0,
    628     /* tp_iternext       */ 0,
    629     /* tp_methods        */ semlock_methods,
    630     /* tp_members        */ semlock_members,
    631     /* tp_getset         */ 0,
    632     /* tp_base           */ 0,
    633     /* tp_dict           */ 0,
    634     /* tp_descr_get      */ 0,
    635     /* tp_descr_set      */ 0,
    636     /* tp_dictoffset     */ 0,
    637     /* tp_init           */ 0,
    638     /* tp_alloc          */ 0,
    639     /* tp_new            */ semlock_new,
    640 };
    641