Home | History | Annotate | Download | only in 2.0
      1 /*
      2  * Copyright (C) 2018 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #define LOG_TAG "BufferPoolClient"
     18 //#define LOG_NDEBUG 0
     19 
     20 #include <thread>
     21 #include <utils/Log.h>
     22 #include "BufferPoolClient.h"
     23 #include "Connection.h"
     24 
     25 namespace android {
     26 namespace hardware {
     27 namespace media {
     28 namespace bufferpool {
     29 namespace V2_0 {
     30 namespace implementation {
     31 
// Grace period, in microseconds, that receive() adds to a non-zero transfer
// timestamp to form the deadline checked by postReceive().
// NOTE(review): 1000000 us is 1 second, not 100 ms -- confirm which timeout
// was actually intended before changing the value.
static constexpr int64_t kReceiveTimeoutUs = 1000000; // 1s
// Maximum number of attempts postReceive() makes to post a TRANSFER_FROM
// status message before giving up.
static constexpr int kPostMaxRetry = 3;
// Microseconds added to "now" to compute a ClientBuffer's expiry, and also the
// minimum interval between two eviction passes in evictCaches().
static constexpr int kCacheTtlUs = 1000000; // TODO: tune
     35 
/**
 * Private implementation of BufferPoolClient.
 *
 * An instance is connected either through a local Accessor (mLocal == true,
 * mLocalConnection is used) or through an IAccessor interface (mLocal == false,
 * mRemoteConnection is used). It keeps a per-client cache of buffer handles
 * (mCache) and the bookkeeping needed to report released buffers (mReleasing).
 *
 * Derives from enable_shared_from_this because buffers handed out by
 * allocate()/receive() carry a deleter that refers back to this object.
 */
