/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/python/lib/core/py_func.h"

#include <array>

#include "numpy/arrayobject.h"
#include "tensorflow/c/eager/c_api.h"
#include "tensorflow/c/eager/c_api_internal.h"
#include "tensorflow/c/tf_status_helper.h"
#include "tensorflow/core/framework/allocation_description.pb.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/python/eager/pywrap_tfe.h"
#include "tensorflow/python/lib/core/ndarray_tensor_bridge.h"
#include "tensorflow/python/lib/core/py_util.h"
#include "tensorflow/python/lib/core/safe_ptr.h"

#include <Python.h>

namespace tensorflow {
namespace {

static mutex mu(LINKER_INITIALIZED);
static PyObject* py_trampoline GUARDED_BY(mu) = nullptr;

// Returns the py_trampoline that is used to pass control to the
// Python runtime.
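// The returned pointer is a borrowed reference; it stays valid after the lock
// is dropped because InitializePyTrampoline() below sets the trampoline at
// most once, takes a reference to it, and never clears it.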
PyObject* GetPyTrampoline() {
  mutex_lock l(mu);
  return py_trampoline;
}

// A call to the registered python function.
struct PyCall {
  // Passed to python runtime to call the python function registered
  // with this "token".
  string token;

  // The device on which Tensors are stored; only used for EagerPyFunc.
  Device* device;

  // True if and only if the op has been placed on a GPU.
  bool gpu;

  // True if the call is associated with an EagerPyFunc.
  bool eager;

  // Inputs and outputs of this function invocation.
  std::vector<Tensor> ins;
  std::vector<Tensor> out;
};
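
// Note: `ins` holds Tensor values rather than pointers, so the input buffers
// are kept alive via the Tensor refcount for the duration of the Python call;
// `out` is populated by DoCallPyFunc() below from the trampoline's result.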

// Given `call`, prepares the token and inputs as a Python tuple
// that is appropriate for calling the trampoline.
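// The tuple has the form (token, is_gpu, inputs). In the "(sON)" format
// string, "s" copies the token, "O" takes a new reference to the shared
// Py_True/Py_False singleton, and "N" transfers ownership of the freshly
// built input list to the tuple.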
Status MakeArgTuple(const PyCall* call, PyObject** tuple) {
  int64 n = call->ins.size();
  PyObject* lst = PyList_New(n);
  CHECK(lst);
  for (int64 i = 0; i < n; ++i) {
    PyObject* arg = nullptr;
    const Tensor& t = call->ins[i];
    if (call->eager) {
      if (call->gpu) {
        arg = EagerTensorFromHandle(new TFE_TensorHandle(t, call->device));
      } else {
        // TFE_TensorHandle assumes that CPU is identified by `nullptr`.
        arg = EagerTensorFromHandle(new TFE_TensorHandle(t, nullptr));
      }
      if (arg == nullptr) {
        Py_DECREF(lst);  // Mirror the cleanup in the ndarray branch below.
        return errors::Internal("Unable to procure EagerTensor from Tensor.");
      }
    } else {
      Status s = ConvertTensorToNdarray(t, &arg);
      if (!s.ok()) {
        Py_DECREF(lst);
        return s;
      }
    }
    PyList_SetItem(lst, i, arg);
  }
  *tuple = Py_BuildValue("(sON)", call->token.c_str(),
                         call->gpu ? Py_True : Py_False, lst);
  CHECK(*tuple);
  return Status::OK();
}

// Returns the corresponding tf dtype in 'tf' for numpy data type
// 'np'.  Returns an error if the type is not supported by this
// module.
Status NumericNpDTypeToTfDType(const int np, DataType* tf) {
  switch (np) {
    case NPY_FLOAT16:
      *tf = DT_HALF;
      break;
    case NPY_FLOAT32:
      *tf = DT_FLOAT;
      break;
    case NPY_FLOAT64:
      *tf = DT_DOUBLE;
      break;
    case NPY_INT32:
      *tf = DT_INT32;
      break;
    case NPY_UINT8:
      *tf = DT_UINT8;
      break;
    case NPY_INT8:
      *tf = DT_INT8;
      break;
    case NPY_INT16:
      *tf = DT_INT16;
      break;
    case NPY_INT64:
      *tf = DT_INT64;
      break;
    case NPY_BOOL:
      *tf = DT_BOOL;
      break;
    case NPY_COMPLEX64:
      *tf = DT_COMPLEX64;
      break;
    case NPY_COMPLEX128:
      *tf = DT_COMPLEX128;
      break;
    default:
      return errors::Unimplemented("Unsupported numpy type ", np);
  }
  return Status::OK();
}

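// Returns true iff `obj` is a rank-0 ndarray whose single element is None.
// DoCallPyFunc() below uses this to treat such a result as "no outputs"
// rather than attempting a dtype conversion.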
bool IsSingleNone(PyObject* obj) {
  if (!PyArray_Check(obj)) {
    return false;
  }
  PyArrayObject* array_obj = reinterpret_cast<PyArrayObject*>(obj);
  if (PyArray_NDIM(array_obj) != 0 || PyArray_SIZE(array_obj) != 1) {
    return false;
  }
  std::array<npy_intp, 0> indices;
  char* item_ptr =
      static_cast<char*>(PyArray_GetPtr(array_obj, indices.data()));
  PyObject* item = PyArray_GETITEM(array_obj, item_ptr);
  CHECK(item);
  // PyArray_GETITEM returns a new reference; release it before returning.
  const bool is_none = item == Py_None;
  Py_DECREF(item);
  return is_none;
}

// Retrieves a Tensor from `eager_tensor` and stores it in `output_tensor`.
void ExtractTensorFromEagerTensor(const PyObject* eager_tensor,
                                  Tensor* output_tensor) {
  *output_tensor = EagerTensor_Handle(eager_tensor)->t;
}

// Calls the registered py function through the trampoline.
Status DoCallPyFunc(PyCall* call, bool* out_log_on_error) {
  *out_log_on_error = true;
  PyObject* trampoline = GetPyTrampoline();
  if (trampoline == nullptr) {
    return errors::InvalidArgument(
        "Missing py trampoline. Most likely, it is a link error.");
  }
  // Prepare the argument.
  PyObject* args = nullptr;
  TF_RETURN_IF_ERROR(MakeArgTuple(call, &args));
  CHECK(args);

  // Invokes the trampoline.
  PyObject* result = PyEval_CallObject(trampoline, args);
  Py_DECREF(args);
  if (result == nullptr) {
    if (PyErr_Occurred()) {
      if (PyErr_ExceptionMatches(PyExc_ValueError) ||
          PyErr_ExceptionMatches(PyExc_TypeError)) {
        return errors::InvalidArgument(PyExceptionFetch());
      } else if (PyErr_ExceptionMatches(PyExc_StopIteration)) {
        *out_log_on_error = false;
        return errors::OutOfRange(PyExceptionFetch());
      } else if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
        return errors::ResourceExhausted(PyExceptionFetch());
      } else if (PyErr_ExceptionMatches(PyExc_NotImplementedError)) {
        return errors::Unimplemented(PyExceptionFetch());
      } else {
        // TODO(ebrevdo): Check if exception is an OpError and use the
        // OpError.error_code property to map it back in the Status.
        return errors::Unknown(PyExceptionFetch());
      }
    } else {
      return errors::Internal("Failed to run py callback ", call->token,
                              ": see error log.");
    }
  }

  // Process the return values and convert them to TF Tensors.
  Status s = Status::OK();
  if (PyList_Check(result)) {
    // `result` is a Python list; if this operation is an `EagerPyFunc`, then
    // every item in the list must be an `EagerTensor`; otherwise, every element
    // must be a NumPy array.
    call->out.clear();
    for (int i = 0; i < PyList_Size(result); ++i) {
      Tensor t;
      if (call->eager) {
        const PyObject* item = PyList_GetItem(result, i);
        if (EagerTensor_CheckExact(item)) {
          ExtractTensorFromEagerTensor(item, &t);
        } else {
          s = errors::FailedPrecondition(
              "Expected EagerTensor, found PyObject of type: ",
              Py_TYPE(item)->tp_name);
        }
      } else {
        s = ConvertNdarrayToTensor(PyList_GetItem(result, i), &t);
      }

      if (!s.ok()) {
        break;
      }
      call->out.push_back(t);
    }
  } else if (EagerTensor_CheckExact(result) || result == Py_None) {
    // `result` is an `EagerTensor` or `None`.
    DCHECK(call->eager);
    Tensor t;
    if (result != Py_None) {
      ExtractTensorFromEagerTensor(result, &t);
      call->out.push_back(t);
    }
  } else if (PyArray_Check(result)) {
    // `result` is a NumPy array.
    DCHECK(!call->eager);
    if (!IsSingleNone(result)) {
      Tensor t;
      s = ConvertNdarrayToTensor(result, &t);
      if (s.ok()) {
        call->out.push_back(t);
      }
    }
  } else {
    s = errors::Internal("Unexpected PyObject was returned: ",
                         Py_TYPE(result)->tp_name);
  }
  Py_DECREF(result);
  return s;
}

}  // end namespace

// Outside anonymous namespace just to make the friend declaration in
// tensorflow::Tensor apply.
class NumpyTensorBuffer : public TensorBuffer {
 public:
  NumpyTensorBuffer(PyArrayObject* array, size_t len, void* data)
      : array_(array), len_(len), data_(data) {}

  ~NumpyTensorBuffer() override {
    // Note: The session::run wrapper is responsible for freeing this while
    // holding the GIL.
    DelayedNumpyDecref(data_, len_, array_);
  }

  void* data() const override { return data_; }
  size_t size() const override { return len_; }
  TensorBuffer* root_buffer() override { return this; }
  void FillAllocationDescription(AllocationDescription* proto) const override {
    tensorflow::int64 rb = size();
    proto->set_requested_bytes(rb);
    proto->set_allocator_name(tensorflow::cpu_allocator()->Name());
  }
  Tensor MakeTensor(DataType dtype, const TensorShape& shape) {
    CHECK_EQ(len_, shape.num_elements() * DataTypeSize(dtype));
    return Tensor(dtype, shape, this);
  }

  // Prevents input forwarding from overwriting this buffer.
  bool OwnsMemory() const override { return false; }

 private:
  PyArrayObject* array_;
  size_t len_;
  void* data_;
};

Status ConvertNdarrayToTensor(PyObject* obj, Tensor* ret) {
  PyArrayObject* input = reinterpret_cast<PyArrayObject*>(obj);
  DataType dtype = DT_INVALID;
  TensorShape shape;
  for (int i = 0; i < PyArray_NDIM(input); ++i) {
    shape.AddDim(PyArray_SHAPE(input)[i]);
  }
  const int np_type = PyArray_TYPE(input);
  switch (np_type) {
    case NPY_OBJECT: {
      dtype = DT_STRING;
      Tensor t(dtype, shape);
      auto tflat = t.flat<string>();
      PyObject** input_data = reinterpret_cast<PyObject**>(PyArray_DATA(input));
      for (int i = 0; i < tflat.dimension(0); ++i) {
        char* el;
        Py_ssize_t el_size;
        if (PyBytes_AsStringAndSize(input_data[i], &el, &el_size) == -1) {
#if PY_MAJOR_VERSION >= 3
          el = PyUnicode_AsUTF8AndSize(input_data[i], &el_size);
#else
          el = nullptr;
          if (PyUnicode_Check(input_data[i])) {
            PyObject* unicode = PyUnicode_AsUTF8String(input_data[i]);
            if (unicode) {
              if (PyString_AsStringAndSize(unicode, &el, &el_size) == -1) {
                Py_DECREF(unicode);
                el = nullptr;
              }
            }
          }
#endif
          if (!el) {
            return errors::Unimplemented("Unsupported object type ",
                                         input_data[i]->ob_type->tp_name);
          }
        }
        tflat(i) = string(el, el_size);
      }
      *ret = t;
      break;
    }
    case NPY_STRING: {
      dtype = DT_STRING;
      Tensor t(dtype, shape);
      auto tflat = t.flat<string>();
      char* input_data = PyArray_BYTES(input);
      Py_ssize_t el_size = PyArray_ITEMSIZE(input);
      for (int i = 0; i < tflat.dimension(0); ++i) {
        tflat(i) = string(input_data + i * el_size, el_size);
      }
      *ret = t;
      break;
    }
    default: {
      TF_RETURN_IF_ERROR(NumericNpDTypeToTfDType(PyArray_TYPE(input), &dtype));
      CHECK(DataTypeCanUseMemcpy(dtype));
      if (reinterpret_cast<intptr_t>(PyArray_DATA(input)) %
              EIGEN_MAX_ALIGN_BYTES !=
          0) {
        Tensor t(dtype, shape);
        StringPiece p = t.tensor_data();
        memcpy(const_cast<char*>(p.data()), PyArray_DATA(input), p.size());
        *ret = t;
      } else {
        // Incref the array as the calling context will decref it when we
        // return and we want to keep a handle to this memory.
        Py_INCREF(input);
        NumpyTensorBuffer* buf = new NumpyTensorBuffer(
            input, shape.num_elements() * DataTypeSize(dtype),
            PyArray_DATA(input));
        *ret = buf->MakeTensor(dtype, shape);
        buf->Unref();
      }
    }
  }
  return Status::OK();
}

// Creates a numpy array in 'ret' which either aliases the content of 't' or
// holds a copy of it.
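// The zero-copy path donates a heap-allocated copy of `t` (cheap, since
// Tensor copies share the underlying buffer) whose deleter runs once numpy
// releases the array; if ArrayFromMemory declines (e.g. for dtypes that
// cannot be aliased), we fall back to allocating a fresh array and copying.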
Status ConvertTensorToNdarray(const Tensor& t, PyObject** ret) {
  int typenum = -1;
  TF_RETURN_IF_ERROR(TF_DataType_to_PyArray_TYPE(
      static_cast<TF_DataType>(t.dtype()), &typenum));
  std::vector<npy_intp> dims;
  dims.reserve(t.dims());
  for (int i = 0; i < t.dims(); ++i) {
    dims.push_back(t.dim_size(i));
  }
  Tensor* copy = new Tensor(t);
  if (ArrayFromMemory(dims.size(), dims.data(),
                      const_cast<char*>(copy->tensor_data().data()), t.dtype(),
                      [copy]() { delete copy; }, ret)
          .ok()) {
    return Status::OK();
  }
  delete copy;

  // Create the descr only on the copying path; PyArray_Empty steals this
  // reference, so creating it before the early return above would leak it.
  PyArray_Descr* descr = PyArray_DescrFromType(typenum);
  CHECK(descr);
  PyObject* obj = PyArray_Empty(dims.size(), dims.data(), descr, 0);
  if (obj == nullptr) {
    return errors::Internal("Failed to allocate np array: ",
                            t.shape().DebugString());
  }
  PyArrayObject* np_array = reinterpret_cast<PyArrayObject*>(obj);
  if (typenum == NPY_OBJECT) {
    CHECK_EQ(DT_STRING, t.dtype());
    auto tflat = t.flat<string>();
    PyObject** out = reinterpret_cast<PyObject**>(PyArray_DATA(np_array));
    for (int i = 0; i < tflat.dimension(0); ++i) {
      const string& el = tflat(i);
      out[i] = PyBytes_FromStringAndSize(el.data(), el.size());
      if (out[i] == nullptr) {
        for (int j = 0; j < i; ++j) {
          Py_DECREF(out[j]);
        }
        Py_DECREF(obj);
        return errors::Internal("Failed to allocate a copy of string ", i);
      }
    }
  } else {
    CHECK(DataTypeCanUseMemcpy(t.dtype()));
    StringPiece p = t.tensor_data();
    memcpy(PyArray_DATA(np_array), p.data(), p.size());
  }
  *ret = PyArray_Return(np_array);
  return Status::OK();
}

void InitializePyTrampoline(PyObject* trampoline) {
  mutex_lock l(mu);
  if (py_trampoline == nullptr) {
    py_trampoline = trampoline;
    Py_INCREF(py_trampoline);
  } else {
    LOG(WARNING) << "InitializePyTrampoline should only be called once";
  }
}
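
// A sketch of the registration contract as seen from this file: the Python
// side is expected to call InitializePyTrampoline() once, at import time, with
// a callable; DoCallPyFunc() then invokes it as trampoline(token, is_gpu,
// inputs) and accepts an EagerTensor, an ndarray, or a list thereof in return.
// (In TensorFlow's Python package this wiring lives in ops/script_ops.py.)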

class PyFuncOp : public OpKernel {
 public:
  explicit PyFuncOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
    OP_REQUIRES_OK(ctx, ctx->GetAttr("token", &token_));
    eager_ = type_string() == "EagerPyFunc";
    gpu_ = ctx->device_type().type_string() == DEVICE_GPU;
  }

  void Compute(OpKernelContext* ctx) override {
    PyCall call;
    call.token = token_;
    call.gpu = gpu_;
    call.eager = eager_;
    if (call.eager) {
      // Eager's C API uses `Device`, whereas `OpKernelContext` stores a
      // `DeviceBase`; attempt to downcast.
      call.device = dynamic_cast<Device*>(ctx->device());
      if (call.device == nullptr) {
        ctx->CtxFailureWithWarning(
            errors::Internal("Unrecognized device class"));
        // CtxFailureWithWarning only records the error; bail out so we do
        // not proceed with a null device.
        return;
      }
    }

    for (int i = 0; i < ctx->num_inputs(); ++i) {
      call.ins.push_back(ctx->input(i));
    }

    PyGILState_STATE py_threadstate;
    py_threadstate = PyGILState_Ensure();
    bool log_on_error;
    Status s = DoCallPyFunc(&call, &log_on_error);
    // Sometimes py_funcs can be called without a session and leak memory. This
    // ensures we clear the decref cache so this doesn't happen.
    ClearDecrefCache();
    PyGILState_Release(py_threadstate);

    // The GIL is released above, before this check, so that it is freed even
    // when !s.ok().
    if (!s.ok()) {
      if (log_on_error) {
        ctx->CtxFailureWithWarning(s);
      } else {
        ctx->CtxFailure(s);
      }
      return;
    }

    OP_REQUIRES(ctx, static_cast<int32>(call.out.size()) == ctx->num_outputs(),
                errors::InvalidArgument(token_, " returns ", call.out.size(),
                                        " values, but expects to see ",
                                        ctx->num_outputs(), " values."));
    for (size_t i = 0; i < call.out.size(); ++i) {
      const auto& t = call.out[i];
      OP_REQUIRES(
          ctx, t.dtype() == output_type(i),
          errors::InvalidArgument(i, "-th value returned by ", token_, " is ",
                                  DataTypeString(t.dtype()), ", but expects ",
                                  DataTypeString(output_type(i))));
      ctx->set_output(i, t);
    }
  }

 private:
  string token_;

  // True if and only if this op has been placed on a GPU.
  bool gpu_;

  // True if and only if this op should execute the python function eagerly,
  // i.e., if and only if the eager attribute is set.
  bool eager_;

  TF_DISALLOW_COPY_AND_ASSIGN(PyFuncOp);
};

REGISTER_KERNEL_BUILDER(Name("PyFunc").Device(DEVICE_CPU), PyFuncOp);
REGISTER_KERNEL_BUILDER(Name("PyFuncStateless").Device(DEVICE_CPU), PyFuncOp);
REGISTER_KERNEL_BUILDER(Name("EagerPyFunc").Device(DEVICE_CPU), PyFuncOp);
REGISTER_KERNEL_BUILDER(Name("EagerPyFunc").Device(DEVICE_GPU), PyFuncOp);
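
// "PyFunc" and "PyFuncStateless" intentionally share one kernel: the
// difference is declared where the ops are registered (the stateless variant
// may be constant-folded or deduplicated by the graph optimizer), not here.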

}  // end namespace tensorflow