Home | History | Annotate | Download | only in bufferpool
      1 /*
      2  * Copyright (C) 2018 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 #define LOG_TAG "BufferPoolManager"
     17 //#define LOG_NDEBUG 0
     18 
     19 #include <bufferpool/ClientManager.h>
     20 #include <hidl/HidlTransportSupport.h>
     21 #include <sys/types.h>
     22 #include <time.h>
     23 #include <unistd.h>
     24 #include <utils/Log.h>
     25 #include "BufferPoolClient.h"
     26 
     27 namespace android {
     28 namespace hardware {
     29 namespace media {
     30 namespace bufferpool {
     31 namespace V1_0 {
     32 namespace implementation {
     33 
     34 static constexpr int64_t kRegisterTimeoutUs = 500000; // 0.5 sec
     35 static constexpr int64_t kCleanUpDurationUs = 1000000; // TODO: 1 sec tune
     36 static constexpr int64_t kClientTimeoutUs = 5000000; // TODO: 5 secs tune
     37 
     38 /**
     39  * The holder of the cookie of remote IClientManager.
     40  * The cookie is process locally unique for each IClientManager.
     41  * (The cookie is used to notify death of clients to bufferpool process.)
     42  */
     43 class ClientManagerCookieHolder {
     44 public:
     45     /**
     46      * Creates a cookie holder for remote IClientManager(s).
     47      */
     48     ClientManagerCookieHolder();
     49 
     50     /**
     51      * Gets a cookie for a remote IClientManager.
     52      *
     53      * @param manager   the specified remote IClientManager.
     54      * @param added     true when the specified remote IClientManager is added
     55      *                  newly, false otherwise.
     56      *
     57      * @return the process locally unique cookie for the specified IClientManager.
     58      */
     59     uint64_t getCookie(const sp<IClientManager> &manager, bool *added);
     60 
     61 private:
     62     uint64_t mSeqId;
     63     std::mutex mLock;
     64     std::list<std::pair<const wp<IClientManager>, uint64_t>> mManagers;
     65 };
     66 
     67 ClientManagerCookieHolder::ClientManagerCookieHolder() : mSeqId(0){}
     68 
     69 uint64_t ClientManagerCookieHolder::getCookie(
     70         const sp<IClientManager> &manager,
     71         bool *added) {
     72     std::lock_guard<std::mutex> lock(mLock);
     73     for (auto it = mManagers.begin(); it != mManagers.end();) {
     74         const sp<IClientManager> key = it->first.promote();
     75         if (key) {
     76             if (interfacesEqual(key, manager)) {
     77                 *added = false;
     78                 return it->second;
     79             }
     80             ++it;
     81         } else {
     82             it = mManagers.erase(it);
     83         }
     84     }
     85     uint64_t id = mSeqId++;
     86     *added = true;
     87     mManagers.push_back(std::make_pair(manager, id));
     88     return id;
     89 }
     90 
     91 class ClientManager::Impl {
     92 public:
     93     Impl();
     94 
     95     // BnRegisterSender
     96     ResultStatus registerSender(const sp<IAccessor> &accessor,
     97                                 ConnectionId *pConnectionId);
     98 
     99     // BpRegisterSender
    100     ResultStatus registerSender(const sp<IClientManager> &receiver,
    101                                 ConnectionId senderId,
    102                                 ConnectionId *receiverId);
    103 
    104     ResultStatus create(const std::shared_ptr<BufferPoolAllocator> &allocator,
    105                         ConnectionId *pConnectionId);
    106 
    107     ResultStatus close(ConnectionId connectionId);
    108 
    109     ResultStatus allocate(ConnectionId connectionId,
    110                           const std::vector<uint8_t> &params,
    111                           native_handle_t **handle,
    112                           std::shared_ptr<BufferPoolData> *buffer);
    113 
    114     ResultStatus receive(ConnectionId connectionId,
    115                          TransactionId transactionId,
    116                          BufferId bufferId,
    117                          int64_t timestampUs,
    118                          native_handle_t **handle,
    119                          std::shared_ptr<BufferPoolData> *buffer);
    120 
    121     ResultStatus postSend(ConnectionId receiverId,
    122                           const std::shared_ptr<BufferPoolData> &buffer,
    123                           TransactionId *transactionId,
    124                           int64_t *timestampUs);
    125 
    126     ResultStatus getAccessor(ConnectionId connectionId,
    127                              sp<IAccessor> *accessor);
    128 
    129     void cleanUp(bool clearCache = false);
    130 
    131 private:
    132     // In order to prevent deadlock between multiple locks,
    133     // always lock ClientCache.lock before locking ActiveClients.lock.
    134     struct ClientCache {
    135         // This lock is held for brief duration.
    136         // Blocking operation is not performed while holding the lock.
    137         std::mutex mMutex;
    138         std::list<std::pair<const wp<IAccessor>, const std::weak_ptr<BufferPoolClient>>>
    139                 mClients;
    140         std::condition_variable mConnectCv;
    141         bool mConnecting;
    142         int64_t mLastCleanUpUs;
    143 
    144         ClientCache() : mConnecting(false), mLastCleanUpUs(getTimestampNow()) {}
    145     } mCache;
    146 
    147     // Active clients which can be retrieved via ConnectionId
    148     struct ActiveClients {
    149         // This lock is held for brief duration.
    150         // Blocking operation is not performed holding the lock.
    151         std::mutex mMutex;
    152         std::map<ConnectionId, const std::shared_ptr<BufferPoolClient>>
    153                 mClients;
    154     } mActive;
    155 
    156     ClientManagerCookieHolder mRemoteClientCookies;
    157 };
    158 
    159 ClientManager::Impl::Impl() {}
    160 
    161 ResultStatus ClientManager::Impl::registerSender(
    162         const sp<IAccessor> &accessor, ConnectionId *pConnectionId) {
    163     cleanUp();
    164     int64_t timeoutUs = getTimestampNow() + kRegisterTimeoutUs;
    165     do {
    166         std::unique_lock<std::mutex> lock(mCache.mMutex);
    167         for (auto it = mCache.mClients.begin(); it != mCache.mClients.end(); ++it) {
    168             sp<IAccessor> sAccessor = it->first.promote();
    169             if (sAccessor && interfacesEqual(sAccessor, accessor)) {
    170                 const std::shared_ptr<BufferPoolClient> client = it->second.lock();
    171                 if (client) {
    172                     std::lock_guard<std::mutex> lock(mActive.mMutex);
    173                     *pConnectionId = client->getConnectionId();
    174                     if (mActive.mClients.find(*pConnectionId) != mActive.mClients.end()) {
    175                         ALOGV("register existing connection %lld", (long long)*pConnectionId);
    176                         return ResultStatus::ALREADY_EXISTS;
    177                     }
    178                 }
    179                 mCache.mClients.erase(it);
    180                 break;
    181             }
    182         }
    183         if (!mCache.mConnecting) {
    184             mCache.mConnecting = true;
    185             lock.unlock();
    186             ResultStatus result = ResultStatus::OK;
    187             const std::shared_ptr<BufferPoolClient> client =
    188                     std::make_shared<BufferPoolClient>(accessor);
    189             lock.lock();
    190             if (!client) {
    191                 result = ResultStatus::NO_MEMORY;
    192             } else if (!client->isValid()) {
    193                 result = ResultStatus::CRITICAL_ERROR;
    194             }
    195             if (result == ResultStatus::OK) {
    196                 // TODO: handle insert fail. (malloc fail)
    197                 const std::weak_ptr<BufferPoolClient> wclient = client;
    198                 mCache.mClients.push_back(std::make_pair(accessor, wclient));
    199                 ConnectionId conId = client->getConnectionId();
    200                 {
    201                     std::lock_guard<std::mutex> lock(mActive.mMutex);
    202                     mActive.mClients.insert(std::make_pair(conId, client));
    203                 }
    204                 *pConnectionId = conId;
    205                 ALOGV("register new connection %lld", (long long)*pConnectionId);
    206             }
    207             mCache.mConnecting = false;
    208             lock.unlock();
    209             mCache.mConnectCv.notify_all();
    210             return result;
    211         }
    212         mCache.mConnectCv.wait_for(
    213                 lock, std::chrono::microseconds(kRegisterTimeoutUs));
    214     } while (getTimestampNow() < timeoutUs);
    215     // TODO: return timeout error
    216     return ResultStatus::CRITICAL_ERROR;
    217 }
    218 
    219 ResultStatus ClientManager::Impl::registerSender(
    220         const sp<IClientManager> &receiver,
    221         ConnectionId senderId,
    222         ConnectionId *receiverId) {
    223     sp<IAccessor> accessor;
    224     bool local = false;
    225     {
    226         std::lock_guard<std::mutex> lock(mActive.mMutex);
    227         auto it = mActive.mClients.find(senderId);
    228         if (it == mActive.mClients.end()) {
    229             return ResultStatus::NOT_FOUND;
    230         }
    231         it->second->getAccessor(&accessor);
    232         local = it->second->isLocal();
    233     }
    234     ResultStatus rs = ResultStatus::CRITICAL_ERROR;
    235     if (accessor) {
    236        Return<void> transResult = receiver->registerSender(
    237                 accessor,
    238                 [&rs, receiverId](
    239                         ResultStatus status,
    240                         int64_t connectionId) {
    241                     rs = status;
    242                     *receiverId = connectionId;
    243                 });
    244         if (!transResult.isOk()) {
    245             return ResultStatus::CRITICAL_ERROR;
    246         } else if (local && rs == ResultStatus::OK) {
    247             sp<ConnectionDeathRecipient> recipient = Accessor::getConnectionDeathRecipient();
    248             if (recipient)  {
    249                 ALOGV("client death recipient registered %lld", (long long)*receiverId);
    250                 bool added;
    251                 uint64_t cookie = mRemoteClientCookies.getCookie(receiver, &added);
    252                 recipient->addCookieToConnection(cookie, *receiverId);
    253                 if (added) {
    254                     Return<bool> transResult = receiver->linkToDeath(recipient, cookie);
    255                 }
    256             }
    257         }
    258     }
    259     return rs;
    260 }
    261 
    262 ResultStatus ClientManager::Impl::create(
    263         const std::shared_ptr<BufferPoolAllocator> &allocator,
    264         ConnectionId *pConnectionId) {
    265     const sp<Accessor> accessor = new Accessor(allocator);
    266     if (!accessor || !accessor->isValid()) {
    267         return ResultStatus::CRITICAL_ERROR;
    268     }
    269     std::shared_ptr<BufferPoolClient> client =
    270             std::make_shared<BufferPoolClient>(accessor);
    271     if (!client || !client->isValid()) {
    272         return ResultStatus::CRITICAL_ERROR;
    273     }
    274     // Since a new bufferpool is created, evict memories which are used by
    275     // existing bufferpools and clients.
    276     cleanUp(true);
    277     {
    278         // TODO: handle insert fail. (malloc fail)
    279         std::lock_guard<std::mutex> lock(mCache.mMutex);
    280         const std::weak_ptr<BufferPoolClient> wclient = client;
    281         mCache.mClients.push_back(std::make_pair(accessor, wclient));
    282         ConnectionId conId = client->getConnectionId();
    283         {
    284             std::lock_guard<std::mutex> lock(mActive.mMutex);
    285             mActive.mClients.insert(std::make_pair(conId, client));
    286         }
    287         *pConnectionId = conId;
    288         ALOGV("create new connection %lld", (long long)*pConnectionId);
    289     }
    290     return ResultStatus::OK;
    291 }
    292 
    293 ResultStatus ClientManager::Impl::close(ConnectionId connectionId) {
    294     std::lock_guard<std::mutex> lock1(mCache.mMutex);
    295     std::lock_guard<std::mutex> lock2(mActive.mMutex);
    296     auto it = mActive.mClients.find(connectionId);
    297     if (it != mActive.mClients.end()) {
    298         sp<IAccessor> accessor;
    299         it->second->getAccessor(&accessor);
    300         mActive.mClients.erase(connectionId);
    301         for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) {
    302             // clean up dead client caches
    303             sp<IAccessor> cAccessor = cit->first.promote();
    304             if (!cAccessor || (accessor && interfacesEqual(cAccessor, accessor))) {
    305                 cit = mCache.mClients.erase(cit);
    306             } else {
    307                 cit++;
    308             }
    309         }
    310         return ResultStatus::OK;
    311     }
    312     return ResultStatus::NOT_FOUND;
    313 }
    314 
    315 ResultStatus ClientManager::Impl::allocate(
    316         ConnectionId connectionId, const std::vector<uint8_t> &params,
    317         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
    318     std::shared_ptr<BufferPoolClient> client;
    319     {
    320         std::lock_guard<std::mutex> lock(mActive.mMutex);
    321         auto it = mActive.mClients.find(connectionId);
    322         if (it == mActive.mClients.end()) {
    323             return ResultStatus::NOT_FOUND;
    324         }
    325         client = it->second;
    326     }
    327     return client->allocate(params, handle, buffer);
    328 }
    329 
    330 ResultStatus ClientManager::Impl::receive(
    331         ConnectionId connectionId, TransactionId transactionId,
    332         BufferId bufferId, int64_t timestampUs,
    333         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
    334     std::shared_ptr<BufferPoolClient> client;
    335     {
    336         std::lock_guard<std::mutex> lock(mActive.mMutex);
    337         auto it = mActive.mClients.find(connectionId);
    338         if (it == mActive.mClients.end()) {
    339             return ResultStatus::NOT_FOUND;
    340         }
    341         client = it->second;
    342     }
    343     return client->receive(transactionId, bufferId, timestampUs, handle, buffer);
    344 }
    345 
    346 ResultStatus ClientManager::Impl::postSend(
    347         ConnectionId receiverId, const std::shared_ptr<BufferPoolData> &buffer,
    348         TransactionId *transactionId, int64_t *timestampUs) {
    349     ConnectionId connectionId = buffer->mConnectionId;
    350     std::shared_ptr<BufferPoolClient> client;
    351     {
    352         std::lock_guard<std::mutex> lock(mActive.mMutex);
    353         auto it = mActive.mClients.find(connectionId);
    354         if (it == mActive.mClients.end()) {
    355             return ResultStatus::NOT_FOUND;
    356         }
    357         client = it->second;
    358     }
    359     return client->postSend(receiverId, buffer, transactionId, timestampUs);
    360 }
    361 
    362 ResultStatus ClientManager::Impl::getAccessor(
    363         ConnectionId connectionId, sp<IAccessor> *accessor) {
    364     std::shared_ptr<BufferPoolClient> client;
    365     {
    366         std::lock_guard<std::mutex> lock(mActive.mMutex);
    367         auto it = mActive.mClients.find(connectionId);
    368         if (it == mActive.mClients.end()) {
    369             return ResultStatus::NOT_FOUND;
    370         }
    371         client = it->second;
    372     }
    373     return client->getAccessor(accessor);
    374 }
    375 
    376 void ClientManager::Impl::cleanUp(bool clearCache) {
    377     int64_t now = getTimestampNow();
    378     int64_t lastTransactionUs;
    379     std::lock_guard<std::mutex> lock1(mCache.mMutex);
    380     if (clearCache || mCache.mLastCleanUpUs + kCleanUpDurationUs < now) {
    381         std::lock_guard<std::mutex> lock2(mActive.mMutex);
    382         int cleaned = 0;
    383         for (auto it = mActive.mClients.begin(); it != mActive.mClients.end();) {
    384             if (!it->second->isActive(&lastTransactionUs, clearCache)) {
    385                 if (lastTransactionUs + kClientTimeoutUs < now) {
    386                     sp<IAccessor> accessor;
    387                     it->second->getAccessor(&accessor);
    388                     it = mActive.mClients.erase(it);
    389                     ++cleaned;
    390                     continue;
    391                 }
    392             }
    393             ++it;
    394         }
    395         for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) {
    396             // clean up dead client caches
    397             sp<IAccessor> cAccessor = cit->first.promote();
    398             if (!cAccessor) {
    399                 cit = mCache.mClients.erase(cit);
    400             } else {
    401                 ++cit;
    402             }
    403         }
    404         ALOGV("# of cleaned connections: %d", cleaned);
    405         mCache.mLastCleanUpUs = now;
    406     }
    407 }
    408 
    409 // Methods from ::android::hardware::media::bufferpool::V1_0::IClientManager follow.
    410 Return<void> ClientManager::registerSender(const sp<::android::hardware::media::bufferpool::V1_0::IAccessor>& bufferPool, registerSender_cb _hidl_cb) {
    411     if (mImpl) {
    412         ConnectionId connectionId = -1;
    413         ResultStatus status = mImpl->registerSender(bufferPool, &connectionId);
    414         _hidl_cb(status, connectionId);
    415     } else {
    416         _hidl_cb(ResultStatus::CRITICAL_ERROR, -1);
    417     }
    418     return Void();
    419 }
    420 
    421 // Methods for local use.
    422 sp<ClientManager> ClientManager::sInstance;
    423 std::mutex ClientManager::sInstanceLock;
    424 
    425 sp<ClientManager> ClientManager::getInstance() {
    426     std::lock_guard<std::mutex> lock(sInstanceLock);
    427     if (!sInstance) {
    428         sInstance = new ClientManager();
    429     }
    430     return sInstance;
    431 }
    432 
    433 ClientManager::ClientManager() : mImpl(new Impl()) {}
    434 
    435 ClientManager::~ClientManager() {
    436 }
    437 
    438 ResultStatus ClientManager::create(
    439         const std::shared_ptr<BufferPoolAllocator> &allocator,
    440         ConnectionId *pConnectionId) {
    441     if (mImpl) {
    442         return mImpl->create(allocator, pConnectionId);
    443     }
    444     return ResultStatus::CRITICAL_ERROR;
    445 }
    446 
    447 ResultStatus ClientManager::registerSender(
    448         const sp<IClientManager> &receiver,
    449         ConnectionId senderId,
    450         ConnectionId *receiverId) {
    451     if (mImpl) {
    452         return mImpl->registerSender(receiver, senderId, receiverId);
    453     }
    454     return ResultStatus::CRITICAL_ERROR;
    455 }
    456 
    457 ResultStatus ClientManager::close(ConnectionId connectionId) {
    458     if (mImpl) {
    459         return mImpl->close(connectionId);
    460     }
    461     return ResultStatus::CRITICAL_ERROR;
    462 }
    463 
    464 ResultStatus ClientManager::allocate(
    465         ConnectionId connectionId, const std::vector<uint8_t> &params,
    466         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
    467     if (mImpl) {
    468         return mImpl->allocate(connectionId, params, handle, buffer);
    469     }
    470     return ResultStatus::CRITICAL_ERROR;
    471 }
    472 
    473 ResultStatus ClientManager::receive(
    474         ConnectionId connectionId, TransactionId transactionId,
    475         BufferId bufferId, int64_t timestampUs,
    476         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
    477     if (mImpl) {
    478         return mImpl->receive(connectionId, transactionId, bufferId,
    479                               timestampUs, handle, buffer);
    480     }
    481     return ResultStatus::CRITICAL_ERROR;
    482 }
    483 
    484 ResultStatus ClientManager::postSend(
    485         ConnectionId receiverId, const std::shared_ptr<BufferPoolData> &buffer,
    486         TransactionId *transactionId, int64_t* timestampUs) {
    487     if (mImpl && buffer) {
    488         return mImpl->postSend(receiverId, buffer, transactionId, timestampUs);
    489     }
    490     return ResultStatus::CRITICAL_ERROR;
    491 }
    492 
    493 void ClientManager::cleanUp() {
    494     if (mImpl) {
    495         mImpl->cleanUp(true);
    496     }
    497 }
    498 
    499 }  // namespace implementation
    500 }  // namespace V1_0
    501 }  // namespace bufferpool
    502 }  // namespace media
    503 }  // namespace hardware
    504 }  // namespace android
    505