Home | History | Annotate | Download | only in _multiprocessing
      1 /*
      2  * Extension module used by multiprocessing package
      3  *
      4  * multiprocessing.c
      5  *
      6  * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
      7  */
      8 
      9 #include "multiprocessing.h"
     10 
     11 #if (defined(CMSG_LEN) && defined(SCM_RIGHTS))
     12     #define HAVE_FD_TRANSFER 1
     13 #else
     14     #define HAVE_FD_TRANSFER 0
     15 #endif
     16 
     17 PyObject *create_win32_namespace(void);
     18 
     19 PyObject *pickle_dumps, *pickle_loads, *pickle_protocol;
     20 PyObject *ProcessError, *BufferTooShort;
     21 
     22 /*
     23  * Function which raises exceptions based on error codes
     24  */
     25 
     26 PyObject *
     27 mp_SetError(PyObject *Type, int num)
     28 {
     29     switch (num) {
     30 #ifdef MS_WINDOWS
     31     case MP_STANDARD_ERROR:
     32         if (Type == NULL)
     33             Type = PyExc_WindowsError;
     34         PyErr_SetExcFromWindowsErr(Type, 0);
     35         break;
     36     case MP_SOCKET_ERROR:
     37         if (Type == NULL)
     38             Type = PyExc_WindowsError;
     39         PyErr_SetExcFromWindowsErr(Type, WSAGetLastError());
     40         break;
     41 #else /* !MS_WINDOWS */
     42     case MP_STANDARD_ERROR:
     43     case MP_SOCKET_ERROR:
     44         if (Type == NULL)
     45             Type = PyExc_OSError;
     46         PyErr_SetFromErrno(Type);
     47         break;
     48 #endif /* !MS_WINDOWS */
     49     case MP_MEMORY_ERROR:
     50         PyErr_NoMemory();
     51         break;
     52     case MP_END_OF_FILE:
     53         PyErr_SetNone(PyExc_EOFError);
     54         break;
     55     case MP_EARLY_END_OF_FILE:
     56         PyErr_SetString(PyExc_IOError,
     57                         "got end of file during message");
     58         break;
     59     case MP_BAD_MESSAGE_LENGTH:
     60         PyErr_SetString(PyExc_IOError, "bad message length");
     61         break;
     62     case MP_EXCEPTION_HAS_BEEN_SET:
     63         break;
     64     default:
     65         PyErr_Format(PyExc_RuntimeError,
     66                      "unknown error number %d", num);
     67     }
     68     return NULL;
     69 }
     70 
     71 
     72 /*
     73  * Windows only
     74  */
     75 
     76 #ifdef MS_WINDOWS
     77 
     78 /* On Windows we set an event to signal Ctrl-C; compare with timemodule.c */
     79 
     80 HANDLE sigint_event = NULL;
     81 
     82 static BOOL WINAPI
     83 ProcessingCtrlHandler(DWORD dwCtrlType)
     84 {
     85     SetEvent(sigint_event);
     86     return FALSE;
     87 }
     88 
     89 /*
     90  * Unix only
     91  */
     92 
     93 #else /* !MS_WINDOWS */
     94 
     95 #if HAVE_FD_TRANSFER
     96 
     97 /* Functions for transferring file descriptors between processes.
     98    Reimplements some of the functionality of the fdcred
     99    module at http://www.mca-ltd.com/resources/fdcred_1.tgz. */
