/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/python/lib/core/py_func.h"

#include <array>

#include "numpy/arrayobject.h"
#include "tensorflow/c/eager/c_api.h"
#include "tensorflow/c/eager/c_api_internal.h"
#include "tensorflow/c/tf_status_helper.h"
#include "tensorflow/core/framework/allocation_description.pb.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/python/eager/pywrap_tfe.h"
#include "tensorflow/python/lib/core/ndarray_tensor_bridge.h"
#include "tensorflow/python/lib/core/py_util.h"
#include "tensorflow/python/lib/core/safe_ptr.h"

#include <Python.h>

namespace tensorflow {
namespace {

static mutex mu(LINKER_INITIALIZED);
static PyObject* py_trampoline GUARDED_BY(mu) = nullptr;

// Returns the py_trampoline that is used to pass control to the
// Python runtime.
PyObject* GetPyTrampoline() {
  mutex_lock l(mu);
  return py_trampoline;
}

// A call to the registered Python function.
struct PyCall {
  // Passed to the Python runtime to call the Python function registered
  // with this "token".
  string token;

  // The device on which Tensors are stored; only used for EagerPyFunc.
  Device* device;

  // True if and only if the op has been placed on a GPU.
  bool gpu;

  // True if the call is associated with an EagerPyFunc.
  bool eager;

  // Inputs and outputs of this function invocation.
  std::vector<Tensor> ins;
  std::vector<Tensor> out;
};

// Given the 'call', prepares the token and inputs as a Python tuple
// that is appropriate for calling the trampoline.
Status MakeArgTuple(const PyCall* call, PyObject** tuple) {
  int64 n = call->ins.size();
  PyObject* lst = PyList_New(n);
  CHECK(lst);
  for (int64 i = 0; i < n; ++i) {
    PyObject* arg = nullptr;
    const Tensor& t = call->ins[i];
    if (call->eager) {
      if (call->gpu) {
        arg = EagerTensorFromHandle(new TFE_TensorHandle(t, call->device));
      } else {
        // TFE_TensorHandle assumes that CPU is identified by `nullptr`.
        arg = EagerTensorFromHandle(new TFE_TensorHandle(t, nullptr));
      }
      if (arg == nullptr) {
        // Releasing the list also releases the already-converted elements,
        // since PyList_SetItem steals their references.
        Py_DECREF(lst);
        return errors::Internal("Unable to procure EagerTensor from Tensor.");
      }
    } else {
      Status s = ConvertTensorToNdarray(t, &arg);
      if (!s.ok()) {
        Py_DECREF(lst);
        return s;
      }
    }
    PyList_SetItem(lst, i, arg);
  }
  *tuple = Py_BuildValue("(sON)", call->token.c_str(),
                         call->gpu ? Py_True : Py_False, lst);
  CHECK(*tuple);
  return Status::OK();
}
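
// A note on the "(sON)" format used in MakeArgTuple above: "s" copies the
// token into a new Python string, "O" passes Py_True/Py_False with an
// incremented refcount, and "N" steals the reference to `lst`, so the list
// needs no separate Py_DECREF once the tuple has been built.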

// Returns the corresponding tf dtype in 'tf' for numpy data type
// 'np'. Returns an error if the type is not supported by this
// module.
Status NumericNpDTypeToTfDType(const int np, DataType* tf) {
  switch (np) {
    case NPY_FLOAT16:
      *tf = DT_HALF;
      break;
    case NPY_FLOAT32:
      *tf = DT_FLOAT;
      break;
    case NPY_FLOAT64:
      *tf = DT_DOUBLE;
      break;
    case NPY_INT32:
      *tf = DT_INT32;
      break;
    case NPY_UINT8:
      *tf = DT_UINT8;
      break;
    case NPY_INT8:
      *tf = DT_INT8;
      break;
    case NPY_INT16:
      *tf = DT_INT16;
      break;
    case NPY_INT64:
      *tf = DT_INT64;
      break;
    case NPY_BOOL:
      *tf = DT_BOOL;
      break;
    case NPY_COMPLEX64:
      *tf = DT_COMPLEX64;
      break;
    case NPY_COMPLEX128:
      *tf = DT_COMPLEX128;
      break;
    default:
      return errors::Unimplemented("Unsupported numpy type ", np);
  }
  return Status::OK();
}

// Returns true if and only if 'obj' is a zero-dimensional ndarray whose
// single element is None.
bool IsSingleNone(PyObject* obj) {
  if (!PyArray_Check(obj)) {
    return false;
  }
  PyArrayObject* array_obj = reinterpret_cast<PyArrayObject*>(obj);
  if (PyArray_NDIM(array_obj) != 0 || PyArray_SIZE(array_obj) != 1) {
    return false;
  }
  std::array<npy_intp, 0> indices;
  char* item_ptr =
      static_cast<char*>(PyArray_GetPtr(array_obj, indices.data()));
  PyObject* item = PyArray_GETITEM(array_obj, item_ptr);
  CHECK(item);
  // PyArray_GETITEM returns a new reference; release it before returning.
  const bool is_none = (item == Py_None);
  Py_DECREF(item);
  return is_none;
}

// Retrieves a Tensor from `eager_tensor` and stores it in `output_tensor`.
void ExtractTensorFromEagerTensor(const PyObject* eager_tensor,
                                  Tensor* output_tensor) {
  *output_tensor = EagerTensor_Handle(eager_tensor)->t;
}
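
// The trampoline's return value can take several shapes, all of which
// DoCallPyFunc below normalizes into `call->out`: a Python list with one
// entry per output, a single EagerTensor, a single NumPy array, or None
// (possibly wrapped in a single-element object array) when there is nothing
// to return.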

// Calls the registered Python function through the trampoline.
Status DoCallPyFunc(PyCall* call, bool* out_log_on_error) {
  *out_log_on_error = true;
  PyObject* trampoline = GetPyTrampoline();
  if (trampoline == nullptr) {
    return errors::InvalidArgument(
        "Missing py trampoline. Most likely, it is a link error.");
  }
  // Prepare the argument tuple.
  PyObject* args = nullptr;
  TF_RETURN_IF_ERROR(MakeArgTuple(call, &args));
  CHECK(args);

  // Invoke the trampoline.
  PyObject* result = PyEval_CallObject(trampoline, args);
  Py_DECREF(args);
  if (result == nullptr) {
    if (PyErr_Occurred()) {
      if (PyErr_ExceptionMatches(PyExc_ValueError) ||
          PyErr_ExceptionMatches(PyExc_TypeError)) {
        return errors::InvalidArgument(PyExceptionFetch());
      } else if (PyErr_ExceptionMatches(PyExc_StopIteration)) {
        *out_log_on_error = false;
        return errors::OutOfRange(PyExceptionFetch());
      } else if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
        return errors::ResourceExhausted(PyExceptionFetch());
      } else if (PyErr_ExceptionMatches(PyExc_NotImplementedError)) {
        return errors::Unimplemented(PyExceptionFetch());
      } else {
        // TODO(ebrevdo): Check if the exception is an OpError and use the
        // OpError.error_code property to map it back into the Status.
        return errors::Unknown(PyExceptionFetch());
      }
    } else {
      return errors::Internal("Failed to run py callback ", call->token,
                              ": see error log.");
    }
  }

  // Process the return values and convert them to TF Tensors.
  Status s = Status::OK();
  if (PyList_Check(result)) {
    // `result` is a Python list; if this operation is an `EagerPyFunc`, then
    // every item in the list must be an `EagerTensor`; otherwise, every
    // element must be a NumPy array.
    call->out.clear();
    for (int i = 0; i < PyList_Size(result); ++i) {
      Tensor t;
      if (call->eager) {
        const PyObject* item = PyList_GetItem(result, i);
        if (EagerTensor_CheckExact(item)) {
          ExtractTensorFromEagerTensor(item, &t);
        } else {
          s = errors::FailedPrecondition(
              "Expected EagerTensor, found PyObject of type: ",
              Py_TYPE(item)->tp_name);
        }
      } else {
        s = ConvertNdarrayToTensor(PyList_GetItem(result, i), &t);
      }

      if (!s.ok()) {
        break;
      }
      call->out.push_back(t);
    }
  } else if (EagerTensor_CheckExact(result) || result == Py_None) {
    // `result` is an `EagerTensor` or `None`.
    DCHECK(call->eager);
    Tensor t;
    if (result != Py_None) {
      ExtractTensorFromEagerTensor(result, &t);
      call->out.push_back(t);
    }
  } else if (PyArray_Check(result)) {
    // `result` is a NumPy array.
    DCHECK(!call->eager);
    if (!IsSingleNone(result)) {
      Tensor t;
      s = ConvertNdarrayToTensor(result, &t);
      if (s.ok()) {
        call->out.push_back(t);
      }
    }
  } else {
    s = errors::Internal("Unexpected PyObject was returned: ",
                         Py_TYPE(result)->tp_name);
  }
  Py_DECREF(result);
  return s;
}

}  // end namespace
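
// NumpyTensorBuffer (below) lets a tensorflow::Tensor alias memory owned by
// a NumPy array without copying. The buffer keeps a reference on the array
// and hands it to DelayedNumpyDecref on destruction; the matching Py_DECREF
// is deferred (and later drained by ClearDecrefCache under the GIL) because
// it must not run on whatever thread happens to release the Tensor.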

// Outside the anonymous namespace just to make the friend declaration in
// tensorflow::Tensor apply.
class NumpyTensorBuffer : public TensorBuffer {
 public:
  NumpyTensorBuffer(PyArrayObject* array, size_t len, void* data)
      : array_(array), len_(len), data_(data) {}

  ~NumpyTensorBuffer() override {
    // Note: The session::run wrapper is responsible for freeing this while
    // holding the GIL.
    DelayedNumpyDecref(data_, len_, array_);
  }

  void* data() const override { return data_; }
  size_t size() const override { return len_; }
  TensorBuffer* root_buffer() override { return this; }
  void FillAllocationDescription(AllocationDescription* proto) const override {
    tensorflow::int64 rb = size();
    proto->set_requested_bytes(rb);
    proto->set_allocator_name(tensorflow::cpu_allocator()->Name());
  }
  Tensor MakeTensor(DataType dtype, const TensorShape& shape) {
    CHECK_EQ(len_, shape.num_elements() * DataTypeSize(dtype));
    return Tensor(dtype, shape, this);
  }

  // Prevents input forwarding from overwriting this buffer.
  bool OwnsMemory() const override { return false; }

 private:
  PyArrayObject* array_;
  size_t len_;
  void* data_;
};

Status ConvertNdarrayToTensor(PyObject* obj, Tensor* ret) {
  PyArrayObject* input = reinterpret_cast<PyArrayObject*>(obj);
  DataType dtype = DT_INVALID;
  TensorShape shape;
  for (int i = 0; i < PyArray_NDIM(input); ++i) {
    shape.AddDim(PyArray_SHAPE(input)[i]);
  }
  const int np_type = PyArray_TYPE(input);
  switch (np_type) {
    case NPY_OBJECT: {
      dtype = DT_STRING;
      Tensor t(dtype, shape);
      auto tflat = t.flat<string>();
      PyObject** input_data =
          reinterpret_cast<PyObject**>(PyArray_DATA(input));
      for (int i = 0; i < tflat.dimension(0); ++i) {
        char* el;
        Py_ssize_t el_size;
        if (PyBytes_AsStringAndSize(input_data[i], &el, &el_size) == -1) {
#if PY_MAJOR_VERSION >= 3
          el = PyUnicode_AsUTF8AndSize(input_data[i], &el_size);
#else
          el = nullptr;
          if (PyUnicode_Check(input_data[i])) {
            PyObject* unicode = PyUnicode_AsUTF8String(input_data[i]);
            if (unicode) {
              if (PyString_AsStringAndSize(unicode, &el, &el_size) == -1) {
                Py_DECREF(unicode);
                el = nullptr;
              }
            }
          }
#endif
          if (!el) {
            return errors::Unimplemented("Unsupported object type ",
                                         input_data[i]->ob_type->tp_name);
          }
        }
        tflat(i) = string(el, el_size);
      }
      *ret = t;
      break;
    }
    case NPY_STRING: {
      dtype = DT_STRING;
      Tensor t(dtype, shape);
      auto tflat = t.flat<string>();
      char* input_data = PyArray_BYTES(input);
      Py_ssize_t el_size = PyArray_ITEMSIZE(input);
      for (int i = 0; i < tflat.dimension(0); ++i) {
        tflat(i) = string(input_data + i * el_size, el_size);
      }
      *ret = t;
      break;
    }
    default: {
      TF_RETURN_IF_ERROR(NumericNpDTypeToTfDType(PyArray_TYPE(input), &dtype));
      CHECK(DataTypeCanUseMemcpy(dtype));
      if (reinterpret_cast<intptr_t>(PyArray_DATA(input)) %
              EIGEN_MAX_ALIGN_BYTES !=
          0) {
        Tensor t(dtype, shape);
        StringPiece p = t.tensor_data();
        memcpy(const_cast<char*>(p.data()), PyArray_DATA(input), p.size());
        *ret = t;
      } else {
        // Incref the array as the calling context will decref it when we
        // return and we want to keep a handle to this memory.
        Py_INCREF(input);
        NumpyTensorBuffer* buf = new NumpyTensorBuffer(
            input, shape.num_elements() * DataTypeSize(dtype),
            PyArray_DATA(input));
        *ret = buf->MakeTensor(dtype, shape);
        buf->Unref();
      }
    }
  }
  return Status::OK();
}
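
// Note on the numeric (default) case above: Eigen expects tensor buffers to
// be EIGEN_MAX_ALIGN_BYTES-aligned, so an unaligned ndarray is memcpy'd into
// a freshly allocated Tensor, while an aligned one is wrapped zero-copy in a
// NumpyTensorBuffer that keeps the underlying array alive.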

// Creates a numpy array in 'ret' that either aliases the content of 't' or
// holds a copy.
Status ConvertTensorToNdarray(const Tensor& t, PyObject** ret) {
  int typenum = -1;
  TF_RETURN_IF_ERROR(TF_DataType_to_PyArray_TYPE(
      static_cast<TF_DataType>(t.dtype()), &typenum));
  PyArray_Descr* descr = PyArray_DescrFromType(typenum);
  CHECK(descr);
  std::vector<npy_intp> dims;
  dims.reserve(t.dims());
  for (int i = 0; i < t.dims(); ++i) {
    dims.push_back(t.dim_size(i));
  }
  Tensor* copy = new Tensor(t);
  if (ArrayFromMemory(dims.size(), dims.data(),
                      const_cast<char*>(copy->tensor_data().data()), t.dtype(),
                      [copy]() { delete copy; }, ret)
          .ok()) {
    return Status::OK();
  }
  delete copy;

  PyObject* obj = PyArray_Empty(dims.size(), dims.data(), descr, 0);
  if (obj == nullptr) {
    return errors::Internal("Failed to allocate np array: ",
                            t.shape().DebugString());
  }
  PyArrayObject* np_array = reinterpret_cast<PyArrayObject*>(obj);
  if (typenum == NPY_OBJECT) {
    CHECK_EQ(DT_STRING, t.dtype());
    auto tflat = t.flat<string>();
    PyObject** out = reinterpret_cast<PyObject**>(PyArray_DATA(np_array));
    for (int i = 0; i < tflat.dimension(0); ++i) {
      const string& el = tflat(i);
      out[i] = PyBytes_FromStringAndSize(el.data(), el.size());
      if (out[i] == nullptr) {
        for (int j = 0; j < i; ++j) {
          Py_DECREF(out[j]);
        }
        Py_DECREF(obj);
        return errors::Internal("Failed to allocate a copy of string ", i);
      }
    }
  } else {
    CHECK(DataTypeCanUseMemcpy(t.dtype()));
    StringPiece p = t.tensor_data();
    memcpy(PyArray_DATA(np_array), p.data(), p.size());
  }
  *ret = PyArray_Return(np_array);
  return Status::OK();
}

void InitializePyTrampoline(PyObject* trampoline) {
  mutex_lock l(mu);
  if (py_trampoline == nullptr) {
    py_trampoline = trampoline;
    Py_INCREF(py_trampoline);
  } else {
    LOG(WARNING) << "InitializePyTrampoline should only be called once";
  }
}
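
// The trampoline is installed exactly once from the Python side (in
// TensorFlow's Python layer this happens at import time, when script_ops
// registers its function registry). PyFuncOp below then routes every kernel
// invocation through it, keyed by the "token" attr under which the user's
// Python function was registered.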

class PyFuncOp : public OpKernel {
 public:
  explicit PyFuncOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
    OP_REQUIRES_OK(ctx, ctx->GetAttr("token", &token_));
    eager_ = type_string() == "EagerPyFunc";
    gpu_ = ctx->device_type().type_string() == DEVICE_GPU;
  }

  void Compute(OpKernelContext* ctx) override {
    PyCall call;
    call.token = token_;
    call.gpu = gpu_;
    call.eager = eager_;
    if (call.eager) {
      // Eager's C API uses `Device`, whereas `OpKernelContext` stores a
      // `DeviceBase`; attempt to downcast.
      call.device = dynamic_cast<Device*>(ctx->device());
      if (call.device == nullptr) {
        ctx->CtxFailureWithWarning(
            errors::Internal("Unrecognized device class"));
        return;
      }
    }

    for (int i = 0; i < ctx->num_inputs(); ++i) {
      call.ins.push_back(ctx->input(i));
    }

    PyGILState_STATE py_threadstate;
    py_threadstate = PyGILState_Ensure();
    bool log_on_error;
    Status s = DoCallPyFunc(&call, &log_on_error);
    // Sometimes py_funcs can be called without a session and leak memory. This
    // ensures we clear the decref cache so that doesn't happen.
    ClearDecrefCache();
    PyGILState_Release(py_threadstate);

    // The GIL has already been released above, so it is safe to return early
    // when !s.ok().
    if (!s.ok()) {
      if (log_on_error) {
        ctx->CtxFailureWithWarning(s);
      } else {
        ctx->CtxFailure(s);
      }
      return;
    }

    OP_REQUIRES(ctx, static_cast<int32>(call.out.size()) == ctx->num_outputs(),
                errors::InvalidArgument(token_, " returns ", call.out.size(),
                                        " values, but expects to see ",
                                        ctx->num_outputs(), " values."));
    for (size_t i = 0; i < call.out.size(); ++i) {
      const auto& t = call.out[i];
      OP_REQUIRES(
          ctx, t.dtype() == output_type(i),
          errors::InvalidArgument(i, "-th value returned by ", token_, " is ",
                                  DataTypeString(t.dtype()), ", but expects ",
                                  DataTypeString(output_type(i))));
      ctx->set_output(i, t);
    }
  }

 private:
  string token_;

  // True if and only if this op has been placed on a GPU.
  bool gpu_;

  // True if and only if this op should execute the Python function eagerly,
  // i.e., if and only if the eager attribute is set.
  bool eager_;

  TF_DISALLOW_COPY_AND_ASSIGN(PyFuncOp);
};

REGISTER_KERNEL_BUILDER(Name("PyFunc").Device(DEVICE_CPU), PyFuncOp);
REGISTER_KERNEL_BUILDER(Name("PyFuncStateless").Device(DEVICE_CPU), PyFuncOp);
REGISTER_KERNEL_BUILDER(Name("EagerPyFunc").Device(DEVICE_CPU), PyFuncOp);
REGISTER_KERNEL_BUILDER(Name("EagerPyFunc").Device(DEVICE_GPU), PyFuncOp);

}  // end namespace tensorflow