class BufferPoolClient::Impl
        : public std::enable_shared_from_this<BufferPoolClient::Impl> {
public:
    // Connects through a local Accessor; see the constructor definition.
    explicit Impl(const sp<Accessor> &accessor, const sp<IObserver> &observer);

    // Connects through an IAccessor interface; see the constructor definition.
    explicit Impl(const sp<IAccessor> &accessor, const sp<IObserver> &observer);

    // Returns whether the connection and its message channels were set up
    // successfully by the constructor.
    bool isValid() {
        return mValid;
    }

    // Returns true only for a valid client that was constructed from a local
    // Accessor.
    bool isLocal() {
        return mValid && mLocal;
    }

    // Returns the connection id obtained from connect().
    ConnectionId getConnectionId() {
        return mConnectionId;
    }

    // Returns a reference to the accessor this client was constructed with.
    sp<IAccessor> &getAccessor() {
        return mAccessor;
    }

    // Syncs released buffers, evicts cache entries and reports, through
    // lastTransactionUs, when the cache's active count last changed.
    bool isActive(int64_t *lastTransactionUs, bool clearCache);

    // Processes pending invalidations; msgID is forwarded to syncReleased().
    void receiveInvalidation(uint32_t msgID);

    // Flushes the local connection; fails for non-local or invalid clients.
    ResultStatus flush();

    // Allocates a buffer through the local connection and caches its handle.
    ResultStatus allocate(const std::vector<uint8_t> &params,
                          native_handle_t **handle,
                          std::shared_ptr<BufferPoolData> *buffer);

    // Receives the buffer bufferId that belongs to transaction transactionId.
    ResultStatus receive(
            TransactionId transactionId, BufferId bufferId,
            int64_t timestampUs,
            native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer);

    // Queues bufferId as released and posts it on the status channel.
    void postBufferRelease(BufferId bufferId);

    // Posts a TRANSFER_TO message for bufferId addressed to receiver and
    // returns the generated transaction id and timestamp via the out params.
    bool postSend(
            BufferId bufferId, ConnectionId receiver,
            TransactionId *transactionId, int64_t *timestampUs);
private:

    // Posts TRANSFER_FROM (or TRANSFER_TIMEOUT once timestampUs has passed).
    bool postReceive(
            BufferId bufferId, TransactionId transactionId,
            int64_t timestampUs);

    // Posts TRANSFER_OK or TRANSFER_ERROR depending on result.
    bool postReceiveResult(
            BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync);

    // Issues a fetch on the remote connection with Connection::SYNC_BUFFERID
    // if the status channel reports that a sync is needed.
    void trySyncFromRemote();

    // Applies released-buffer and invalidation messages to the cache.
    // The caller should hold mCache.mLock.
    bool syncReleased(uint32_t msgId = 0);

    // Drops cache entries that have no live buffer and are expired (or all
    // such entries when clearCache is true). The caller should hold mCache.mLock.
    void evictCaches(bool clearCache = false);

    // Drops the cache entry for id if it has no live buffer.
    void invalidateBuffer(BufferId id);

    // Drops cache entries without a live buffer whose id lies in [from, to),
    // where the range may wrap around.
    void invalidateRange(BufferId from, BufferId to);

    // Allocates through mLocalConnection and clones the returned handle.
    ResultStatus allocateBufferHandle(
            const std::vector<uint8_t>& params, BufferId *bufferId,
            native_handle_t **handle);

    // Fetches a buffer through the local or remote connection and clones its
    // handle.
    ResultStatus fetchBufferHandle(
            TransactionId transactionId, BufferId bufferId,
            native_handle_t **handle);

    struct BlockPoolDataDtor;
    struct ClientBuffer;

    bool mLocal;        // true when constructed from sp<Accessor>
    bool mValid;        // set once by the constructors
    sp<IAccessor> mAccessor;
    sp<Connection> mLocalConnection;    // set by the sp<Accessor> constructor
    sp<IConnection> mRemoteConnection;  // set by the sp<IAccessor> constructor
    uint32_t mSeqId;    // incremented by postSend() for each transaction id
    ConnectionId mConnectionId;
    int64_t mLastEvictCacheUs;  // time of the last eviction pass in evictCaches()
    std::unique_ptr<BufferInvalidationListener> mInvalidationListener;

    // CachedBuffers
    struct BufferCache {
        std::mutex mLock;
        // True while one receive() call is fetching a handle with mLock
        // released; other receive() calls wait on mCreateCv meanwhile.
        bool mCreating;
        std::condition_variable mCreateCv;
        std::map<BufferId, std::unique_ptr<ClientBuffer>> mBuffers;
        // Number of cache entries that currently have a live buffer object.
        int mActive;
        // Timestamp of the most recent change to mActive.
        int64_t mLastChangeUs;

        BufferCache() : mCreating(false), mActive(0), mLastChangeUs(getTimestampNow()) {}

        // "_l" helpers do no locking themselves; callers in this file hold
        // mLock when they call them.
        void incActive_l() {
            ++mActive;
            mLastChangeUs = getTimestampNow();
        }

        void decActive_l() {
            --mActive;
            mLastChangeUs = getTimestampNow();
        }
    } mCache;

    // FMQ - release notifier
    struct ReleaseCache {
        std::mutex mLock;
        // TODO: use only one list? (using one list may delay sending messages?)
        // Ids queued by postBufferRelease() and handed to the status channel.
        std::list<BufferId> mReleasingIds;
        // Ids consumed (and then cleared) by syncReleased().
        // NOTE(review): presumably filled by the status channel once a release
        // message has been posted -- confirm against BufferStatusChannel.
        std::list<BufferId> mReleasedIds;
        uint32_t mInvalidateId; // TODO: invalidation ACK to bufferpool
        // False while an invalidation ACK still has to be posted.
        bool mInvalidateAck;
        std::unique_ptr<BufferStatusChannel> mStatusChannel;

        ReleaseCache() : mInvalidateId(0), mInvalidateAck(true) {}
    } mReleasing;

    // This lock is held during synchronization from remote side.
    // In order to minimize remote calls and locking duration, this lock is held
    // by best effort approach using try_lock().
    std::mutex mRemoteSyncLock;
};
    159 
    160 struct BufferPoolClient::Impl::BlockPoolDataDtor {
    161     BlockPoolDataDtor(const std::shared_ptr<BufferPoolClient::Impl> &impl)
    162             : mImpl(impl) {}
    163 
    164     void operator()(BufferPoolData *buffer) {
    165         BufferId id = buffer->mId;
    166         delete buffer;
    167 
    168         auto impl = mImpl.lock();
    169         if (impl && impl->isValid()) {
    170             impl->postBufferRelease(id);
    171         }
    172     }
    173     const std::weak_ptr<BufferPoolClient::Impl> mImpl;
    174 };
    175 
    176 struct BufferPoolClient::Impl::ClientBuffer {
    177 private:
    178     int64_t mExpireUs;
    179     bool mHasCache;
    180     ConnectionId mConnectionId;
    181     BufferId mId;
    182     native_handle_t *mHandle;
    183     std::weak_ptr<BufferPoolData> mCache;
    184 
    185     void updateExpire() {
    186         mExpireUs = getTimestampNow() + kCacheTtlUs;
    187     }
    188 
    189 public:
    190     ClientBuffer(
    191             ConnectionId connectionId, BufferId id, native_handle_t *handle)
    192             : mHasCache(false), mConnectionId(connectionId),
    193               mId(id), mHandle(handle) {
    194         mExpireUs = getTimestampNow() + kCacheTtlUs;
    195     }
    196 
    197     ~ClientBuffer() {
    198         if (mHandle) {
    199             native_handle_close(mHandle);
    200             native_handle_delete(mHandle);
    201         }
    202     }
    203 
    204     BufferId id() const {
    205         return mId;
    206     }
    207 
    208     bool expire() const {
    209         int64_t now = getTimestampNow();
    210         return now >= mExpireUs;
    211     }
    212 
    213     bool hasCache() const {
    214         return mHasCache;
    215     }
    216 
    217     std::shared_ptr<BufferPoolData> fetchCache(native_handle_t **pHandle) {
    218         if (mHasCache) {
    219             std::shared_ptr<BufferPoolData> cache = mCache.lock();
    220             if (cache) {
    221                 *pHandle = mHandle;
    222             }
    223             return cache;
    224         }
    225         return nullptr;
    226     }
    227 
    228     std::shared_ptr<BufferPoolData> createCache(
    229             const std::shared_ptr<BufferPoolClient::Impl> &impl,
    230             native_handle_t **pHandle) {
    231         if (!mHasCache) {
    232             // Allocates a raw ptr in order to avoid sending #postBufferRelease
    233             // from deleter, in case of native_handle_clone failure.
    234             BufferPoolData *ptr = new BufferPoolData(mConnectionId, mId);
    235             if (ptr) {
    236                 std::shared_ptr<BufferPoolData> cache(ptr, BlockPoolDataDtor(impl));
    237                 if (cache) {
    238                     mCache = cache;
    239                     mHasCache = true;
    240                     *pHandle = mHandle;
    241                     return cache;
    242                 }
    243             }
    244             if (ptr) {
    245                 delete ptr;
    246             }
    247         }
    248         return nullptr;
    249     }
    250 
    251     bool onCacheRelease() {
    252         if (mHasCache) {
    253             // TODO: verify mCache is not valid;
    254             updateExpire();
    255             mHasCache = false;
    256             return true;
    257         }
    258         return false;
    259     }
    260 };
    261 
    262 BufferPoolClient::Impl::Impl(const sp<Accessor> &accessor, const sp<IObserver> &observer)
    263     : mLocal(true), mValid(false), mAccessor(accessor), mSeqId(0),
    264       mLastEvictCacheUs(getTimestampNow()) {
    265     const StatusDescriptor *statusDesc;
    266     const InvalidationDescriptor *invDesc;
    267     ResultStatus status = accessor->connect(
    268             observer, true,
    269             &mLocalConnection, &mConnectionId, &mReleasing.mInvalidateId,
    270             &statusDesc, &invDesc);
    271     if (status == ResultStatus::OK) {
    272         mReleasing.mStatusChannel =
    273                 std::make_unique<BufferStatusChannel>(*statusDesc);
    274         mInvalidationListener =
    275                 std::make_unique<BufferInvalidationListener>(*invDesc);
    276         mValid = mReleasing.mStatusChannel &&
    277                 mReleasing.mStatusChannel->isValid() &&
    278                 mInvalidationListener &&
    279                 mInvalidationListener->isValid();
    280     }
    281 }
    282 
    283 BufferPoolClient::Impl::Impl(const sp<IAccessor> &accessor, const sp<IObserver> &observer)
    284     : mLocal(false), mValid(false), mAccessor(accessor), mSeqId(0),
    285       mLastEvictCacheUs(getTimestampNow()) {
    286     bool valid = false;
    287     sp<IConnection>& outConnection = mRemoteConnection;
    288     ConnectionId& id = mConnectionId;
    289     uint32_t& outMsgId = mReleasing.mInvalidateId;
    290     std::unique_ptr<BufferStatusChannel>& outChannel =
    291             mReleasing.mStatusChannel;
    292     std::unique_ptr<BufferInvalidationListener>& outObserver =
    293             mInvalidationListener;
    294     Return<void> transResult = accessor->connect(
    295             observer,
    296             [&valid, &outConnection, &id, &outMsgId, &outChannel, &outObserver]
    297             (ResultStatus status, sp<IConnection> connection,
    298              ConnectionId connectionId, uint32_t msgId,
    299              const StatusDescriptor& statusDesc,
    300              const InvalidationDescriptor& invDesc) {
    301                 if (status == ResultStatus::OK) {
    302                     outConnection = connection;
    303                     id = connectionId;
    304                     outMsgId = msgId;
    305                     outChannel = std::make_unique<BufferStatusChannel>(statusDesc);
    306                     outObserver = std::make_unique<BufferInvalidationListener>(invDesc);
    307                     if (outChannel && outChannel->isValid() &&
    308                         outObserver && outObserver->isValid()) {
    309                         valid = true;
    310                     }
    311                 }
    312             });
    313     mValid = transResult.isOk() && valid;
    314 }
    315 
    316 bool BufferPoolClient::Impl::isActive(int64_t *lastTransactionUs, bool clearCache) {
    317     bool active = false;
    318     {
    319         std::lock_guard<std::mutex> lock(mCache.mLock);
    320         syncReleased();
    321         evictCaches(clearCache);
    322         *lastTransactionUs = mCache.mLastChangeUs;
    323         active = mCache.mActive > 0;
    324     }
    325     if (mValid && mLocal && mLocalConnection) {
    326         mLocalConnection->cleanUp(clearCache);
    327         return true;
    328     }
    329     return active;
    330 }
    331 
    332 void BufferPoolClient::Impl::receiveInvalidation(uint32_t messageId) {
    333     std::lock_guard<std::mutex> lock(mCache.mLock);
    334     syncReleased(messageId);
    335     // TODO: evict cache required?
    336 }
    337 
    338 ResultStatus BufferPoolClient::Impl::flush() {
    339     if (!mLocal || !mLocalConnection || !mValid) {
    340         return ResultStatus::CRITICAL_ERROR;
    341     }
    342     {
    343         std::unique_lock<std::mutex> lock(mCache.mLock);
    344         syncReleased();
    345         evictCaches();
    346         return mLocalConnection->flush();
    347     }
    348 }
    349 
    350 ResultStatus BufferPoolClient::Impl::allocate(
    351         const std::vector<uint8_t> &params,
    352         native_handle_t **pHandle,
    353         std::shared_ptr<BufferPoolData> *buffer) {
    354     if (!mLocal || !mLocalConnection || !mValid) {
    355         return ResultStatus::CRITICAL_ERROR;
    356     }
    357     BufferId bufferId;
    358     native_handle_t *handle = nullptr;
    359     buffer->reset();
    360     ResultStatus status = allocateBufferHandle(params, &bufferId, &handle);
    361     if (status == ResultStatus::OK) {
    362         if (handle) {
    363             std::unique_lock<std::mutex> lock(mCache.mLock);
    364             syncReleased();
    365             evictCaches();
    366             auto cacheIt = mCache.mBuffers.find(bufferId);
    367             if (cacheIt != mCache.mBuffers.end()) {
    368                 // TODO: verify it is recycled. (not having active ref)
    369                 mCache.mBuffers.erase(cacheIt);
    370             }
    371             auto clientBuffer = std::make_unique<ClientBuffer>(
    372                     mConnectionId, bufferId, handle);
    373             if (clientBuffer) {
    374                 auto result = mCache.mBuffers.insert(std::make_pair(
    375                         bufferId, std::move(clientBuffer)));
    376                 if (result.second) {
    377                     *buffer = result.first->second->createCache(
    378                             shared_from_this(), pHandle);
    379                     if (*buffer) {
    380                         mCache.incActive_l();
    381                     }
    382                 }
    383             }
    384         }
    385         if (!*buffer) {
    386             ALOGV("client cache creation failure %d: %lld",
    387                   handle != nullptr, (long long)mConnectionId);
    388             status = ResultStatus::NO_MEMORY;
    389             postBufferRelease(bufferId);
    390         }
    391     }
    392     return status;
    393 }
    394 
