1 /* 2 * Copyright (C) 2018 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #define LOG_TAG "BufferPoolManager" 17 //#define LOG_NDEBUG 0 18 19 #include <bufferpool/ClientManager.h> 20 #include <hidl/HidlTransportSupport.h> 21 #include <sys/types.h> 22 #include <time.h> 23 #include <unistd.h> 24 #include <utils/Log.h> 25 #include "BufferPoolClient.h" 26 27 namespace android { 28 namespace hardware { 29 namespace media { 30 namespace bufferpool { 31 namespace V1_0 { 32 namespace implementation { 33 34 static constexpr int64_t kRegisterTimeoutUs = 500000; // 0.5 sec 35 static constexpr int64_t kCleanUpDurationUs = 1000000; // TODO: 1 sec tune 36 static constexpr int64_t kClientTimeoutUs = 5000000; // TODO: 5 secs tune 37 38 /** 39 * The holder of the cookie of remote IClientManager. 40 * The cookie is process locally unique for each IClientManager. 41 * (The cookie is used to notify death of clients to bufferpool process.) 42 */ 43 class ClientManagerCookieHolder { 44 public: 45 /** 46 * Creates a cookie holder for remote IClientManager(s). 47 */ 48 ClientManagerCookieHolder(); 49 50 /** 51 * Gets a cookie for a remote IClientManager. 52 * 53 * @param manager the specified remote IClientManager. 54 * @param added true when the specified remote IClientManager is added 55 * newly, false otherwise. 56 * 57 * @return the process locally unique cookie for the specified IClientManager. 58 */ 59 uint64_t getCookie(const sp<IClientManager> &manager, bool *added); 60 61 private: 62 uint64_t mSeqId; 63 std::mutex mLock; 64 std::list<std::pair<const wp<IClientManager>, uint64_t>> mManagers; 65 }; 66 67 ClientManagerCookieHolder::ClientManagerCookieHolder() : mSeqId(0){} 68 69 uint64_t ClientManagerCookieHolder::getCookie( 70 const sp<IClientManager> &manager, 71 bool *added) { 72 std::lock_guard<std::mutex> lock(mLock); 73 for (auto it = mManagers.begin(); it != mManagers.end();) { 74 const sp<IClientManager> key = it->first.promote(); 75 if (key) { 76 if (interfacesEqual(key, manager)) { 77 *added = false; 78 return it->second; 79 } 80 ++it; 81 } else { 82 it = mManagers.erase(it); 83 } 84 } 85 uint64_t id = mSeqId++; 86 *added = true; 87 mManagers.push_back(std::make_pair(manager, id)); 88 return id; 89 } 90 91 class ClientManager::Impl { 92 public: 93 Impl(); 94 95 // BnRegisterSender 96 ResultStatus registerSender(const sp<IAccessor> &accessor, 97 ConnectionId *pConnectionId); 98 99 // BpRegisterSender 100 ResultStatus registerSender(const sp<IClientManager> &receiver, 101 ConnectionId senderId, 102 ConnectionId *receiverId); 103 104 ResultStatus create(const std::shared_ptr<BufferPoolAllocator> &allocator, 105 ConnectionId *pConnectionId); 106 107 ResultStatus close(ConnectionId connectionId); 108 109 ResultStatus allocate(ConnectionId connectionId, 110 const std::vector<uint8_t> ¶ms, 111 native_handle_t **handle, 112 std::shared_ptr<BufferPoolData> *buffer); 113 114 ResultStatus receive(ConnectionId connectionId, 115 TransactionId transactionId, 116 BufferId bufferId, 117 int64_t timestampUs, 118 native_handle_t **handle, 119 std::shared_ptr<BufferPoolData> *buffer); 120 121 ResultStatus postSend(ConnectionId receiverId, 122 const std::shared_ptr<BufferPoolData> &buffer, 123 TransactionId *transactionId, 124 int64_t *timestampUs); 125 126 ResultStatus getAccessor(ConnectionId connectionId, 127 sp<IAccessor> *accessor); 128 129 void cleanUp(bool clearCache = false); 130 131 private: 132 // In order to prevent deadlock between multiple locks, 133 // always lock ClientCache.lock before locking ActiveClients.lock. 134 struct ClientCache { 135 // This lock is held for brief duration. 136 // Blocking operation is not performed while holding the lock. 137 std::mutex mMutex; 138 std::list<std::pair<const wp<IAccessor>, const std::weak_ptr<BufferPoolClient>>> 139 mClients; 140 std::condition_variable mConnectCv; 141 bool mConnecting; 142 int64_t mLastCleanUpUs; 143 144 ClientCache() : mConnecting(false), mLastCleanUpUs(getTimestampNow()) {} 145 } mCache; 146 147 // Active clients which can be retrieved via ConnectionId 148 struct ActiveClients { 149 // This lock is held for brief duration. 150 // Blocking operation is not performed holding the lock. 151 std::mutex mMutex; 152 std::map<ConnectionId, const std::shared_ptr<BufferPoolClient>> 153 mClients; 154 } mActive; 155 156 ClientManagerCookieHolder mRemoteClientCookies; 157 }; 158 159 ClientManager::Impl::Impl() {} 160 161 ResultStatus ClientManager::Impl::registerSender( 162 const sp<IAccessor> &accessor, ConnectionId *pConnectionId) { 163 cleanUp(); 164 int64_t timeoutUs = getTimestampNow() + kRegisterTimeoutUs; 165 do { 166 std::unique_lock<std::mutex> lock(mCache.mMutex); 167 for (auto it = mCache.mClients.begin(); it != mCache.mClients.end(); ++it) { 168 sp<IAccessor> sAccessor = it->first.promote(); 169 if (sAccessor && interfacesEqual(sAccessor, accessor)) { 170 const std::shared_ptr<BufferPoolClient> client = it->second.lock(); 171 if (client) { 172 std::lock_guard<std::mutex> lock(mActive.mMutex); 173 *pConnectionId = client->getConnectionId(); 174 if (mActive.mClients.find(*pConnectionId) != mActive.mClients.end()) { 175 ALOGV("register existing connection %lld", (long long)*pConnectionId); 176 return ResultStatus::ALREADY_EXISTS; 177 } 178 } 179 mCache.mClients.erase(it); 180 break; 181 } 182 } 183 if (!mCache.mConnecting) { 184 mCache.mConnecting = true; 185 lock.unlock(); 186 ResultStatus result = ResultStatus::OK; 187 const std::shared_ptr<BufferPoolClient> client = 188 std::make_shared<BufferPoolClient>(accessor); 189 lock.lock(); 190 if (!client) { 191 result = ResultStatus::NO_MEMORY; 192 } else if (!client->isValid()) { 193 result = ResultStatus::CRITICAL_ERROR; 194 } 195 if (result == ResultStatus::OK) { 196 // TODO: handle insert fail. (malloc fail) 197 const std::weak_ptr<BufferPoolClient> wclient = client; 198 mCache.mClients.push_back(std::make_pair(accessor, wclient)); 199 ConnectionId conId = client->getConnectionId(); 200 { 201 std::lock_guard<std::mutex> lock(mActive.mMutex); 202 mActive.mClients.insert(std::make_pair(conId, client)); 203 } 204 *pConnectionId = conId; 205 ALOGV("register new connection %lld", (long long)*pConnectionId); 206 } 207 mCache.mConnecting = false; 208 lock.unlock(); 209 mCache.mConnectCv.notify_all(); 210 return result; 211 } 212 mCache.mConnectCv.wait_for( 213 lock, std::chrono::microseconds(kRegisterTimeoutUs)); 214 } while (getTimestampNow() < timeoutUs); 215 // TODO: return timeout error 216 return ResultStatus::CRITICAL_ERROR; 217 } 218 219 ResultStatus ClientManager::Impl::registerSender( 220 const sp<IClientManager> &receiver, 221 ConnectionId senderId, 222 ConnectionId *receiverId) { 223 sp<IAccessor> accessor; 224 bool local = false; 225 { 226 std::lock_guard<std::mutex> lock(mActive.mMutex); 227 auto it = mActive.mClients.find(senderId); 228 if (it == mActive.mClients.end()) { 229 return ResultStatus::NOT_FOUND; 230 } 231 it->second->getAccessor(&accessor); 232 local = it->second->isLocal(); 233 } 234 ResultStatus rs = ResultStatus::CRITICAL_ERROR; 235 if (accessor) { 236 Return<void> transResult = receiver->registerSender( 237 accessor, 238 [&rs, receiverId]( 239 ResultStatus status, 240 int64_t connectionId) { 241 rs = status; 242 *receiverId = connectionId; 243 }); 244 if (!transResult.isOk()) { 245 return ResultStatus::CRITICAL_ERROR; 246 } else if (local && rs == ResultStatus::OK) { 247 sp<ConnectionDeathRecipient> recipient = Accessor::getConnectionDeathRecipient(); 248 if (recipient) { 249 ALOGV("client death recipient registered %lld", (long long)*receiverId); 250 bool added; 251 uint64_t cookie = mRemoteClientCookies.getCookie(receiver, &added); 252 recipient->addCookieToConnection(cookie, *receiverId); 253 if (added) { 254 Return<bool> transResult = receiver->linkToDeath(recipient, cookie); 255 } 256 } 257 } 258 } 259 return rs; 260 } 261 262 ResultStatus ClientManager::Impl::create( 263 const std::shared_ptr<BufferPoolAllocator> &allocator, 264 ConnectionId *pConnectionId) { 265 const sp<Accessor> accessor = new Accessor(allocator); 266 if (!accessor || !accessor->isValid()) { 267 return ResultStatus::CRITICAL_ERROR; 268 } 269 std::shared_ptr<BufferPoolClient> client = 270 std::make_shared<BufferPoolClient>(accessor); 271 if (!client || !client->isValid()) { 272 return ResultStatus::CRITICAL_ERROR; 273 } 274 // Since a new bufferpool is created, evict memories which are used by 275 // existing bufferpools and clients. 276 cleanUp(true); 277 { 278 // TODO: handle insert fail. (malloc fail) 279 std::lock_guard<std::mutex> lock(mCache.mMutex); 280 const std::weak_ptr<BufferPoolClient> wclient = client; 281 mCache.mClients.push_back(std::make_pair(accessor, wclient)); 282 ConnectionId conId = client->getConnectionId(); 283 { 284 std::lock_guard<std::mutex> lock(mActive.mMutex); 285 mActive.mClients.insert(std::make_pair(conId, client)); 286 } 287 *pConnectionId = conId; 288 ALOGV("create new connection %lld", (long long)*pConnectionId); 289 } 290 return ResultStatus::OK; 291 } 292 293 ResultStatus ClientManager::Impl::close(ConnectionId connectionId) { 294 std::lock_guard<std::mutex> lock1(mCache.mMutex); 295 std::lock_guard<std::mutex> lock2(mActive.mMutex); 296 auto it = mActive.mClients.find(connectionId); 297 if (it != mActive.mClients.end()) { 298 sp<IAccessor> accessor; 299 it->second->getAccessor(&accessor); 300 mActive.mClients.erase(connectionId); 301 for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) { 302 // clean up dead client caches 303 sp<IAccessor> cAccessor = cit->first.promote(); 304 if (!cAccessor || (accessor && interfacesEqual(cAccessor, accessor))) { 305 cit = mCache.mClients.erase(cit); 306 } else { 307 cit++; 308 } 309 } 310 return ResultStatus::OK; 311 } 312 return ResultStatus::NOT_FOUND; 313 } 314 315 ResultStatus ClientManager::Impl::allocate( 316 ConnectionId connectionId, const std::vector<uint8_t> ¶ms, 317 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) { 318 std::shared_ptr<BufferPoolClient> client; 319 { 320 std::lock_guard<std::mutex> lock(mActive.mMutex); 321 auto it = mActive.mClients.find(connectionId); 322 if (it == mActive.mClients.end()) { 323 return ResultStatus::NOT_FOUND; 324 } 325 client = it->second; 326 } 327 return client->allocate(params, handle, buffer); 328 } 329 330 ResultStatus ClientManager::Impl::receive( 331 ConnectionId connectionId, TransactionId transactionId, 332 BufferId bufferId, int64_t timestampUs, 333 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) { 334 std::shared_ptr<BufferPoolClient> client; 335 { 336 std::lock_guard<std::mutex> lock(mActive.mMutex); 337 auto it = mActive.mClients.find(connectionId); 338 if (it == mActive.mClients.end()) { 339 return ResultStatus::NOT_FOUND; 340 } 341 client = it->second; 342 } 343 return client->receive(transactionId, bufferId, timestampUs, handle, buffer); 344 } 345 346 ResultStatus ClientManager::Impl::postSend( 347 ConnectionId receiverId, const std::shared_ptr<BufferPoolData> &buffer, 348 TransactionId *transactionId, int64_t *timestampUs) { 349 ConnectionId connectionId = buffer->mConnectionId; 350 std::shared_ptr<BufferPoolClient> client; 351 { 352 std::lock_guard<std::mutex> lock(mActive.mMutex); 353 auto it = mActive.mClients.find(connectionId); 354 if (it == mActive.mClients.end()) { 355 return ResultStatus::NOT_FOUND; 356 } 357 client = it->second; 358 } 359 return client->postSend(receiverId, buffer, transactionId, timestampUs); 360 } 361 362 ResultStatus ClientManager::Impl::getAccessor( 363 ConnectionId connectionId, sp<IAccessor> *accessor) { 364 std::shared_ptr<BufferPoolClient> client; 365 { 366 std::lock_guard<std::mutex> lock(mActive.mMutex); 367 auto it = mActive.mClients.find(connectionId); 368 if (it == mActive.mClients.end()) { 369 return ResultStatus::NOT_FOUND; 370 } 371 client = it->second; 372 } 373 return client->getAccessor(accessor); 374 } 375 376 void ClientManager::Impl::cleanUp(bool clearCache) { 377 int64_t now = getTimestampNow(); 378 int64_t lastTransactionUs; 379 std::lock_guard<std::mutex> lock1(mCache.mMutex); 380 if (clearCache || mCache.mLastCleanUpUs + kCleanUpDurationUs < now) { 381 std::lock_guard<std::mutex> lock2(mActive.mMutex); 382 int cleaned = 0; 383 for (auto it = mActive.mClients.begin(); it != mActive.mClients.end();) { 384 if (!it->second->isActive(&lastTransactionUs, clearCache)) { 385 if (lastTransactionUs + kClientTimeoutUs < now) { 386 sp<IAccessor> accessor; 387 it->second->getAccessor(&accessor); 388 it = mActive.mClients.erase(it); 389 ++cleaned; 390 continue; 391 } 392 } 393 ++it; 394 } 395 for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) { 396 // clean up dead client caches 397 sp<IAccessor> cAccessor = cit->first.promote(); 398 if (!cAccessor) { 399 cit = mCache.mClients.erase(cit); 400 } else { 401 ++cit; 402 } 403 } 404 ALOGV("# of cleaned connections: %d", cleaned); 405 mCache.mLastCleanUpUs = now; 406 } 407 } 408 409 // Methods from ::android::hardware::media::bufferpool::V1_0::IClientManager follow. 410 Return<void> ClientManager::registerSender(const sp<::android::hardware::media::bufferpool::V1_0::IAccessor>& bufferPool, registerSender_cb _hidl_cb) { 411 if (mImpl) { 412 ConnectionId connectionId = -1; 413 ResultStatus status = mImpl->registerSender(bufferPool, &connectionId); 414 _hidl_cb(status, connectionId); 415 } else { 416 _hidl_cb(ResultStatus::CRITICAL_ERROR, -1); 417 } 418 return Void(); 419 } 420 421 // Methods for local use. 422 sp<ClientManager> ClientManager::sInstance; 423 std::mutex ClientManager::sInstanceLock; 424 425 sp<ClientManager> ClientManager::getInstance() { 426 std::lock_guard<std::mutex> lock(sInstanceLock); 427 if (!sInstance) { 428 sInstance = new ClientManager(); 429 } 430 return sInstance; 431 } 432 433 ClientManager::ClientManager() : mImpl(new Impl()) {} 434 435 ClientManager::~ClientManager() { 436 } 437 438 ResultStatus ClientManager::create( 439 const std::shared_ptr<BufferPoolAllocator> &allocator, 440 ConnectionId *pConnectionId) { 441 if (mImpl) { 442 return mImpl->create(allocator, pConnectionId); 443 } 444 return ResultStatus::CRITICAL_ERROR; 445 } 446 447 ResultStatus ClientManager::registerSender( 448 const sp<IClientManager> &receiver, 449 ConnectionId senderId, 450 ConnectionId *receiverId) { 451 if (mImpl) { 452 return mImpl->registerSender(receiver, senderId, receiverId); 453 } 454 return ResultStatus::CRITICAL_ERROR; 455 } 456 457 ResultStatus ClientManager::close(ConnectionId connectionId) { 458 if (mImpl) { 459 return mImpl->close(connectionId); 460 } 461 return ResultStatus::CRITICAL_ERROR; 462 } 463 464 ResultStatus ClientManager::allocate( 465 ConnectionId connectionId, const std::vector<uint8_t> ¶ms, 466 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) { 467 if (mImpl) { 468 return mImpl->allocate(connectionId, params, handle, buffer); 469 } 470 return ResultStatus::CRITICAL_ERROR; 471 } 472 473 ResultStatus ClientManager::receive( 474 ConnectionId connectionId, TransactionId transactionId, 475 BufferId bufferId, int64_t timestampUs, 476 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) { 477 if (mImpl) { 478 return mImpl->receive(connectionId, transactionId, bufferId, 479 timestampUs, handle, buffer); 480 } 481 return ResultStatus::CRITICAL_ERROR; 482 } 483 484 ResultStatus ClientManager::postSend( 485 ConnectionId receiverId, const std::shared_ptr<BufferPoolData> &buffer, 486 TransactionId *transactionId, int64_t* timestampUs) { 487 if (mImpl && buffer) { 488 return mImpl->postSend(receiverId, buffer, transactionId, timestampUs); 489 } 490 return ResultStatus::CRITICAL_ERROR; 491 } 492 493 void ClientManager::cleanUp() { 494 if (mImpl) { 495 mImpl->cleanUp(true); 496 } 497 } 498 499 } // namespace implementation 500 } // namespace V1_0 501 } // namespace bufferpool 502 } // namespace media 503 } // namespace hardware 504 } // namespace android 505