1 /* 2 * Definition of a `Connection` type. 3 * Used by `socket_connection.c` and `pipe_connection.c`. 4 * 5 * connection.h 6 * 7 * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt 8 */ 9 10 #ifndef CONNECTION_H 11 #define CONNECTION_H 12 13 /* 14 * Read/write flags 15 */ 16 17 #define READABLE 1 18 #define WRITABLE 2 19 20 #define CHECK_READABLE(self) \ 21 if (!(self->flags & READABLE)) { \ 22 PyErr_SetString(PyExc_IOError, "connection is write-only"); \ 23 return NULL; \ 24 } 25 26 #define CHECK_WRITABLE(self) \ 27 if (!(self->flags & WRITABLE)) { \ 28 PyErr_SetString(PyExc_IOError, "connection is read-only"); \ 29 return NULL; \ 30 } 31 32 /* 33 * Allocation and deallocation 34 */ 35 36 static PyObject * 37 connection_new(PyTypeObject *type, PyObject *args, PyObject *kwds) 38 { 39 ConnectionObject *self; 40 HANDLE handle; 41 BOOL readable = TRUE, writable = TRUE; 42 43 static char *kwlist[] = {"handle", "readable", "writable", NULL}; 44 45 if (!PyArg_ParseTupleAndKeywords(args, kwds, F_HANDLE "|ii", kwlist, 46 &handle, &readable, &writable)) 47 return NULL; 48 49 if (handle == INVALID_HANDLE_VALUE || (Py_ssize_t)handle < 0) { 50 PyErr_Format(PyExc_IOError, "invalid handle %zd", 51 (Py_ssize_t)handle); 52 return NULL; 53 } 54 55 if (!readable && !writable) { 56 PyErr_SetString(PyExc_ValueError, 57 "either readable or writable must be true"); 58 return NULL; 59 } 60 61 self = PyObject_New(ConnectionObject, type); 62 if (self == NULL) 63 return NULL; 64 65 self->weakreflist = NULL; 66 self->handle = handle; 67 self->flags = 0; 68 69 if (readable) 70 self->flags |= READABLE; 71 if (writable) 72 self->flags |= WRITABLE; 73 assert(self->flags >= 1 && self->flags <= 3); 74 75 return (PyObject*)self; 76 } 77 78 static void 79 connection_dealloc(ConnectionObject* self) 80 { 81 if (self->weakreflist != NULL) 82 PyObject_ClearWeakRefs((PyObject*)self); 83 84 if (self->handle != INVALID_HANDLE_VALUE) { 85 Py_BEGIN_ALLOW_THREADS 86 CLOSE(self->handle); 87 Py_END_ALLOW_THREADS 88 } 89 PyObject_Del(self); 90 } 91 92 /* 93 * Functions for transferring buffers 94 */ 95 96 static PyObject * 97 connection_sendbytes(ConnectionObject *self, PyObject *args) 98 { 99 char *buffer; 100 Py_ssize_t length, offset=0, size=PY_SSIZE_T_MIN; 101 int res; 102 103 if (!PyArg_ParseTuple(args, F_RBUFFER "#|" F_PY_SSIZE_T F_PY_SSIZE_T, 104 &buffer, &length, &offset, &size)) 105 return NULL; 106 107 CHECK_WRITABLE(self); 108 109 if (offset < 0) { 110 PyErr_SetString(PyExc_ValueError, "offset is negative"); 111 return NULL; 112 } 113 if (length < offset) { 114 PyErr_SetString(PyExc_ValueError, "buffer length < offset"); 115 return NULL; 116 } 117 118 if (size == PY_SSIZE_T_MIN) { 119 size = length - offset; 120 } else { 121 if (size < 0) { 122 PyErr_SetString(PyExc_ValueError, "size is negative"); 123 return NULL; 124 } 125 if (offset + size > length) { 126 PyErr_SetString(PyExc_ValueError, 127 "buffer length < offset + size"); 128 return NULL; 129 } 130 } 131 132 res = conn_send_string(self, buffer + offset, size); 133 134 if (res < 0) { 135 if (PyErr_Occurred()) 136 return NULL; 137 else 138 return mp_SetError(PyExc_IOError, res); 139 } 140 141 Py_RETURN_NONE; 142 } 143 144 static PyObject * 145 connection_recvbytes(ConnectionObject *self, PyObject *args) 146 { 147 char *freeme = NULL; 148 Py_ssize_t res, maxlength = PY_SSIZE_T_MAX; 149 PyObject *result = NULL; 150 151 if (!PyArg_ParseTuple(args, "|" F_PY_SSIZE_T, &maxlength)) 152 return NULL; 153 154 CHECK_READABLE(self); 155 156 if (maxlength < 0) { 157 PyErr_SetString(PyExc_ValueError, "maxlength < 0"); 158 return NULL; 159 } 160 161 res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE, 162 &freeme, maxlength); 163 164 if (res < 0) { 165 if (res == MP_BAD_MESSAGE_LENGTH) { 166 if ((self->flags & WRITABLE) == 0) { 167 Py_BEGIN_ALLOW_THREADS 168 CLOSE(self->handle); 169 Py_END_ALLOW_THREADS 170 self->handle = INVALID_HANDLE_VALUE; 171 } else { 172 self->flags = WRITABLE; 173 } 174 } 175 mp_SetError(PyExc_IOError, res); 176 } else { 177 if (freeme == NULL) { 178 result = PyString_FromStringAndSize(self->buffer, res); 179 } else { 180 result = PyString_FromStringAndSize(freeme, res); 181 PyMem_Free(freeme); 182 } 183 } 184 185 return result; 186 } 187 188 static PyObject * 189 connection_recvbytes_into(ConnectionObject *self, PyObject *args) 190 { 191 char *freeme = NULL, *buffer = NULL; 192 Py_ssize_t res, length, offset = 0; 193 PyObject *result = NULL; 194 Py_buffer pbuf; 195 196 CHECK_READABLE(self); 197 198 if (!PyArg_ParseTuple(args, "w*|" F_PY_SSIZE_T, 199 &pbuf, &offset)) 200 return NULL; 201 202 buffer = pbuf.buf; 203 length = pbuf.len; 204 205 if (offset < 0) { 206 PyErr_SetString(PyExc_ValueError, "negative offset"); 207 goto _error; 208 } 209 210 if (offset > length) { 211 PyErr_SetString(PyExc_ValueError, "offset too large"); 212 goto _error; 213 } 214 215 res = conn_recv_string(self, buffer+offset, length-offset, 216 &freeme, PY_SSIZE_T_MAX); 217 218 if (res < 0) { 219 if (res == MP_BAD_MESSAGE_LENGTH) { 220 if ((self->flags & WRITABLE) == 0) { 221 Py_BEGIN_ALLOW_THREADS 222 CLOSE(self->handle); 223 Py_END_ALLOW_THREADS 224 self->handle = INVALID_HANDLE_VALUE; 225 } else { 226 self->flags = WRITABLE; 227 } 228 } 229 mp_SetError(PyExc_IOError, res); 230 } else { 231 if (freeme == NULL) { 232 result = PyInt_FromSsize_t(res); 233 } else { 234 result = PyObject_CallFunction(BufferTooShort, 235 F_RBUFFER "#", 236 freeme, res); 237 PyMem_Free(freeme); 238 if (result) { 239 PyErr_SetObject(BufferTooShort, result); 240 Py_DECREF(result); 241 } 242 goto _error; 243 } 244 } 245 246 _cleanup: 247 PyBuffer_Release(&pbuf); 248 return result; 249 250 _error: 251 result = NULL; 252 goto _cleanup; 253 } 254 255 /* 256 * Functions for transferring objects 257 */ 258 259 static PyObject * 260 connection_send_obj(ConnectionObject *self, PyObject *obj) 261 { 262 char *buffer; 263 int res; 264 Py_ssize_t length; 265 PyObject *pickled_string = NULL; 266 267 CHECK_WRITABLE(self); 268 269 pickled_string = PyObject_CallFunctionObjArgs(pickle_dumps, obj, 270 pickle_protocol, NULL); 271 if (!pickled_string) 272 goto failure; 273 274 if (PyString_AsStringAndSize(pickled_string, &buffer, &length) < 0) 275 goto failure; 276 277 res = conn_send_string(self, buffer, (int)length); 278 279 if (res < 0) { 280 mp_SetError(PyExc_IOError, res); 281 goto failure; 282 } 283 284 Py_XDECREF(pickled_string); 285 Py_RETURN_NONE; 286 287 failure: 288 Py_XDECREF(pickled_string); 289 return NULL; 290 } 291 292 static PyObject * 293 connection_recv_obj(ConnectionObject *self) 294 { 295 char *freeme = NULL; 296 Py_ssize_t res; 297 PyObject *temp = NULL, *result = NULL; 298 299 CHECK_READABLE(self); 300 301 res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE, 302 &freeme, PY_SSIZE_T_MAX); 303 304 if (res < 0) { 305 if (res == MP_BAD_MESSAGE_LENGTH) { 306 if ((self->flags & WRITABLE) == 0) { 307 Py_BEGIN_ALLOW_THREADS 308 CLOSE(self->handle); 309 Py_END_ALLOW_THREADS 310 self->handle = INVALID_HANDLE_VALUE; 311 } else { 312 self->flags = WRITABLE; 313 } 314 } 315 mp_SetError(PyExc_IOError, res); 316 } else { 317 if (freeme == NULL) { 318 temp = PyString_FromStringAndSize(self->buffer, res); 319 } else { 320 temp = PyString_FromStringAndSize(freeme, res); 321 PyMem_Free(freeme); 322 } 323 } 324 325 if (temp) 326 result = PyObject_CallFunctionObjArgs(pickle_loads, 327 temp, NULL); 328 Py_XDECREF(temp); 329 return result; 330 } 331 332 /* 333 * Other functions 334 */ 335 336 static PyObject * 337 connection_poll(ConnectionObject *self, PyObject *args) 338 { 339 PyObject *timeout_obj = NULL; 340 double timeout = 0.0; 341 int res; 342 343 CHECK_READABLE(self); 344 345 if (!PyArg_ParseTuple(args, "|O", &timeout_obj)) 346 return NULL; 347 348 if (timeout_obj == NULL) { 349 timeout = 0.0; 350 } else if (timeout_obj == Py_None) { 351 timeout = -1.0; /* block forever */ 352 } else { 353 timeout = PyFloat_AsDouble(timeout_obj); 354 if (PyErr_Occurred()) 355 return NULL; 356 if (timeout < 0.0) 357 timeout = 0.0; 358 } 359 360 Py_BEGIN_ALLOW_THREADS 361 res = conn_poll(self, timeout, _save); 362 Py_END_ALLOW_THREADS 363 364 switch (res) { 365 case TRUE: 366 Py_RETURN_TRUE; 367 case FALSE: 368 Py_RETURN_FALSE; 369 default: 370 return mp_SetError(PyExc_IOError, res); 371 } 372 } 373 374 static PyObject * 375 connection_fileno(ConnectionObject* self) 376 { 377 if (self->handle == INVALID_HANDLE_VALUE) { 378 PyErr_SetString(PyExc_IOError, "handle is invalid"); 379 return NULL; 380 } 381 return PyInt_FromLong((long)self->handle); 382 } 383 384 static PyObject * 385 connection_close(ConnectionObject *self) 386 { 387 if (self->handle != INVALID_HANDLE_VALUE) { 388 Py_BEGIN_ALLOW_THREADS 389 CLOSE(self->handle); 390 Py_END_ALLOW_THREADS 391 self->handle = INVALID_HANDLE_VALUE; 392 } 393 394 Py_RETURN_NONE; 395 } 396 397 static PyObject * 398 connection_repr(ConnectionObject *self) 399 { 400 static char *conn_type[] = {"read-only", "write-only", "read-write"}; 401 402 assert(self->flags >= 1 && self->flags <= 3); 403 return FROM_FORMAT("<%s %s, handle %zd>", 404 conn_type[self->flags - 1], 405 CONNECTION_NAME, (Py_ssize_t)self->handle); 406 } 407 408 /* 409 * Getters and setters 410 */ 411 412 static PyObject * 413 connection_closed(ConnectionObject *self, void *closure) 414 { 415 return PyBool_FromLong((long)(self->handle == INVALID_HANDLE_VALUE)); 416 } 417 418 static PyObject * 419 connection_readable(ConnectionObject *self, void *closure) 420 { 421 return PyBool_FromLong((long)(self->flags & READABLE)); 422 } 423 424 static PyObject * 425 connection_writable(ConnectionObject *self, void *closure) 426 { 427 return PyBool_FromLong((long)(self->flags & WRITABLE)); 428 } 429 430 /* 431 * Tables 432 */ 433 434 static PyMethodDef connection_methods[] = { 435 {"send_bytes", (PyCFunction)connection_sendbytes, METH_VARARGS, 436 "send the byte data from a readable buffer-like object"}, 437 {"recv_bytes", (PyCFunction)connection_recvbytes, METH_VARARGS, 438 "receive byte data as a string"}, 439 {"recv_bytes_into",(PyCFunction)connection_recvbytes_into,METH_VARARGS, 440 "receive byte data into a writeable buffer-like object\n" 441 "returns the number of bytes read"}, 442 443 {"send", (PyCFunction)connection_send_obj, METH_O, 444 "send a (picklable) object"}, 445 {"recv", (PyCFunction)connection_recv_obj, METH_NOARGS, 446 "receive a (picklable) object"}, 447 448 {"poll", (PyCFunction)connection_poll, METH_VARARGS, 449 "whether there is any input available to be read"}, 450 {"fileno", (PyCFunction)connection_fileno, METH_NOARGS, 451 "file descriptor or handle of the connection"}, 452 {"close", (PyCFunction)connection_close, METH_NOARGS, 453 "close the connection"}, 454 455 {NULL} /* Sentinel */ 456 }; 457 458 static PyGetSetDef connection_getset[] = { 459 {"closed", (getter)connection_closed, NULL, 460 "True if the connection is closed", NULL}, 461 {"readable", (getter)connection_readable, NULL, 462 "True if the connection is readable", NULL}, 463 {"writable", (getter)connection_writable, NULL, 464 "True if the connection is writable", NULL}, 465 {NULL} 466 }; 467 468 /* 469 * Connection type 470 */ 471 472 PyDoc_STRVAR(connection_doc, 473 "Connection type whose constructor signature is\n\n" 474 " Connection(handle, readable=True, writable=True).\n\n" 475 "The constructor does *not* duplicate the handle."); 476 477 PyTypeObject CONNECTION_TYPE = { 478 PyVarObject_HEAD_INIT(NULL, 0) 479 /* tp_name */ "_multiprocessing." CONNECTION_NAME, 480 /* tp_basicsize */ sizeof(ConnectionObject), 481 /* tp_itemsize */ 0, 482 /* tp_dealloc */ (destructor)connection_dealloc, 483 /* tp_print */ 0, 484 /* tp_getattr */ 0, 485 /* tp_setattr */ 0, 486 /* tp_compare */ 0, 487 /* tp_repr */ (reprfunc)connection_repr, 488 /* tp_as_number */ 0, 489 /* tp_as_sequence */ 0, 490 /* tp_as_mapping */ 0, 491 /* tp_hash */ 0, 492 /* tp_call */ 0, 493 /* tp_str */ 0, 494 /* tp_getattro */ 0, 495 /* tp_setattro */ 0, 496 /* tp_as_buffer */ 0, 497 /* tp_flags */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | 498 Py_TPFLAGS_HAVE_WEAKREFS, 499 /* tp_doc */ connection_doc, 500 /* tp_traverse */ 0, 501 /* tp_clear */ 0, 502 /* tp_richcompare */ 0, 503 /* tp_weaklistoffset */ offsetof(ConnectionObject, weakreflist), 504 /* tp_iter */ 0, 505 /* tp_iternext */ 0, 506 /* tp_methods */ connection_methods, 507 /* tp_members */ 0, 508 /* tp_getset */ connection_getset, 509 /* tp_base */ 0, 510 /* tp_dict */ 0, 511 /* tp_descr_get */ 0, 512 /* tp_descr_set */ 0, 513 /* tp_dictoffset */ 0, 514 /* tp_init */ 0, 515 /* tp_alloc */ 0, 516 /* tp_new */ connection_new, 517 }; 518 519 #endif /* CONNECTION_H */ 520