/* Based on http://resin.csoft.net/cgi-bin/man.cgi?section=3&topic=CMSG_DATA */
    101 
    102 static PyObject *
    103 multiprocessing_sendfd(PyObject *self, PyObject *args)
    104 {
    105     int conn, fd, res;
    106     struct iovec dummy_iov;
    107     char dummy_char;
    108     struct msghdr msg;
    109     struct cmsghdr *cmsg;
    110     union {
    111         struct cmsghdr hdr;
    112         unsigned char buf[CMSG_SPACE(sizeof(int))];
    113     } cmsgbuf;
    114 
    115     if (!PyArg_ParseTuple(args, "ii", &conn, &fd))
    116         return NULL;
    117 
    118     dummy_iov.iov_base = &dummy_char;
    119     dummy_iov.iov_len = 1;
    120 
    121     memset(&msg, 0, sizeof(msg));
    122     msg.msg_control = &cmsgbuf.buf;
    123     msg.msg_controllen = sizeof(cmsgbuf.buf);
    124     msg.msg_iov = &dummy_iov;
    125     msg.msg_iovlen = 1;
    126 
    127     cmsg = CMSG_FIRSTHDR(&msg);
    128     cmsg->cmsg_len = CMSG_LEN(sizeof(int));
    129     cmsg->cmsg_level = SOL_SOCKET;
    130     cmsg->cmsg_type = SCM_RIGHTS;
    131     * (int *) CMSG_DATA(cmsg) = fd;
    132 
    133     Py_BEGIN_ALLOW_THREADS
    134     res = sendmsg(conn, &msg, 0);
    135     Py_END_ALLOW_THREADS
    136 
    137     if (res < 0)
    138         return PyErr_SetFromErrno(PyExc_OSError);
    139     Py_RETURN_NONE;
    140 }
    141 
    142 static PyObject *
    143 multiprocessing_recvfd(PyObject *self, PyObject *args)
    144 {
    145     int conn, fd, res;
    146     char dummy_char;
    147     struct iovec dummy_iov;
    148     struct msghdr msg = {0};
    149     struct cmsghdr *cmsg;
    150     union {
    151         struct cmsghdr hdr;
    152         unsigned char buf[CMSG_SPACE(sizeof(int))];
    153     } cmsgbuf;
    154 
    155     if (!PyArg_ParseTuple(args, "i", &conn))
    156         return NULL;
    157 
    158     dummy_iov.iov_base = &dummy_char;
    159     dummy_iov.iov_len = 1;
    160 
    161     memset(&msg, 0, sizeof(msg));
    162     msg.msg_control = &cmsgbuf.buf;
    163     msg.msg_controllen = sizeof(cmsgbuf.buf);
    164     msg.msg_iov = &dummy_iov;
    165     msg.msg_iovlen = 1;
    166 
    167     cmsg = CMSG_FIRSTHDR(&msg);
    168     cmsg->cmsg_level = SOL_SOCKET;
    169     cmsg->cmsg_type = SCM_RIGHTS;
    170     cmsg->cmsg_len = CMSG_LEN(sizeof(int));
    171     msg.msg_controllen = cmsg->cmsg_len;
    172 
    173     Py_BEGIN_ALLOW_THREADS
    174     res = recvmsg(conn, &msg, 0);
    175     Py_END_ALLOW_THREADS
    176 
    177     if (res < 0)
    178         return PyErr_SetFromErrno(PyExc_OSError);
    179 
    180     if (msg.msg_controllen < CMSG_LEN(sizeof(int)) ||
    181         (cmsg = CMSG_FIRSTHDR(&msg)) == NULL ||
    182         cmsg->cmsg_level != SOL_SOCKET ||
    183         cmsg->cmsg_type != SCM_RIGHTS ||
    184         cmsg->cmsg_len < CMSG_LEN(sizeof(int))) {
    185         /* If at least one control message is present, there should be
    186            no room for any further data in the buffer. */
    187         PyErr_SetString(PyExc_RuntimeError, "No file descriptor received");
    188         return NULL;
    189     }
    190 
    191     fd = * (int *) CMSG_DATA(cmsg);
    192     return Py_BuildValue("i", fd);
    193 }
    194 
    195 #endif /* HAVE_FD_TRANSFER */
    196 
    197 #endif /* !MS_WINDOWS */
    198 
    199 
    200 /*
    201  * All platforms
    202  */
    203 
    204 static PyObject*
    205 multiprocessing_address_of_buffer(PyObject *self, PyObject *obj)
    206 {
    207     void *buffer;
    208     Py_ssize_t buffer_len;
    209 
    210     if (PyObject_AsWriteBuffer(obj, &buffer, &buffer_len) < 0)
    211         return NULL;
    212 
    213     return Py_BuildValue("N" F_PY_SSIZE_T,
    214                          PyLong_FromVoidPtr(buffer), buffer_len);
    215 }
    216 
    217 
    218 /*
    219  * Function table
    220  */
    221 
    222 static PyMethodDef module_methods[] = {
    223     {"address_of_buffer", multiprocessing_address_of_buffer, METH_O,
    224      "address_of_buffer(obj) -> int\n"
    225      "Return address of obj assuming obj supports buffer inteface"},
    226 #if HAVE_FD_TRANSFER
    227     {"sendfd", multiprocessing_sendfd, METH_VARARGS,
    228      "sendfd(sockfd, fd) -> None\n"
    229      "Send file descriptor given by fd over the unix domain socket\n"
    230      "whose file decriptor is sockfd"},
    231     {"recvfd", multiprocessing_recvfd, METH_VARARGS,
    232      "recvfd(sockfd) -> fd\n"
    233      "Receive a file descriptor over a unix domain socket\n"
    234      "whose file decriptor is sockfd"},
    235 #endif
    236     {NULL}
    237 };
    238 
    239 
    240 /*
    241  * Initialize
    242  */
    243 
    244 PyMODINIT_FUNC
    245 init_multiprocessing(void)
    246 {
    247     PyObject *module, *temp, *value;
    248 
    249     /* Initialize module */
    250     module = Py_InitModule("_multiprocessing", module_methods);
    251     if (!module)
    252         return;
    253 
    254     /* Get copy of objects from pickle */
    255     temp = PyImport_ImportModule(PICKLE_MODULE);
    256     if (!temp)
    257         return;
    258     pickle_dumps = PyObject_GetAttrString(temp, "dumps");
    259     pickle_loads = PyObject_GetAttrString(temp, "loads");
    260     pickle_protocol = PyObject_GetAttrString(temp, "HIGHEST_PROTOCOL");
    261     Py_XDECREF(temp);
    262 
    263     /* Get copy of BufferTooShort */
    264     temp = PyImport_ImportModule("multiprocessing");
    265     if (!temp)
    266         return;
    267     BufferTooShort = PyObject_GetAttrString(temp, "BufferTooShort");
    268     Py_XDECREF(temp);
    269 
    270     /* Add connection type to module */
    271     if (PyType_Ready(&ConnectionType) < 0)
    272         return;
    273     Py_INCREF(&ConnectionType);
    274     PyModule_AddObject(module, "Connection", (PyObject*)&ConnectionType);
    275 
    276 #if defined(MS_WINDOWS) ||                                              \
    277   (defined(HAVE_SEM_OPEN) && !defined(POSIX_SEMAPHORES_NOT_ENABLED))
    278     /* Add SemLock type to module */
    279     if (PyType_Ready(&SemLockType) < 0)
    280         return;
    281     Py_INCREF(&SemLockType);
    282     {
    283         PyObject *py_sem_value_max;
    284         /* Some systems define SEM_VALUE_MAX as an unsigned value that
    285          * causes it to be negative when used as an int (NetBSD). */
    286         if ((int)(SEM_VALUE_MAX) < 0)
    287             py_sem_value_max = PyLong_FromLong(INT_MAX);
    288         else
    289             py_sem_value_max = PyLong_FromLong(SEM_VALUE_MAX);
    290         if (py_sem_value_max == NULL)
    291             return;
    292         PyDict_SetItemString(SemLockType.tp_dict, "SEM_VALUE_MAX",
    293                              py_sem_value_max);
    294     }
    295     PyModule_AddObject(module, "SemLock", (PyObject*)&SemLockType);
    296 #endif
    297 
    298 #ifdef MS_WINDOWS
    299     /* Add PipeConnection to module */
    300     if (PyType_Ready(&PipeConnectionType) < 0)
    301         return;
    302     Py_INCREF(&PipeConnectionType);
    303     PyModule_AddObject(module, "PipeConnection",
    304                        (PyObject*)&PipeConnectionType);
    305 
    306     /* Initialize win32 class and add to multiprocessing */
    307     temp = create_win32_namespace();
    308     if (!temp)
    309         return;
    310     PyModule_AddObject(module, "win32", temp);
    311 
    312     /* Initialize the event handle used to signal Ctrl-C */
    313     sigint_event = CreateEvent(NULL, TRUE, FALSE, NULL);
    314     if (!sigint_event) {
    315         PyErr_SetFromWindowsErr(0);
    316         return;
    317     }
    318     if (!SetConsoleCtrlHandler(ProcessingCtrlHandler, TRUE)) {
    319         PyErr_SetFromWindowsErr(0);
    320         return;
    321     }
    322 #endif
    323 
    324     /* Add configuration macros */
    325     temp = PyDict_New();
    326     if (!temp)
    327         return;
    328 #define ADD_FLAG(name)                                            \
    329     value = Py_BuildValue("i", name);                             \
    330     if (value == NULL) { Py_DECREF(temp); return; }               \
    331     if (PyDict_SetItemString(temp, #name, value) < 0) {           \
    332         Py_DECREF(temp); Py_DECREF(value); return; }              \
    333     Py_DECREF(value)
    334 
    335 #if defined(HAVE_SEM_OPEN) && !defined(POSIX_SEMAPHORES_NOT_ENABLED)
    336     ADD_FLAG(HAVE_SEM_OPEN);
    337 #endif
    338 #ifdef HAVE_SEM_TIMEDWAIT
    339     ADD_FLAG(HAVE_SEM_TIMEDWAIT);
    340 #endif
    341 #ifdef HAVE_FD_TRANSFER
    342     ADD_FLAG(HAVE_FD_TRANSFER);
    343 #endif
    344 #ifdef HAVE_BROKEN_SEM_GETVALUE
    345     ADD_FLAG(HAVE_BROKEN_SEM_GETVALUE);
    346 #endif
    347 #ifdef HAVE_BROKEN_SEM_UNLINK
    348     ADD_FLAG(HAVE_BROKEN_SEM_UNLINK);
    349 #endif
    350     if (PyModule_AddObject(module, "flags", temp) < 0)
    351         return;
    352 }
    353