Home | History | Annotate | Download | only in libaudiohal
      1 /*
      2  * Copyright (C) 2016 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #define LOG_TAG "StreamHalHidl"
     18 //#define LOG_NDEBUG 0
     19 
     20 #include <android/hardware/audio/2.0/IStreamOutCallback.h>
     21 #include <hwbinder/IPCThreadState.h>
     22 #include <mediautils/SchedulingPolicyService.h>
     23 #include <utils/Log.h>
     24 
     25 #include "DeviceHalHidl.h"
     26 #include "EffectHalHidl.h"
     27 #include "StreamHalHidl.h"
     28 
     29 using ::android::hardware::audio::common::V2_0::AudioChannelMask;
     30 using ::android::hardware::audio::common::V2_0::AudioFormat;
     31 using ::android::hardware::audio::common::V2_0::ThreadInfo;
     32 using ::android::hardware::audio::V2_0::AudioDrain;
     33 using ::android::hardware::audio::V2_0::IStreamOutCallback;
     34 using ::android::hardware::audio::V2_0::MessageQueueFlagBits;
     35 using ::android::hardware::audio::V2_0::MmapBufferInfo;
     36 using ::android::hardware::audio::V2_0::MmapPosition;
     37 using ::android::hardware::audio::V2_0::ParameterValue;
     38 using ::android::hardware::audio::V2_0::Result;
     39 using ::android::hardware::audio::V2_0::TimeSpec;
     40 using ::android::hardware::MQDescriptorSync;
     41 using ::android::hardware::Return;
     42 using ::android::hardware::Void;
     43 using ReadCommand = ::android::hardware::audio::V2_0::IStreamIn::ReadCommand;
     44 
     45 namespace android {
     46 
     47 StreamHalHidl::StreamHalHidl(IStream *stream)
     48         : ConversionHelperHidl("Stream"),
     49           mStream(stream),
     50           mHalThreadPriority(HAL_THREAD_PRIORITY_DEFAULT) {
     51 }
     52 
     53 StreamHalHidl::~StreamHalHidl() {
     54     mStream = nullptr;
     55 }
     56 
     57 status_t StreamHalHidl::getSampleRate(uint32_t *rate) {
     58     if (!mStream) return NO_INIT;
     59     return processReturn("getSampleRate", mStream->getSampleRate(), rate);
     60 }
     61 
     62 status_t StreamHalHidl::getBufferSize(size_t *size) {
     63     if (!mStream) return NO_INIT;
     64     return processReturn("getBufferSize", mStream->getBufferSize(), size);
     65 }
     66 
     67 status_t StreamHalHidl::getChannelMask(audio_channel_mask_t *mask) {
     68     if (!mStream) return NO_INIT;
     69     return processReturn("getChannelMask", mStream->getChannelMask(), mask);
     70 }
     71 
     72 status_t StreamHalHidl::getFormat(audio_format_t *format) {
     73     if (!mStream) return NO_INIT;
     74     return processReturn("getFormat", mStream->getFormat(), format);
     75 }
     76 
     77 status_t StreamHalHidl::getAudioProperties(
     78         uint32_t *sampleRate, audio_channel_mask_t *mask, audio_format_t *format) {
     79     if (!mStream) return NO_INIT;
     80     Return<void> ret = mStream->getAudioProperties(
     81             [&](uint32_t sr, AudioChannelMask m, AudioFormat f) {
     82                 *sampleRate = sr;
     83                 *mask = static_cast<audio_channel_mask_t>(m);
     84                 *format = static_cast<audio_format_t>(f);
     85             });
     86     return processReturn("getAudioProperties", ret);
     87 }
     88 
     89 status_t StreamHalHidl::setParameters(const String8& kvPairs) {
     90     if (!mStream) return NO_INIT;
     91     hidl_vec<ParameterValue> hidlParams;
     92     status_t status = parametersFromHal(kvPairs, &hidlParams);
     93     if (status != OK) return status;
     94     return processReturn("setParameters", mStream->setParameters(hidlParams));
     95 }
     96 
     97 status_t StreamHalHidl::getParameters(const String8& keys, String8 *values) {
     98     values->clear();
     99     if (!mStream) return NO_INIT;
    100     hidl_vec<hidl_string> hidlKeys;
    101     status_t status = keysFromHal(keys, &hidlKeys);
    102     if (status != OK) return status;
    103     Result retval;
    104     Return<void> ret = mStream->getParameters(
    105             hidlKeys,
    106             [&](Result r, const hidl_vec<ParameterValue>& parameters) {
    107                 retval = r;
    108                 if (retval == Result::OK) {
    109                     parametersToHal(parameters, values);
    110                 }
    111             });
    112     return processReturn("getParameters", ret, retval);
    113 }
    114 
    115 status_t StreamHalHidl::addEffect(sp<EffectHalInterface> effect) {
    116     if (!mStream) return NO_INIT;
    117     return processReturn("addEffect", mStream->addEffect(
    118                     static_cast<EffectHalHidl*>(effect.get())->effectId()));
    119 }
    120 
    121 status_t StreamHalHidl::removeEffect(sp<EffectHalInterface> effect) {
    122     if (!mStream) return NO_INIT;
    123     return processReturn("removeEffect", mStream->removeEffect(
    124                     static_cast<EffectHalHidl*>(effect.get())->effectId()));
    125 }
    126 
    127 status_t StreamHalHidl::standby() {
    128     if (!mStream) return NO_INIT;
    129     return processReturn("standby", mStream->standby());
    130 }
    131 
    132 status_t StreamHalHidl::dump(int fd) {
    133     if (!mStream) return NO_INIT;
    134     native_handle_t* hidlHandle = native_handle_create(1, 0);
    135     hidlHandle->data[0] = fd;
    136     Return<void> ret = mStream->debugDump(hidlHandle);
    137     native_handle_delete(hidlHandle);
    138     return processReturn("dump", ret);
    139 }
    140 
    141 status_t StreamHalHidl::start() {
    142     if (!mStream) return NO_INIT;
    143     return processReturn("start", mStream->start());
    144 }
    145 
    146 status_t StreamHalHidl::stop() {
    147     if (!mStream) return NO_INIT;
    148     return processReturn("stop", mStream->stop());
    149 }
    150 
    151 status_t StreamHalHidl::createMmapBuffer(int32_t minSizeFrames,
    152                                   struct audio_mmap_buffer_info *info) {
    153     Result retval;
    154     Return<void> ret = mStream->createMmapBuffer(
    155             minSizeFrames,
    156             [&](Result r, const MmapBufferInfo& hidlInfo) {
    157                 retval = r;
    158                 if (retval == Result::OK) {
    159                     const native_handle *handle = hidlInfo.sharedMemory.handle();
    160                     if (handle->numFds > 0) {
    161                         info->shared_memory_fd = handle->data[0];
    162                         info->buffer_size_frames = hidlInfo.bufferSizeFrames;
    163                         info->burst_size_frames = hidlInfo.burstSizeFrames;
    164                         // info->shared_memory_address is not needed in HIDL context
    165                         info->shared_memory_address = NULL;
    166                     } else {
    167                         retval = Result::NOT_INITIALIZED;
    168                     }
    169                 }
    170             });
    171     return processReturn("createMmapBuffer", ret, retval);
    172 }
    173 
    174 status_t StreamHalHidl::getMmapPosition(struct audio_mmap_position *position) {
    175     Result retval;
    176     Return<void> ret = mStream->getMmapPosition(
    177             [&](Result r, const MmapPosition& hidlPosition) {
    178                 retval = r;
    179                 if (retval == Result::OK) {
    180                     position->time_nanoseconds = hidlPosition.timeNanoseconds;
    181                     position->position_frames = hidlPosition.positionFrames;
    182                 }
    183             });
    184     return processReturn("getMmapPosition", ret, retval);
    185 }
    186 
    187 status_t StreamHalHidl::setHalThreadPriority(int priority) {
    188     mHalThreadPriority = priority;
    189     return OK;
    190 }
    191 
    192 bool StreamHalHidl::requestHalThreadPriority(pid_t threadPid, pid_t threadId) {
    193     if (mHalThreadPriority == HAL_THREAD_PRIORITY_DEFAULT) {
    194         return true;
    195     }
    196     int err = requestPriority(
    197             threadPid, threadId,
    198             mHalThreadPriority, false /*isForApp*/, true /*asynchronous*/);
    199     ALOGE_IF(err, "failed to set priority %d for pid %d tid %d; error %d",
    200             mHalThreadPriority, threadPid, threadId, err);
    201     // Audio will still work, but latency will be higher and sometimes unacceptable.
    202     return err == 0;
    203 }
    204 
    205 namespace {
    206 
    207 /* Notes on callback ownership.
    208 
    209 This is how (Hw)Binder ownership model looks like. The server implementation
    210 is owned by Binder framework (via sp<>). Proxies are owned by clients.
    211 When the last proxy disappears, Binder framework releases the server impl.
    212 
    213 Thus, it is not needed to keep any references to StreamOutCallback (this is
    214 the server impl) -- it will live as long as HAL server holds a strong ref to
    215 IStreamOutCallback proxy. We clear that reference by calling 'clearCallback'
    216 from the destructor of StreamOutHalHidl.
    217 
    218 The callback only keeps a weak reference to the stream. The stream is owned
    219 by AudioFlinger.
    220 
    221 */
    222 
    223 struct StreamOutCallback : public IStreamOutCallback {
    224     StreamOutCallback(const wp<StreamOutHalHidl>& stream) : mStream(stream) {}
    225 
    226     // IStreamOutCallback implementation
    227     Return<void> onWriteReady()  override {
    228         sp<StreamOutHalHidl> stream = mStream.promote();
    229         if (stream != 0) {
    230             stream->onWriteReady();
    231         }
    232         return Void();
    233     }
    234 
    235     Return<void> onDrainReady()  override {
    236         sp<StreamOutHalHidl> stream = mStream.promote();
    237         if (stream != 0) {
    238             stream->onDrainReady();
    239         }
    240         return Void();
    241     }
    242 
    243     Return<void> onError()  override {
    244         sp<StreamOutHalHidl> stream = mStream.promote();
    245         if (stream != 0) {
    246             stream->onError();
    247         }
    248         return Void();
    249     }
    250 
    251   private:
    252     wp<StreamOutHalHidl> mStream;
    253 };
    254 
    255 }  // namespace
    256 
    257 StreamOutHalHidl::StreamOutHalHidl(const sp<IStreamOut>& stream)
    258         : StreamHalHidl(stream.get()), mStream(stream), mWriterClient(0), mEfGroup(nullptr) {
    259 }
    260 
    261 StreamOutHalHidl::~StreamOutHalHidl() {
    262     if (mStream != 0) {
    263         if (mCallback.unsafe_get()) {
    264             processReturn("clearCallback", mStream->clearCallback());
    265         }
    266         processReturn("close", mStream->close());
    267         mStream.clear();
    268     }
    269     mCallback.clear();
    270     hardware::IPCThreadState::self()->flushCommands();
    271     if (mEfGroup) {
    272         EventFlag::deleteEventFlag(&mEfGroup);
    273     }
    274 }
    275 
    276 status_t StreamOutHalHidl::getFrameSize(size_t *size) {
    277     if (mStream == 0) return NO_INIT;
    278     return processReturn("getFrameSize", mStream->getFrameSize(), size);
    279 }
    280 
    281 status_t StreamOutHalHidl::getLatency(uint32_t *latency) {
    282     if (mStream == 0) return NO_INIT;
    283     if (mWriterClient == gettid() && mCommandMQ) {
    284         return callWriterThread(
    285                 WriteCommand::GET_LATENCY, "getLatency", nullptr, 0,
    286                 [&](const WriteStatus& writeStatus) {
    287                     *latency = writeStatus.reply.latencyMs;
    288                 });
    289     } else {
    290         return processReturn("getLatency", mStream->getLatency(), latency);
    291     }
    292 }
    293 
    294 status_t StreamOutHalHidl::setVolume(float left, float right) {
    295     if (mStream == 0) return NO_INIT;
    296     return processReturn("setVolume", mStream->setVolume(left, right));
    297 }
    298 
    299 status_t StreamOutHalHidl::write(const void *buffer, size_t bytes, size_t *written) {
    300     if (mStream == 0) return NO_INIT;
    301     *written = 0;
    302 
    303     if (bytes == 0 && !mDataMQ) {
    304         // Can't determine the size for the MQ buffer. Wait for a non-empty write request.
    305         ALOGW_IF(mCallback.unsafe_get(), "First call to async write with 0 bytes");
    306         return OK;
    307     }
    308 
    309     status_t status;
    310     if (!mDataMQ && (status = prepareForWriting(bytes)) != OK) {
    311         return status;
    312     }
    313 
    314     return callWriterThread(
    315             WriteCommand::WRITE, "write", static_cast<const uint8_t*>(buffer), bytes,
    316             [&] (const WriteStatus& writeStatus) {
    317                 *written = writeStatus.reply.written;
    318                 // Diagnostics of the cause of b/35813113.
    319                 ALOGE_IF(*written > bytes,
    320                         "hal reports more bytes written than asked for: %lld > %lld",
    321                         (long long)*written, (long long)bytes);
    322             });
    323 }
    324 
    325 status_t StreamOutHalHidl::callWriterThread(
    326         WriteCommand cmd, const char* cmdName,
    327         const uint8_t* data, size_t dataSize, StreamOutHalHidl::WriterCallback callback) {
    328     if (!mCommandMQ->write(&cmd)) {
    329         ALOGE("command message queue write failed for \"%s\"", cmdName);
    330         return -EAGAIN;
    331     }
    332     if (data != nullptr) {
    333         size_t availableToWrite = mDataMQ->availableToWrite();
    334         if (dataSize > availableToWrite) {
    335             ALOGW("truncating write data from %lld to %lld due to insufficient data queue space",
    336                     (long long)dataSize, (long long)availableToWrite);
    337             dataSize = availableToWrite;
    338         }
    339         if (!mDataMQ->write(data, dataSize)) {
    340             ALOGE("data message queue write failed for \"%s\"", cmdName);
    341         }
    342     }
    343     mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));
    344 
    345     // TODO: Remove manual event flag handling once blocking MQ is implemented. b/33815422
    346     uint32_t efState = 0;
    347 retry:
    348     status_t ret = mEfGroup->wait(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL), &efState);
    349     if (efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL)) {
    350         WriteStatus writeStatus;
    351         writeStatus.retval = Result::NOT_INITIALIZED;
    352         if (!mStatusMQ->read(&writeStatus)) {
    353             ALOGE("status message read failed for \"%s\"", cmdName);
    354         }
    355         if (writeStatus.retval == Result::OK) {
    356             ret = OK;
    357             callback(writeStatus);
    358         } else {
    359             ret = processReturn(cmdName, writeStatus.retval);
    360         }
    361         return ret;
    362     }
    363     if (ret == -EAGAIN || ret == -EINTR) {
    364         // Spurious wakeup. This normally retries no more than once.
    365         goto retry;
    366     }
    367     return ret;
    368 }
    369 
    370 status_t StreamOutHalHidl::prepareForWriting(size_t bufferSize) {
    371     std::unique_ptr<CommandMQ> tempCommandMQ;
    372     std::unique_ptr<DataMQ> tempDataMQ;
    373     std::unique_ptr<StatusMQ> tempStatusMQ;
    374     Result retval;
    375     pid_t halThreadPid, halThreadTid;
    376     Return<void> ret = mStream->prepareForWriting(
    377             1, bufferSize,
    378             [&](Result r,
    379                     const CommandMQ::Descriptor& commandMQ,
    380                     const DataMQ::Descriptor& dataMQ,
    381                     const StatusMQ::Descriptor& statusMQ,
    382                     const ThreadInfo& halThreadInfo) {
    383                 retval = r;
    384                 if (retval == Result::OK) {
    385                     tempCommandMQ.reset(new CommandMQ(commandMQ));
    386                     tempDataMQ.reset(new DataMQ(dataMQ));
    387                     tempStatusMQ.reset(new StatusMQ(statusMQ));
    388                     if (tempDataMQ->isValid() && tempDataMQ->getEventFlagWord()) {
    389                         EventFlag::createEventFlag(tempDataMQ->getEventFlagWord(), &mEfGroup);
    390                     }
    391                     halThreadPid = halThreadInfo.pid;
    392                     halThreadTid = halThreadInfo.tid;
    393                 }
    394             });
    395     if (!ret.isOk() || retval != Result::OK) {
    396         return processReturn("prepareForWriting", ret, retval);
    397     }
    398     if (!tempCommandMQ || !tempCommandMQ->isValid() ||
    399             !tempDataMQ || !tempDataMQ->isValid() ||
    400             !tempStatusMQ || !tempStatusMQ->isValid() ||
    401             !mEfGroup) {
    402         ALOGE_IF(!tempCommandMQ, "Failed to obtain command message queue for writing");
    403         ALOGE_IF(tempCommandMQ && !tempCommandMQ->isValid(),
    404                 "Command message queue for writing is invalid");
    405         ALOGE_IF(!tempDataMQ, "Failed to obtain data message queue for writing");
    406         ALOGE_IF(tempDataMQ && !tempDataMQ->isValid(), "Data message queue for writing is invalid");
    407         ALOGE_IF(!tempStatusMQ, "Failed to obtain status message queue for writing");
    408         ALOGE_IF(tempStatusMQ && !tempStatusMQ->isValid(),
    409                 "Status message queue for writing is invalid");
    410         ALOGE_IF(!mEfGroup, "Event flag creation for writing failed");
    411         return NO_INIT;
    412     }
    413     requestHalThreadPriority(halThreadPid, halThreadTid);
    414 
    415     mCommandMQ = std::move(tempCommandMQ);
    416     mDataMQ = std::move(tempDataMQ);
    417     mStatusMQ = std::move(tempStatusMQ);
    418     mWriterClient = gettid();
    419     return OK;
    420 }
    421 
    422 status_t StreamOutHalHidl::getRenderPosition(uint32_t *dspFrames) {
    423     if (mStream == 0) return NO_INIT;
    424     Result retval;
    425     Return<void> ret = mStream->getRenderPosition(
    426             [&](Result r, uint32_t d) {
    427                 retval = r;
    428                 if (retval == Result::OK) {
    429                     *dspFrames = d;
    430                 }
    431             });
    432     return processReturn("getRenderPosition", ret, retval);
    433 }
    434 
    435 status_t StreamOutHalHidl::getNextWriteTimestamp(int64_t *timestamp) {
    436     if (mStream == 0) return NO_INIT;
    437     Result retval;
    438     Return<void> ret = mStream->getNextWriteTimestamp(
    439             [&](Result r, int64_t t) {
    440                 retval = r;
    441                 if (retval == Result::OK) {
    442                     *timestamp = t;
    443                 }
    444             });
    445     return processReturn("getRenderPosition", ret, retval);
    446 }
    447 
    448 status_t StreamOutHalHidl::setCallback(wp<StreamOutHalInterfaceCallback> callback) {
    449     if (mStream == 0) return NO_INIT;
    450     status_t status = processReturn(
    451             "setCallback", mStream->setCallback(new StreamOutCallback(this)));
    452     if (status == OK) {
    453         mCallback = callback;
    454     }
    455     return status;
    456 }
    457 
    458 status_t StreamOutHalHidl::supportsPauseAndResume(bool *supportsPause, bool *supportsResume) {
    459     if (mStream == 0) return NO_INIT;
    460     Return<void> ret = mStream->supportsPauseAndResume(
    461             [&](bool p, bool r) {
    462                 *supportsPause = p;
    463                 *supportsResume = r;
    464             });
    465     return processReturn("supportsPauseAndResume", ret);
    466 }
    467 
    468 status_t StreamOutHalHidl::pause() {
    469     if (mStream == 0) return NO_INIT;
    470     return processReturn("pause", mStream->pause());
    471 }
    472 
    473 status_t StreamOutHalHidl::resume() {
    474     if (mStream == 0) return NO_INIT;
    475     return processReturn("pause", mStream->resume());
    476 }
    477 
    478 status_t StreamOutHalHidl::supportsDrain(bool *supportsDrain) {
    479     if (mStream == 0) return NO_INIT;
    480     return processReturn("supportsDrain", mStream->supportsDrain(), supportsDrain);
    481 }
    482 
    483 status_t StreamOutHalHidl::drain(bool earlyNotify) {
    484     if (mStream == 0) return NO_INIT;
    485     return processReturn(
    486             "drain", mStream->drain(earlyNotify ? AudioDrain::EARLY_NOTIFY : AudioDrain::ALL));
    487 }
    488 
    489 status_t StreamOutHalHidl::flush() {
    490     if (mStream == 0) return NO_INIT;
    491     return processReturn("pause", mStream->flush());
    492 }
    493 
    494 status_t StreamOutHalHidl::getPresentationPosition(uint64_t *frames, struct timespec *timestamp) {
    495     if (mStream == 0) return NO_INIT;
    496     if (mWriterClient == gettid() && mCommandMQ) {
    497         return callWriterThread(
    498                 WriteCommand::GET_PRESENTATION_POSITION, "getPresentationPosition", nullptr, 0,
    499                 [&](const WriteStatus& writeStatus) {
    500                     *frames = writeStatus.reply.presentationPosition.frames;
    501                     timestamp->tv_sec = writeStatus.reply.presentationPosition.timeStamp.tvSec;
    502                     timestamp->tv_nsec = writeStatus.reply.presentationPosition.timeStamp.tvNSec;
    503                 });
    504     } else {
    505         Result retval;
    506         Return<void> ret = mStream->getPresentationPosition(
    507                 [&](Result r, uint64_t hidlFrames, const TimeSpec& hidlTimeStamp) {
    508                     retval = r;
    509                     if (retval == Result::OK) {
    510                         *frames = hidlFrames;
    511                         timestamp->tv_sec = hidlTimeStamp.tvSec;
    512                         timestamp->tv_nsec = hidlTimeStamp.tvNSec;
    513                     }
    514                 });
    515         return processReturn("getPresentationPosition", ret, retval);
    516     }
    517 }
    518 
    519 void StreamOutHalHidl::onWriteReady() {
    520     sp<StreamOutHalInterfaceCallback> callback = mCallback.promote();
    521     if (callback == 0) return;
    522     ALOGV("asyncCallback onWriteReady");
    523     callback->onWriteReady();
    524 }
    525 
    526 void StreamOutHalHidl::onDrainReady() {
    527     sp<StreamOutHalInterfaceCallback> callback = mCallback.promote();
    528     if (callback == 0) return;
    529     ALOGV("asyncCallback onDrainReady");
    530     callback->onDrainReady();
    531 }
    532 
    533 void StreamOutHalHidl::onError() {
    534     sp<StreamOutHalInterfaceCallback> callback = mCallback.promote();
    535     if (callback == 0) return;
    536     ALOGV("asyncCallback onError");
    537     callback->onError();
    538 }
    539 
    540 
    541 StreamInHalHidl::StreamInHalHidl(const sp<IStreamIn>& stream)
    542         : StreamHalHidl(stream.get()), mStream(stream), mReaderClient(0), mEfGroup(nullptr) {
    543 }
    544 
    545 StreamInHalHidl::~StreamInHalHidl() {
    546     if (mStream != 0) {
    547         processReturn("close", mStream->close());
    548         mStream.clear();
    549         hardware::IPCThreadState::self()->flushCommands();
    550     }
    551     if (mEfGroup) {
    552         EventFlag::deleteEventFlag(&mEfGroup);
    553     }
    554 }
    555 
    556 status_t StreamInHalHidl::getFrameSize(size_t *size) {
    557     if (mStream == 0) return NO_INIT;
    558     return processReturn("getFrameSize", mStream->getFrameSize(), size);
    559 }
    560 
    561 status_t StreamInHalHidl::setGain(float gain) {
    562     if (mStream == 0) return NO_INIT;
    563     return processReturn("setGain", mStream->setGain(gain));
    564 }
    565 
    566 status_t StreamInHalHidl::read(void *buffer, size_t bytes, size_t *read) {
    567     if (mStream == 0) return NO_INIT;
    568     *read = 0;
    569 
    570     if (bytes == 0 && !mDataMQ) {
    571         // Can't determine the size for the MQ buffer. Wait for a non-empty read request.
    572         return OK;
    573     }
    574 
    575     status_t status;
    576     if (!mDataMQ && (status = prepareForReading(bytes)) != OK) {
    577         return status;
    578     }
    579 
    580     ReadParameters params;
    581     params.command = ReadCommand::READ;
    582     params.params.read = bytes;
    583     return callReaderThread(params, "read",
    584             [&](const ReadStatus& readStatus) {
    585                 const size_t availToRead = mDataMQ->availableToRead();
    586                 if (!mDataMQ->read(static_cast<uint8_t*>(buffer), std::min(bytes, availToRead))) {
    587                     ALOGE("data message queue read failed for \"read\"");
    588                 }
    589                 ALOGW_IF(availToRead != readStatus.reply.read,
    590                         "HAL read report inconsistent: mq = %d, status = %d",
    591                         (int32_t)availToRead, (int32_t)readStatus.reply.read);
    592                 *read = readStatus.reply.read;
    593             });
    594 }
    595 
    596 status_t StreamInHalHidl::callReaderThread(
    597         const ReadParameters& params, const char* cmdName,
    598         StreamInHalHidl::ReaderCallback callback) {
    599     if (!mCommandMQ->write(&params)) {
    600         ALOGW("command message queue write failed");
    601         return -EAGAIN;
    602     }
    603     mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
    604 
    605     // TODO: Remove manual event flag handling once blocking MQ is implemented. b/33815422
    606     uint32_t efState = 0;
    607 retry:
    608     status_t ret = mEfGroup->wait(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY), &efState);
    609     if (efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY)) {
    610         ReadStatus readStatus;
    611         readStatus.retval = Result::NOT_INITIALIZED;
    612         if (!mStatusMQ->read(&readStatus)) {
    613             ALOGE("status message read failed for \"%s\"", cmdName);
    614         }
    615          if (readStatus.retval == Result::OK) {
    616             ret = OK;
    617             callback(readStatus);
    618         } else {
    619             ret = processReturn(cmdName, readStatus.retval);
    620         }
    621         return ret;
    622     }
    623     if (ret == -EAGAIN || ret == -EINTR) {
    624         // Spurious wakeup. This normally retries no more than once.
    625         goto retry;
    626     }
    627     return ret;
    628 }
    629 
    630 status_t StreamInHalHidl::prepareForReading(size_t bufferSize) {
    631     std::unique_ptr<CommandMQ> tempCommandMQ;
    632     std::unique_ptr<DataMQ> tempDataMQ;
    633     std::unique_ptr<StatusMQ> tempStatusMQ;
    634     Result retval;
    635     pid_t halThreadPid, halThreadTid;
    636     Return<void> ret = mStream->prepareForReading(
    637             1, bufferSize,
    638             [&](Result r,
    639                     const CommandMQ::Descriptor& commandMQ,
    640                     const DataMQ::Descriptor& dataMQ,
    641                     const StatusMQ::Descriptor& statusMQ,
    642                     const ThreadInfo& halThreadInfo) {
    643                 retval = r;
    644                 if (retval == Result::OK) {
    645                     tempCommandMQ.reset(new CommandMQ(commandMQ));
    646                     tempDataMQ.reset(new DataMQ(dataMQ));
    647                     tempStatusMQ.reset(new StatusMQ(statusMQ));
    648                     if (tempDataMQ->isValid() && tempDataMQ->getEventFlagWord()) {
    649                         EventFlag::createEventFlag(tempDataMQ->getEventFlagWord(), &mEfGroup);
    650                     }
    651                     halThreadPid = halThreadInfo.pid;
    652                     halThreadTid = halThreadInfo.tid;
    653                 }
    654             });
    655     if (!ret.isOk() || retval != Result::OK) {
    656         return processReturn("prepareForReading", ret, retval);
    657     }
    658     if (!tempCommandMQ || !tempCommandMQ->isValid() ||
    659             !tempDataMQ || !tempDataMQ->isValid() ||
    660             !tempStatusMQ || !tempStatusMQ->isValid() ||
    661             !mEfGroup) {
    662         ALOGE_IF(!tempCommandMQ, "Failed to obtain command message queue for writing");
    663         ALOGE_IF(tempCommandMQ && !tempCommandMQ->isValid(),
    664                 "Command message queue for writing is invalid");
    665         ALOGE_IF(!tempDataMQ, "Failed to obtain data message queue for reading");
    666         ALOGE_IF(tempDataMQ && !tempDataMQ->isValid(), "Data message queue for reading is invalid");
    667         ALOGE_IF(!tempStatusMQ, "Failed to obtain status message queue for reading");
    668         ALOGE_IF(tempStatusMQ && !tempStatusMQ->isValid(),
    669                 "Status message queue for reading is invalid");
    670         ALOGE_IF(!mEfGroup, "Event flag creation for reading failed");
    671         return NO_INIT;
    672     }
    673     requestHalThreadPriority(halThreadPid, halThreadTid);
    674 
    675     mCommandMQ = std::move(tempCommandMQ);
    676     mDataMQ = std::move(tempDataMQ);
    677     mStatusMQ = std::move(tempStatusMQ);
    678     mReaderClient = gettid();
    679     return OK;
    680 }
    681 
    682 status_t StreamInHalHidl::getInputFramesLost(uint32_t *framesLost) {
    683     if (mStream == 0) return NO_INIT;
    684     return processReturn("getInputFramesLost", mStream->getInputFramesLost(), framesLost);
    685 }
    686 
    687 status_t StreamInHalHidl::getCapturePosition(int64_t *frames, int64_t *time) {
    688     if (mStream == 0) return NO_INIT;
    689     if (mReaderClient == gettid() && mCommandMQ) {
    690         ReadParameters params;
    691         params.command = ReadCommand::GET_CAPTURE_POSITION;
    692         return callReaderThread(params, "getCapturePosition",
    693                 [&](const ReadStatus& readStatus) {
    694                     *frames = readStatus.reply.capturePosition.frames;
    695                     *time = readStatus.reply.capturePosition.time;
    696                 });
    697     } else {
    698         Result retval;
    699         Return<void> ret = mStream->getCapturePosition(
    700                 [&](Result r, uint64_t hidlFrames, uint64_t hidlTime) {
    701                     retval = r;
    702                     if (retval == Result::OK) {
    703                         *frames = hidlFrames;
    704                         *time = hidlTime;
    705                     }
    706                 });
    707         return processReturn("getCapturePosition", ret, retval);
    708     }
    709 }
    710 
    711 } // namespace android
    712