Home | History | Annotate | Download | only in _multiprocessing
      1 /*
      2  * A type which wraps a pipe handle in message oriented mode
      3  *
      4  * pipe_connection.c
      5  *
      6  * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
      7  */
      8 
      9 #include "multiprocessing.h"
     10 
     11 #define CLOSE(h) CloseHandle(h)
     12 
     13 /*
     14  * Send string to the pipe; assumes in message oriented mode
     15  */
     16 
     17 static Py_ssize_t
     18 conn_send_string(ConnectionObject *conn, char *string, size_t length)
     19 {
     20     DWORD amount_written;
     21     BOOL ret;
     22 
     23     Py_BEGIN_ALLOW_THREADS
     24     ret = WriteFile(conn->handle, string, length, &amount_written, NULL);
     25     Py_END_ALLOW_THREADS
     26 
     27     if (ret == 0 && GetLastError() == ERROR_NO_SYSTEM_RESOURCES) {
     28         PyErr_Format(PyExc_ValueError, "Cannnot send %" PY_FORMAT_SIZE_T "d bytes over connection", length);
     29         return MP_STANDARD_ERROR;
     30     }
     31 
     32     return ret ? MP_SUCCESS : MP_STANDARD_ERROR;
     33 }
     34 
     35 /*
     36  * Attempts to read into buffer, or if buffer too small into *newbuffer.
     37  *
     38  * Returns number of bytes read.  Assumes in message oriented mode.
     39  */
     40 
     41 static Py_ssize_t
     42 conn_recv_string(ConnectionObject *conn, char *buffer,
     43                  size_t buflength, char **newbuffer, size_t maxlength)
     44 {
     45     DWORD left, length, full_length, err;
     46     BOOL ret;
     47     *newbuffer = NULL;
     48 
     49     Py_BEGIN_ALLOW_THREADS
     50     ret = ReadFile(conn->handle, buffer, MIN(buflength, maxlength),
     51                   &length, NULL);
     52     Py_END_ALLOW_THREADS
     53     if (ret)
     54         return length;
     55 
     56     err = GetLastError();
     57     if (err != ERROR_MORE_DATA) {
     58         if (err == ERROR_BROKEN_PIPE)
     59             return MP_END_OF_FILE;
     60         return MP_STANDARD_ERROR;
     61     }
     62 
     63     if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, NULL, &left))
     64         return MP_STANDARD_ERROR;
     65 
     66     full_length = length + left;
     67     if (full_length > maxlength)
     68         return MP_BAD_MESSAGE_LENGTH;
     69 
     70     *newbuffer = PyMem_Malloc(full_length);
     71     if (*newbuffer == NULL)
     72         return MP_MEMORY_ERROR;
     73 
     74     memcpy(*newbuffer, buffer, length);
     75 
     76     Py_BEGIN_ALLOW_THREADS
     77     ret = ReadFile(conn->handle, *newbuffer+length, left, &length, NULL);
     78     Py_END_ALLOW_THREADS
     79     if (ret) {
     80         assert(length == left);
     81         return full_length;
     82     } else {
     83         PyMem_Free(*newbuffer);
     84         return MP_STANDARD_ERROR;
     85     }
     86 }
     87 
     88 /*
     89  * Check whether any data is available for reading
     90  */
     91 
     92 static int
     93 conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save)
     94 {
     95     DWORD bytes, deadline, delay;
     96     int difference, res;
     97     BOOL block = FALSE;
     98 
     99     if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
    100         return MP_STANDARD_ERROR;
    101 
    102     if (timeout == 0.0)
    103         return bytes > 0;
    104 
    105     if (timeout < 0.0)
    106         block = TRUE;
    107     else
    108         /* XXX does not check for overflow */
    109         deadline = GetTickCount() + (DWORD)(1000 * timeout + 0.5);
    110 
    111     Sleep(0);
    112 
    113     for (delay = 1 ; ; delay += 1) {
    114         if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
    115             return MP_STANDARD_ERROR;
    116         else if (bytes > 0)
    117             return TRUE;
    118 
    119         if (!block) {
    120             difference = deadline - GetTickCount();
    121             if (difference < 0)
    122                 return FALSE;
    123             if ((int)delay > difference)
    124                 delay = difference;
    125         }
    126 
    127         if (delay > 20)
    128             delay = 20;
    129 
    130         Sleep(delay);
    131 
    132         /* check for signals */
    133         Py_BLOCK_THREADS
    134         res = PyErr_CheckSignals();
    135         Py_UNBLOCK_THREADS
    136 
    137         if (res)
    138             return MP_EXCEPTION_HAS_BEEN_SET;
    139     }
    140 }
    141 
    142 /*
    143  * "connection.h" defines the PipeConnection type using the definitions above
    144  */
    145 
    146 #define CONNECTION_NAME "PipeConnection"
    147 #define CONNECTION_TYPE PipeConnectionType
    148 
    149 #include "connection.h"
    150