// Receives the buffer bufferId that was sent to this client in transaction
// transactionId.
//
// A non-zero timestampUs is extended by kReceiveTimeoutUs and used as the
// deadline for posting the TRANSFER_FROM message. The buffer is then taken
// from the client cache when an entry exists, or fetched over the connection
// and inserted into the cache otherwise. Finally the outcome is posted as
// TRANSFER_OK / TRANSFER_ERROR.
//
// Returns OK with *buffer and *pHandle set on success. Returns CRITICAL_ERROR
// when the client is invalid, when TRANSFER_FROM could not be posted, or when
// the result message could not be posted (in which case *buffer is reset).
// Otherwise returns the failure status of the cache/fetch step.
ResultStatus BufferPoolClient::Impl::receive(
        TransactionId transactionId, BufferId bufferId, int64_t timestampUs,
        native_handle_t **pHandle,
        std::shared_ptr<BufferPoolData> *buffer) {
    if (!mValid) {
        return ResultStatus::CRITICAL_ERROR;
    }
    // A timestamp of 0 means "no deadline" to postReceive().
    if (timestampUs != 0) {
        timestampUs += kReceiveTimeoutUs;
    }
    if (!postReceive(bufferId, transactionId, timestampUs)) {
        return ResultStatus::CRITICAL_ERROR;
    }
    ResultStatus status = ResultStatus::CRITICAL_ERROR;
    buffer->reset();
    while(1) {
        // The lock is re-acquired at the top of every iteration.
        std::unique_lock<std::mutex> lock(mCache.mLock);
        syncReleased();
        evictCaches();
        auto cacheIt = mCache.mBuffers.find(bufferId);
        if (cacheIt != mCache.mBuffers.end()) {
            if (cacheIt->second->hasCache()) {
                // A buffer object was already handed out for this entry; try
                // to share it.
                *buffer = cacheIt->second->fetchCache(pHandle);
                if (!*buffer) {
                    // The handed-out object has been destroyed but its release
                    // has not been applied to the cache yet: yield and retry.
                    // NOTE(review): despite the original "check transfer
                    // time_out" remark, no timeout is checked here, so this
                    // retry is unbounded.
                    lock.unlock();
                    std::this_thread::yield();
                    continue;
                }
                ALOGV("client receive from reference %lld", (long long)mConnectionId);
                break;
            } else {
                // The handle is cached but no buffer object is outstanding.
                *buffer = cacheIt->second->createCache(shared_from_this(), pHandle);
                if (*buffer) {
                    mCache.incActive_l();
                }
                ALOGV("client receive from cache %lld", (long long)mConnectionId);
                break;
            }
        } else {
            if (!mCache.mCreating) {
                // This call becomes the single fetcher; the cache lock is
                // released while the fetch is in flight.
                mCache.mCreating = true;
                lock.unlock();
                native_handle_t* handle = nullptr;
                status = fetchBufferHandle(transactionId, bufferId, &handle);
                lock.lock();
                if (status == ResultStatus::OK) {
                    if (handle) {
                        auto clientBuffer = std::make_unique<ClientBuffer>(
                                mConnectionId, bufferId, handle);
                        if (clientBuffer) {
                            auto result = mCache.mBuffers.insert(
                                    std::make_pair(bufferId, std::move(
                                            clientBuffer)));
                            if (result.second) {
                                *buffer = result.first->second->createCache(
                                        shared_from_this(), pHandle);
                                if (*buffer) {
                                    mCache.incActive_l();
                                }
                            }
                        }
                    }
                    // The fetch succeeded but no buffer object was produced.
                    if (!*buffer) {
                        status = ResultStatus::NO_MEMORY;
                    }
                }
                mCache.mCreating = false;
                lock.unlock();
                // Wake the receive() calls waiting below so they re-check the
                // cache.
                mCache.mCreateCv.notify_all();
                break;
            }
            // Another receive() call is fetching; wait for it, then loop.
            mCache.mCreateCv.wait(lock);
        }
    }
    bool needsSync = false;
    bool posted = postReceiveResult(bufferId, transactionId,
                                      *buffer ? true : false, &needsSync);
    ALOGV("client receive %lld - %u : %s (%d)", (long long)mConnectionId, bufferId,
          *buffer ? "ok" : "fail", posted);
    if (mValid && mLocal && mLocalConnection) {
        mLocalConnection->cleanUp(false);
    }
    if (needsSync && mRemoteConnection) {
        trySyncFromRemote();
    }
    if (*buffer) {
        // The buffer was obtained but the result could not be reported: give
        // the buffer up again and fail.
        if (!posted) {
            buffer->reset();
            return ResultStatus::CRITICAL_ERROR;
        }
        return ResultStatus::OK;
    }
    return status;
}
    490 
    491 
    492 void BufferPoolClient::Impl::postBufferRelease(BufferId bufferId) {
    493     std::lock_guard<std::mutex> lock(mReleasing.mLock);
    494     mReleasing.mReleasingIds.push_back(bufferId);
    495     mReleasing.mStatusChannel->postBufferRelease(
    496             mConnectionId, mReleasing.mReleasingIds, mReleasing.mReleasedIds);
    497 }
    498 
    499 // TODO: revise ad-hoc posting data structure
    500 bool BufferPoolClient::Impl::postSend(
    501         BufferId bufferId, ConnectionId receiver,
    502         TransactionId *transactionId, int64_t *timestampUs) {
    503     {
    504         // TODO: don't need to call syncReleased every time
    505         std::lock_guard<std::mutex> lock(mCache.mLock);
    506         syncReleased();
    507     }
    508     bool ret = false;
    509     bool needsSync = false;
    510     {
    511         std::lock_guard<std::mutex> lock(mReleasing.mLock);
    512         *timestampUs = getTimestampNow();
    513         *transactionId = (mConnectionId << 32) | mSeqId++;
    514         // TODO: retry, add timeout, target?
    515         ret =  mReleasing.mStatusChannel->postBufferStatusMessage(
    516                 *transactionId, bufferId, BufferStatus::TRANSFER_TO, mConnectionId,
    517                 receiver, mReleasing.mReleasingIds, mReleasing.mReleasedIds);
    518         needsSync = !mLocal && mReleasing.mStatusChannel->needsSync();
    519     }
    520     if (mValid && mLocal && mLocalConnection) {
    521         mLocalConnection->cleanUp(false);
    522     }
    523     if (needsSync && mRemoteConnection) {
    524         trySyncFromRemote();
    525     }
    526     return ret;
    527 }
    528 
    529 bool BufferPoolClient::Impl::postReceive(
    530         BufferId bufferId, TransactionId transactionId, int64_t timestampUs) {
    531     for (int i = 0; i < kPostMaxRetry; ++i) {
    532         std::unique_lock<std::mutex> lock(mReleasing.mLock);
    533         int64_t now = getTimestampNow();
    534         if (timestampUs == 0 || now < timestampUs) {
    535             bool result = mReleasing.mStatusChannel->postBufferStatusMessage(
    536                     transactionId, bufferId, BufferStatus::TRANSFER_FROM,
    537                     mConnectionId, -1, mReleasing.mReleasingIds,
    538                     mReleasing.mReleasedIds);
    539             if (result) {
    540                 return true;
    541             }
    542             lock.unlock();
    543             std::this_thread::yield();
    544         } else {
    545             mReleasing.mStatusChannel->postBufferStatusMessage(
    546                     transactionId, bufferId, BufferStatus::TRANSFER_TIMEOUT,
    547                     mConnectionId, -1, mReleasing.mReleasingIds,
    548                     mReleasing.mReleasedIds);
    549             return false;
    550         }
    551     }
    552     return false;
    553 }
    554 
    555 bool BufferPoolClient::Impl::postReceiveResult(
    556         BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync) {
    557     std::lock_guard<std::mutex> lock(mReleasing.mLock);
    558     // TODO: retry, add timeout
    559     bool ret = mReleasing.mStatusChannel->postBufferStatusMessage(
    560             transactionId, bufferId,
    561             result ? BufferStatus::TRANSFER_OK : BufferStatus::TRANSFER_ERROR,
    562             mConnectionId, -1, mReleasing.mReleasingIds,
    563             mReleasing.mReleasedIds);
    564     *needsSync = !mLocal && mReleasing.mStatusChannel->needsSync();
    565     return ret;
    566 }
    567 
    568 void BufferPoolClient::Impl::trySyncFromRemote() {
    569     if (mRemoteSyncLock.try_lock()) {
    570         bool needsSync = false;
    571         {
    572             std::lock_guard<std::mutex> lock(mReleasing.mLock);
    573             needsSync = mReleasing.mStatusChannel->needsSync();
    574         }
    575         if (needsSync) {
    576             TransactionId transactionId = (mConnectionId << 32);
    577             BufferId bufferId = Connection::SYNC_BUFFERID;
    578             Return<void> transResult = mRemoteConnection->fetch(
    579                     transactionId, bufferId,
    580                     []
    581                     (ResultStatus outStatus, Buffer outBuffer) {
    582                         (void) outStatus;
    583                         (void) outBuffer;
    584                     });
    585             if (!transResult.isOk()) {
    586                 ALOGD("sync from client %lld failed: bufferpool process died.",
    587                       (long long)mConnectionId);
    588             }
    589         }
    590         mRemoteSyncLock.unlock();
    591     }
    592 }
    593 
    594 // should have mCache.mLock
    595 bool BufferPoolClient::Impl::syncReleased(uint32_t messageId) {
    596     bool cleared = false;
    597     {
    598         std::lock_guard<std::mutex> lock(mReleasing.mLock);
    599         if (mReleasing.mReleasingIds.size() > 0) {
    600             mReleasing.mStatusChannel->postBufferRelease(
    601                     mConnectionId, mReleasing.mReleasingIds,
    602                     mReleasing.mReleasedIds);
    603         }
    604         if (mReleasing.mReleasedIds.size() > 0) {
    605             for (BufferId& id: mReleasing.mReleasedIds) {
    606                 ALOGV("client release buffer %lld - %u", (long long)mConnectionId, id);
    607                 auto found = mCache.mBuffers.find(id);
    608                 if (found != mCache.mBuffers.end()) {
    609                     if (found->second->onCacheRelease()) {
    610                         mCache.decActive_l();
    611                     } else {
    612                         // should not happen!
    613                         ALOGW("client %lld cache release status inconsitent!",
    614                             (long long)mConnectionId);
    615                     }
    616                 } else {
    617                     // should not happen!
    618                     ALOGW("client %lld cache status inconsitent!", (long long)mConnectionId);
    619                 }
    620             }
    621             mReleasing.mReleasedIds.clear();
    622             cleared = true;
    623         }
    624     }
    625     std::vector<BufferInvalidationMessage> invalidations;
    626     mInvalidationListener->getInvalidations(invalidations);
    627     uint32_t lastMsgId = 0;
    628     if (invalidations.size() > 0) {
    629         for (auto it = invalidations.begin(); it != invalidations.end(); ++it) {
    630             if (it->messageId != 0) {
    631                 lastMsgId = it->messageId;
    632             }
    633             if (it->fromBufferId == it->toBufferId) {
    634                 // TODO: handle fromBufferId = UINT32_MAX
    635                 invalidateBuffer(it->fromBufferId);
    636             } else {
    637                 invalidateRange(it->fromBufferId, it->toBufferId);
    638             }
    639         }
    640     }
    641     {
    642         std::lock_guard<std::mutex> lock(mReleasing.mLock);
    643         if (lastMsgId != 0) {
    644             if (isMessageLater(lastMsgId, mReleasing.mInvalidateId)) {
    645                 mReleasing.mInvalidateId = lastMsgId;
    646                 mReleasing.mInvalidateAck = false;
    647             }
    648         } else if (messageId != 0) {
    649             // messages are drained.
    650             if (isMessageLater(messageId, mReleasing.mInvalidateId)) {
    651                 mReleasing.mInvalidateId = messageId;
    652                 mReleasing.mInvalidateAck = true;
    653             }
    654         }
    655         if (!mReleasing.mInvalidateAck) {
    656             // post ACK
    657             mReleasing.mStatusChannel->postBufferInvalidateAck(
    658                     mConnectionId,
    659                     mReleasing.mInvalidateId, &mReleasing.mInvalidateAck);
    660             ALOGV("client %lld invalidateion ack (%d) %u",
    661                 (long long)mConnectionId,
    662                 mReleasing.mInvalidateAck, mReleasing.mInvalidateId);
    663         }
    664     }
    665     return cleared;
    666 }
    667 
    668 // should have mCache.mLock
    669 void BufferPoolClient::Impl::evictCaches(bool clearCache) {
    670     int64_t now = getTimestampNow();
    671     if (now >= mLastEvictCacheUs + kCacheTtlUs || clearCache) {
    672         size_t evicted = 0;
    673         for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end();) {
    674             if (!it->second->hasCache() && (it->second->expire() || clearCache)) {
    675                 it = mCache.mBuffers.erase(it);
    676                 ++evicted;
    677             } else {
    678                 ++it;
    679             }
    680         }
    681         ALOGV("cache count %lld : total %zu, active %d, evicted %zu",
    682               (long long)mConnectionId, mCache.mBuffers.size(), mCache.mActive, evicted);
    683         mLastEvictCacheUs = now;
    684     }
    685 }
    686 
    687 // should have mCache.mLock
    688 void BufferPoolClient::Impl::invalidateBuffer(BufferId id) {
    689     for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end(); ++it) {
    690         if (id == it->second->id()) {
    691             if (!it->second->hasCache()) {
    692                 mCache.mBuffers.erase(it);
    693                 ALOGV("cache invalidated %lld : buffer %u",
    694                       (long long)mConnectionId, id);
    695             } else {
    696                 ALOGW("Inconsitent invalidation %lld : activer buffer!! %u",
    697                       (long long)mConnectionId, (unsigned int)id);
    698             }
    699             break;
    700         }
    701     }
    702 }
    703 
    704 // should have mCache.mLock
    705 void BufferPoolClient::Impl::invalidateRange(BufferId from, BufferId to) {
    706     size_t invalidated = 0;
    707     for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end();) {
    708         if (!it->second->hasCache()) {
    709             BufferId bid = it->second->id();
    710             if (from < to) {
    711                 if (from <= bid && bid < to) {
    712                     ++invalidated;
    713                     it = mCache.mBuffers.erase(it);
    714                     continue;
    715                 }
    716             } else {
    717                 if (from <= bid || bid < to) {
    718                     ++invalidated;
    719                     it = mCache.mBuffers.erase(it);
    720                     continue;
    721                 }
    722             }
    723         }
    724         ++it;
    725     }
    726     ALOGV("cache invalidated %lld : # of invalidated %zu",
    727           (long long)mConnectionId, invalidated);
    728 }
    729 
    730 ResultStatus BufferPoolClient::Impl::allocateBufferHandle(
    731         const std::vector<uint8_t>& params, BufferId *bufferId,
    732         native_handle_t** handle) {
    733     if (mLocalConnection) {
    734         const native_handle_t* allocHandle = nullptr;
    735         ResultStatus status = mLocalConnection->allocate(
    736                 params, bufferId, &allocHandle);
    737         if (status == ResultStatus::OK) {
    738             *handle = native_handle_clone(allocHandle);
    739         }
    740         ALOGV("client allocate result %lld %d : %u clone %p",
    741               (long long)mConnectionId, status == ResultStatus::OK,
    742               *handle ? *bufferId : 0 , *handle);
    743         return status;
    744     }
    745     return ResultStatus::CRITICAL_ERROR;
    746 }
    747 
    748 ResultStatus BufferPoolClient::Impl::fetchBufferHandle(
    749         TransactionId transactionId, BufferId bufferId,
    750         native_handle_t **handle) {
    751     sp<IConnection> connection;
    752     if (mLocal) {
    753         connection = mLocalConnection;
    754     } else {
    755         connection = mRemoteConnection;
    756     }
    757     ResultStatus status;
    758     Return<void> transResult = connection->fetch(
    759             transactionId, bufferId,
    760             [&status, &handle]
    761             (ResultStatus outStatus, Buffer outBuffer) {
    762                 status = outStatus;
    763                 if (status == ResultStatus::OK) {
    764                     *handle = native_handle_clone(
    765                             outBuffer.buffer.getNativeHandle());
    766                 }
    767             });
    768     return transResult.isOk() ? status : ResultStatus::CRITICAL_ERROR;
    769 }
    770 
    771 
    772 BufferPoolClient::BufferPoolClient(const sp<Accessor> &accessor,
    773                                    const sp<IObserver> &observer) {
    774     mImpl = std::make_shared<Impl>(accessor, observer);
    775 }
    776 
    777 BufferPoolClient::BufferPoolClient(const sp<IAccessor> &accessor,
    778                                    const sp<IObserver> &observer) {
    779     mImpl = std::make_shared<Impl>(accessor, observer);
    780 }
    781 
    782 BufferPoolClient::~BufferPoolClient() {
    783     // TODO: how to handle orphaned buffers?
    784 }
    785 
    786 bool BufferPoolClient::isValid() {
    787     return mImpl && mImpl->isValid();
    788 }
    789 
    790 bool BufferPoolClient::isLocal() {
    791     return mImpl && mImpl->isLocal();
    792 }
    793 
    794 bool BufferPoolClient::isActive(int64_t *lastTransactionUs, bool clearCache) {
    795     if (!isValid()) {
    796         *lastTransactionUs = 0;
    797         return false;
    798     }
    799     return mImpl->isActive(lastTransactionUs, clearCache);
    800 }
    801 
    802 ConnectionId BufferPoolClient::getConnectionId() {
    803     if (isValid()) {
    804         return mImpl->getConnectionId();
    805     }
    806     return -1;
    807 }
    808 
    809 ResultStatus BufferPoolClient::getAccessor(sp<IAccessor> *accessor) {
    810     if (isValid()) {
    811         *accessor = mImpl->getAccessor();
    812         return ResultStatus::OK;
    813     }
    814     return ResultStatus::CRITICAL_ERROR;
    815 }
    816 
    817 void BufferPoolClient::receiveInvalidation(uint32_t msgId) {
    818     ALOGV("bufferpool2 client recv inv %u", msgId);
    819     if (isValid()) {
    820         mImpl->receiveInvalidation(msgId);
    821     }
    822 }
    823 
    824 ResultStatus BufferPoolClient::flush() {
    825     if (isValid()) {
    826         return mImpl->flush();
    827     }
    828     return ResultStatus::CRITICAL_ERROR;
    829 }
    830 
    831 ResultStatus BufferPoolClient::allocate(
    832         const std::vector<uint8_t> &params,
    833         native_handle_t **handle,
    834         std::shared_ptr<BufferPoolData> *buffer) {
    835     if (isValid()) {
    836         return mImpl->allocate(params, handle, buffer);
    837     }
    838     return ResultStatus::CRITICAL_ERROR;
    839 }
    840 
    841 ResultStatus BufferPoolClient::receive(
    842         TransactionId transactionId, BufferId bufferId, int64_t timestampUs,
    843         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
    844     if (isValid()) {
    845         return mImpl->receive(transactionId, bufferId, timestampUs, handle, buffer);
    846     }
    847     return ResultStatus::CRITICAL_ERROR;
    848 }
    849 
    850 ResultStatus BufferPoolClient::postSend(
    851         ConnectionId receiverId,
    852         const std::shared_ptr<BufferPoolData> &buffer,
    853         TransactionId *transactionId,
    854         int64_t *timestampUs) {
    855     if (isValid()) {
    856         bool result = mImpl->postSend(
    857                 buffer->mId, receiverId, transactionId, timestampUs);
    858         return result ? ResultStatus::OK : ResultStatus::CRITICAL_ERROR;
    859     }
    860     return ResultStatus::CRITICAL_ERROR;
    861 }
    862 
    863 }  // namespace implementation
    864 }  // namespace V2_0
    865 }  // namespace bufferpool
    866 }  // namespace media
    867 }  // namespace hardware
    868 }  // namespace android
    869