1 /* 2 * Copyright (C) 2019 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #define LOG_TAG "ExecutionBurstServer" 18 19 #include "ExecutionBurstServer.h" 20 21 #include <android-base/logging.h> 22 23 #include <cstring> 24 #include <limits> 25 #include <map> 26 27 #include "Tracing.h" 28 29 namespace android::nn { 30 namespace { 31 32 constexpr Timing kNoTiming = {std::numeric_limits<uint64_t>::max(), 33 std::numeric_limits<uint64_t>::max()}; 34 35 // DefaultBurstExecutorWithCache adapts an IPreparedModel so that it can be 36 // used as an IBurstExecutorWithCache. Specifically, the cache simply stores the 37 // hidl_memory object, and the execution forwards calls to the provided 38 // IPreparedModel's "executeSynchronously" method. With this class, hidl_memory 39 // must be mapped and unmapped for each execution. 40 class DefaultBurstExecutorWithCache : public ExecutionBurstServer::IBurstExecutorWithCache { 41 public: 42 DefaultBurstExecutorWithCache(IPreparedModel* preparedModel) : mpPreparedModel(preparedModel) {} 43 44 bool isCacheEntryPresent(int32_t slot) const override { 45 const auto it = mMemoryCache.find(slot); 46 return (it != mMemoryCache.end()) && it->second.valid(); 47 } 48 49 void addCacheEntry(const hidl_memory& memory, int32_t slot) override { 50 mMemoryCache[slot] = memory; 51 } 52 53 void removeCacheEntry(int32_t slot) override { mMemoryCache.erase(slot); } 54 55 std::tuple<ErrorStatus, hidl_vec<OutputShape>, Timing> execute( 56 const Request& request, const std::vector<int32_t>& slots, 57 MeasureTiming measure) override { 58 // convert slots to pools 59 hidl_vec<hidl_memory> pools(slots.size()); 60 std::transform(slots.begin(), slots.end(), pools.begin(), 61 [this](int32_t slot) { return mMemoryCache[slot]; }); 62 63 // create full request 64 Request fullRequest = request; 65 fullRequest.pools = std::move(pools); 66 67 // setup execution 68 ErrorStatus returnedStatus = ErrorStatus::GENERAL_FAILURE; 69 hidl_vec<OutputShape> returnedOutputShapes; 70 Timing returnedTiming; 71 auto cb = [&returnedStatus, &returnedOutputShapes, &returnedTiming]( 72 ErrorStatus status, const hidl_vec<OutputShape>& outputShapes, 73 const Timing& timing) { 74 returnedStatus = status; 75 returnedOutputShapes = outputShapes; 76 returnedTiming = timing; 77 }; 78 79 // execute 80 const Return<void> ret = mpPreparedModel->executeSynchronously(fullRequest, measure, cb); 81 if (!ret.isOk() || returnedStatus != ErrorStatus::NONE) { 82 LOG(ERROR) << "IPreparedModelAdapter::execute -- Error executing"; 83 return {returnedStatus, {}, kNoTiming}; 84 } 85 86 return std::make_tuple(returnedStatus, std::move(returnedOutputShapes), returnedTiming); 87 } 88 89 private: 90 IPreparedModel* const mpPreparedModel; 91 std::map<int32_t, hidl_memory> mMemoryCache; 92 }; 93 94 } // anonymous namespace 95 96 // serialize result 97 std::vector<FmqResultDatum> serialize(ErrorStatus errorStatus, 98 const std::vector<OutputShape>& outputShapes, Timing timing) { 99 // count how many elements need to be sent for a request 100 size_t count = 2 + outputShapes.size(); 101 for (const auto& outputShape : outputShapes) { 102 count += outputShape.dimensions.size(); 103 } 104 105 // create buffer to temporarily store elements 106 std::vector<FmqResultDatum> data; 107 data.reserve(count); 108 109 // package packetInfo 110 { 111 FmqResultDatum datum; 112 datum.packetInformation({/*.packetSize=*/static_cast<uint32_t>(count), 113 /*.errorStatus=*/errorStatus, 114 /*.numberOfOperands=*/static_cast<uint32_t>(outputShapes.size())}); 115 data.push_back(datum); 116 } 117 118 // package output shape data 119 for (const auto& operand : outputShapes) { 120 // package operand information 121 FmqResultDatum::OperandInformation info{}; 122 info.isSufficient = operand.isSufficient; 123 info.numberOfDimensions = static_cast<uint32_t>(operand.dimensions.size()); 124 125 FmqResultDatum datum; 126 datum.operandInformation(info); 127 data.push_back(datum); 128 129 // package operand dimensions 130 for (uint32_t dimension : operand.dimensions) { 131 FmqResultDatum datum; 132 datum.operandDimensionValue(dimension); 133 data.push_back(datum); 134 } 135 } 136 137 // package executionTiming 138 { 139 FmqResultDatum datum; 140 datum.executionTiming(timing); 141 data.push_back(datum); 142 } 143 144 // return result 145 return data; 146 } 147 148 // deserialize request 149 std::optional<std::tuple<Request, std::vector<int32_t>, MeasureTiming>> deserialize( 150 const std::vector<FmqRequestDatum>& data) { 151 using discriminator = FmqRequestDatum::hidl_discriminator; 152 153 size_t index = 0; 154 155 // validate packet information 156 if (data.size() == 0 || data[index].getDiscriminator() != discriminator::packetInformation) { 157 LOG(ERROR) << "FMQ Request packet ill-formed"; 158 return std::nullopt; 159 } 160 161 // unpackage packet information 162 const FmqRequestDatum::PacketInformation& packetInfo = data[index].packetInformation(); 163 index++; 164 const uint32_t packetSize = packetInfo.packetSize; 165 const uint32_t numberOfInputOperands = packetInfo.numberOfInputOperands; 166 const uint32_t numberOfOutputOperands = packetInfo.numberOfOutputOperands; 167 const uint32_t numberOfPools = packetInfo.numberOfPools; 168 169 // verify packet size 170 if (data.size() != packetSize) { 171 LOG(ERROR) << "FMQ Request packet ill-formed"; 172 return std::nullopt; 173 } 174 175 // unpackage input operands 176 std::vector<RequestArgument> inputs; 177 inputs.reserve(numberOfInputOperands); 178 for (size_t operand = 0; operand < numberOfInputOperands; ++operand) { 179 // validate input operand information 180 if (data[index].getDiscriminator() != discriminator::inputOperandInformation) { 181 LOG(ERROR) << "FMQ Request packet ill-formed"; 182 return std::nullopt; 183 } 184 185 // unpackage operand information 186 const FmqRequestDatum::OperandInformation& operandInfo = 187 data[index].inputOperandInformation(); 188 index++; 189 const bool hasNoValue = operandInfo.hasNoValue; 190 const DataLocation location = operandInfo.location; 191 const uint32_t numberOfDimensions = operandInfo.numberOfDimensions; 192 193 // unpackage operand dimensions 194 std::vector<uint32_t> dimensions; 195 dimensions.reserve(numberOfDimensions); 196 for (size_t i = 0; i < numberOfDimensions; ++i) { 197 // validate dimension 198 if (data[index].getDiscriminator() != discriminator::inputOperandDimensionValue) { 199 LOG(ERROR) << "FMQ Request packet ill-formed"; 200 return std::nullopt; 201 } 202 203 // unpackage dimension 204 const uint32_t dimension = data[index].inputOperandDimensionValue(); 205 index++; 206 207 // store result 208 dimensions.push_back(dimension); 209 } 210 211 // store result 212 inputs.push_back( 213 {/*.hasNoValue=*/hasNoValue, /*.location=*/location, /*.dimensions=*/dimensions}); 214 } 215 216 // unpackage output operands 217 std::vector<RequestArgument> outputs; 218 outputs.reserve(numberOfOutputOperands); 219 for (size_t operand = 0; operand < numberOfOutputOperands; ++operand) { 220 // validate output operand information 221 if (data[index].getDiscriminator() != discriminator::outputOperandInformation) { 222 LOG(ERROR) << "FMQ Request packet ill-formed"; 223 return std::nullopt; 224 } 225 226 // unpackage operand information 227 const FmqRequestDatum::OperandInformation& operandInfo = 228 data[index].outputOperandInformation(); 229 index++; 230 const bool hasNoValue = operandInfo.hasNoValue; 231 const DataLocation location = operandInfo.location; 232 const uint32_t numberOfDimensions = operandInfo.numberOfDimensions; 233 234 // unpackage operand dimensions 235 std::vector<uint32_t> dimensions; 236 dimensions.reserve(numberOfDimensions); 237 for (size_t i = 0; i < numberOfDimensions; ++i) { 238 // validate dimension 239 if (data[index].getDiscriminator() != discriminator::outputOperandDimensionValue) { 240 LOG(ERROR) << "FMQ Request packet ill-formed"; 241 return std::nullopt; 242 } 243 244 // unpackage dimension 245 const uint32_t dimension = data[index].outputOperandDimensionValue(); 246 index++; 247 248 // store result 249 dimensions.push_back(dimension); 250 } 251 252 // store result 253 outputs.push_back( 254 {/*.hasNoValue=*/hasNoValue, /*.location=*/location, /*.dimensions=*/dimensions}); 255 } 256 257 // unpackage pools 258 std::vector<int32_t> slots; 259 slots.reserve(numberOfPools); 260 for (size_t pool = 0; pool < numberOfPools; ++pool) { 261 // validate input operand information 262 if (data[index].getDiscriminator() != discriminator::poolIdentifier) { 263 LOG(ERROR) << "FMQ Request packet ill-formed"; 264 return std::nullopt; 265 } 266 267 // unpackage operand information 268 const int32_t poolId = data[index].poolIdentifier(); 269 index++; 270 271 // store result 272 slots.push_back(poolId); 273 } 274 275 // validate measureTiming 276 if (data[index].getDiscriminator() != discriminator::measureTiming) { 277 LOG(ERROR) << "FMQ Request packet ill-formed"; 278 return std::nullopt; 279 } 280 281 // unpackage measureTiming 282 const MeasureTiming measure = data[index].measureTiming(); 283 index++; 284 285 // validate packet information 286 if (index != packetSize) { 287 LOG(ERROR) << "FMQ Result packet ill-formed"; 288 return std::nullopt; 289 } 290 291 // return request 292 Request request = {/*.inputs=*/inputs, /*.outputs=*/outputs, /*.pools=*/{}}; 293 return std::make_tuple(std::move(request), std::move(slots), measure); 294 } 295 296 // RequestChannelReceiver methods 297 298 std::unique_ptr<RequestChannelReceiver> RequestChannelReceiver::create( 299 const FmqRequestDescriptor& requestChannel) { 300 std::unique_ptr<FmqRequestChannel> fmqRequestChannel = 301 std::make_unique<FmqRequestChannel>(requestChannel); 302 if (!fmqRequestChannel->isValid()) { 303 LOG(ERROR) << "Unable to create RequestChannelReceiver"; 304 return nullptr; 305 } 306 const bool blocking = fmqRequestChannel->getEventFlagWord() != nullptr; 307 return std::make_unique<RequestChannelReceiver>(std::move(fmqRequestChannel), blocking); 308 } 309 310 RequestChannelReceiver::RequestChannelReceiver(std::unique_ptr<FmqRequestChannel> fmqRequestChannel, 311 bool blocking) 312 : mFmqRequestChannel(std::move(fmqRequestChannel)), mBlocking(blocking) {} 313 314 std::optional<std::tuple<Request, std::vector<int32_t>, MeasureTiming>> 315 RequestChannelReceiver::getBlocking() { 316 const auto packet = getPacketBlocking(); 317 if (!packet) { 318 return std::nullopt; 319 } 320 321 return deserialize(*packet); 322 } 323 324 void RequestChannelReceiver::invalidate() { 325 mTeardown = true; 326 327 // force unblock 328 // ExecutionBurstServer is by default waiting on a request packet. If the 329 // client process destroys its burst object, the server will still be 330 // waiting on the futex (assuming mBlocking is true). This force unblock 331 // wakes up any thread waiting on the futex. 332 if (mBlocking) { 333 // TODO: look for a different/better way to signal/notify the futex to 334 // wake up any thread waiting on it 335 FmqRequestDatum datum; 336 datum.packetInformation({/*.packetSize=*/0, /*.numberOfInputOperands=*/0, 337 /*.numberOfOutputOperands=*/0, /*.numberOfPools=*/0}); 338 mFmqRequestChannel->writeBlocking(&datum, 1); 339 } 340 } 341 342 std::optional<std::vector<FmqRequestDatum>> RequestChannelReceiver::getPacketBlocking() { 343 using discriminator = FmqRequestDatum::hidl_discriminator; 344 345 if (mTeardown) { 346 return std::nullopt; 347 } 348 349 // wait for request packet and read first element of request packet 350 FmqRequestDatum datum; 351 bool success = false; 352 if (mBlocking) { 353 success = mFmqRequestChannel->readBlocking(&datum, 1); 354 } else { 355 while ((success = !mTeardown.load(std::memory_order_relaxed)) && 356 !mFmqRequestChannel->read(&datum, 1)) { 357 } 358 } 359 360 NNTRACE_FULL(NNTRACE_LAYER_IPC, NNTRACE_PHASE_EXECUTION, "ExecutionBurstServer getting packet"); 361 362 // retrieve remaining elements 363 // NOTE: all of the data is already available at this point, so there's no 364 // need to do a blocking wait to wait for more data. This is known because 365 // in FMQ, all writes are published (made available) atomically. Currently, 366 // the producer always publishes the entire packet in one function call, so 367 // if the first element of the packet is available, the remaining elements 368 // are also available. 369 const size_t count = mFmqRequestChannel->availableToRead(); 370 std::vector<FmqRequestDatum> packet(count + 1); 371 std::memcpy(&packet.front(), &datum, sizeof(datum)); 372 success &= mFmqRequestChannel->read(packet.data() + 1, count); 373 374 // terminate loop 375 if (mTeardown) { 376 return std::nullopt; 377 } 378 379 // ensure packet was successfully received 380 if (!success) { 381 LOG(ERROR) << "Error receiving packet"; 382 return std::nullopt; 383 } 384 385 return std::make_optional(std::move(packet)); 386 } 387 388 // ResultChannelSender methods 389 390 std::unique_ptr<ResultChannelSender> ResultChannelSender::create( 391 const FmqResultDescriptor& resultChannel) { 392 std::unique_ptr<FmqResultChannel> fmqResultChannel = 393 std::make_unique<FmqResultChannel>(resultChannel); 394 if (!fmqResultChannel->isValid()) { 395 LOG(ERROR) << "Unable to create RequestChannelSender"; 396 return nullptr; 397 } 398 const bool blocking = fmqResultChannel->getEventFlagWord() != nullptr; 399 return std::make_unique<ResultChannelSender>(std::move(fmqResultChannel), blocking); 400 } 401 402 ResultChannelSender::ResultChannelSender(std::unique_ptr<FmqResultChannel> fmqResultChannel, 403 bool blocking) 404 : mFmqResultChannel(std::move(fmqResultChannel)), mBlocking(blocking) {} 405 406 bool ResultChannelSender::send(ErrorStatus errorStatus, 407 const std::vector<OutputShape>& outputShapes, Timing timing) { 408 const std::vector<FmqResultDatum> serialized = serialize(errorStatus, outputShapes, timing); 409 return sendPacket(serialized); 410 } 411 412 bool ResultChannelSender::sendPacket(const std::vector<FmqResultDatum>& packet) { 413 if (packet.size() > mFmqResultChannel->availableToWrite()) { 414 LOG(ERROR) 415 << "ResultChannelSender::sendPacket -- packet size exceeds size available in FMQ"; 416 const std::vector<FmqResultDatum> errorPacket = 417 serialize(ErrorStatus::GENERAL_FAILURE, {}, kNoTiming); 418 if (mBlocking) { 419 return mFmqResultChannel->writeBlocking(errorPacket.data(), errorPacket.size()); 420 } else { 421 return mFmqResultChannel->write(errorPacket.data(), errorPacket.size()); 422 } 423 } 424 425 if (mBlocking) { 426 return mFmqResultChannel->writeBlocking(packet.data(), packet.size()); 427 } else { 428 return mFmqResultChannel->write(packet.data(), packet.size()); 429 } 430 } 431 432 // ExecutionBurstServer methods 433 434 sp<ExecutionBurstServer> ExecutionBurstServer::create( 435 const sp<IBurstCallback>& callback, const MQDescriptorSync<FmqRequestDatum>& requestChannel, 436 const MQDescriptorSync<FmqResultDatum>& resultChannel, 437 std::shared_ptr<IBurstExecutorWithCache> executorWithCache) { 438 // check inputs 439 if (callback == nullptr || executorWithCache == nullptr) { 440 LOG(ERROR) << "ExecutionBurstServer::create passed a nullptr"; 441 return nullptr; 442 } 443 444 // create FMQ objects 445 std::unique_ptr<RequestChannelReceiver> requestChannelReceiver = 446 RequestChannelReceiver::create(requestChannel); 447 std::unique_ptr<ResultChannelSender> resultChannelSender = 448 ResultChannelSender::create(resultChannel); 449 450 // check FMQ objects 451 if (!requestChannelReceiver || !resultChannelSender) { 452 LOG(ERROR) << "ExecutionBurstServer::create failed to create FastMessageQueue"; 453 return nullptr; 454 } 455 456 // make and return context 457 return new ExecutionBurstServer(callback, std::move(requestChannelReceiver), 458 std::move(resultChannelSender), std::move(executorWithCache)); 459 } 460 461 sp<ExecutionBurstServer> ExecutionBurstServer::create( 462 const sp<IBurstCallback>& callback, const MQDescriptorSync<FmqRequestDatum>& requestChannel, 463 const MQDescriptorSync<FmqResultDatum>& resultChannel, IPreparedModel* preparedModel) { 464 // check relevant input 465 if (preparedModel == nullptr) { 466 LOG(ERROR) << "ExecutionBurstServer::create passed a nullptr"; 467 return nullptr; 468 } 469 470 // adapt IPreparedModel to have caching 471 const std::shared_ptr<DefaultBurstExecutorWithCache> preparedModelAdapter = 472 std::make_shared<DefaultBurstExecutorWithCache>(preparedModel); 473 474 // make and return context 475 return ExecutionBurstServer::create(callback, requestChannel, resultChannel, 476 preparedModelAdapter); 477 } 478 479 ExecutionBurstServer::ExecutionBurstServer( 480 const sp<IBurstCallback>& callback, std::unique_ptr<RequestChannelReceiver> requestChannel, 481 std::unique_ptr<ResultChannelSender> resultChannel, 482 std::shared_ptr<IBurstExecutorWithCache> executorWithCache) 483 : mCallback(callback), 484 mRequestChannelReceiver(std::move(requestChannel)), 485 mResultChannelSender(std::move(resultChannel)), 486 mExecutorWithCache(std::move(executorWithCache)) { 487 // TODO: highly document the threading behavior of this class 488 mWorker = std::thread([this] { task(); }); 489 } 490 491 ExecutionBurstServer::~ExecutionBurstServer() { 492 // set teardown flag 493 mTeardown = true; 494 mRequestChannelReceiver->invalidate(); 495 496 // wait for task thread to end 497 mWorker.join(); 498 } 499 500 Return<void> ExecutionBurstServer::freeMemory(int32_t slot) { 501 mExecutorWithCache->removeCacheEntry(slot); 502 return Void(); 503 } 504 505 void ExecutionBurstServer::ensureCacheEntriesArePresentLocked(const std::vector<int32_t>& slots) { 506 const auto slotIsKnown = [this](int32_t slot) { 507 return mExecutorWithCache->isCacheEntryPresent(slot); 508 }; 509 510 // find unique unknown slots 511 std::vector<int32_t> unknownSlots = slots; 512 auto unknownSlotsEnd = unknownSlots.end(); 513 std::sort(unknownSlots.begin(), unknownSlotsEnd); 514 unknownSlotsEnd = std::unique(unknownSlots.begin(), unknownSlotsEnd); 515 unknownSlotsEnd = std::remove_if(unknownSlots.begin(), unknownSlotsEnd, slotIsKnown); 516 unknownSlots.erase(unknownSlotsEnd, unknownSlots.end()); 517 518 // quick-exit if all slots are known 519 if (unknownSlots.empty()) { 520 return; 521 } 522 523 ErrorStatus errorStatus = ErrorStatus::GENERAL_FAILURE; 524 std::vector<hidl_memory> returnedMemories; 525 auto cb = [&errorStatus, &returnedMemories](ErrorStatus status, 526 const hidl_vec<hidl_memory>& memories) { 527 errorStatus = status; 528 returnedMemories = memories; 529 }; 530 531 const Return<void> ret = mCallback->getMemories(unknownSlots, cb); 532 533 if (!ret.isOk() || errorStatus != ErrorStatus::NONE || 534 returnedMemories.size() != unknownSlots.size()) { 535 LOG(ERROR) << "Error retrieving memories"; 536 return; 537 } 538 539 // add memories to unknown slots 540 for (size_t i = 0; i < unknownSlots.size(); ++i) { 541 mExecutorWithCache->addCacheEntry(returnedMemories[i], unknownSlots[i]); 542 } 543 } 544 545 void ExecutionBurstServer::task() { 546 // loop until the burst object is being destroyed 547 while (!mTeardown) { 548 // receive request 549 auto arguments = mRequestChannelReceiver->getBlocking(); 550 551 // if the request packet was not properly received, return a generic 552 // error and skip the execution 553 // 554 // if the burst is being torn down, skip the execution exection so the 555 // "task" function can end 556 if (!arguments) { 557 if (!mTeardown) { 558 mResultChannelSender->send(ErrorStatus::GENERAL_FAILURE, {}, kNoTiming); 559 } 560 continue; 561 } 562 563 // otherwise begin tracing execution 564 NNTRACE_FULL(NNTRACE_LAYER_IPC, NNTRACE_PHASE_EXECUTION, 565 "ExecutionBurstServer getting memory, executing, and returning results"); 566 567 // unpack the arguments; types are Request, std::vector<int32_t>, and 568 // MeasureTiming, respectively 569 const auto [requestWithoutPools, slotsOfPools, measure] = std::move(*arguments); 570 571 // ensure executor with cache has required memory 572 std::lock_guard<std::mutex> hold(mMutex); 573 ensureCacheEntriesArePresentLocked(slotsOfPools); 574 575 // perform computation; types are ErrorStatus, hidl_vec<OutputShape>, 576 // and Timing, respectively 577 const auto [errorStatus, outputShapes, returnedTiming] = 578 mExecutorWithCache->execute(requestWithoutPools, slotsOfPools, measure); 579 580 // return result 581 mResultChannelSender->send(errorStatus, outputShapes, returnedTiming); 582 } 583 } 584 585 } // namespace android::nn 586