Home | History | Annotate | Download | only in mtp
      1 /*
      2  * Copyright (C) 2016 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #include <android-base/logging.h>
     18 #include <memory>
     19 #include <pthread.h>
     20 #include <queue>
     21 #include <thread>
     22 #include <unistd.h>
     23 
     24 #include "PosixAsyncIO.h"
     25 
     26 namespace {
     27 
     28 std::thread gWorkerThread;
     29 std::deque<struct aiocb*> gWorkQueue;
     30 bool gSuspended = true;
     31 int gAiocbRefcount = 0;
     32 std::mutex gLock;
     33 std::condition_variable gWait;
     34 
     35 void work_func(void *) {
     36     pthread_setname_np(pthread_self(), "AsyncIO work");
     37     while (true) {
     38         struct aiocb *aiocbp;
     39         {
     40             std::unique_lock<std::mutex> lk(gLock);
     41             gWait.wait(lk, []{return gWorkQueue.size() > 0 || gSuspended;});
     42             if (gSuspended)
     43                 return;
     44             aiocbp = gWorkQueue.back();
     45             gWorkQueue.pop_back();
     46         }
     47         CHECK(aiocbp->queued);
     48         int ret;
     49         if (aiocbp->read) {
     50             ret = TEMP_FAILURE_RETRY(pread(aiocbp->aio_fildes,
     51                     aiocbp->aio_buf, aiocbp->aio_nbytes, aiocbp->aio_offset));
     52         } else {
     53             ret = TEMP_FAILURE_RETRY(pwrite(aiocbp->aio_fildes,
     54                aiocbp->aio_buf, aiocbp->aio_nbytes, aiocbp->aio_offset));
     55         }
     56         {
     57             std::unique_lock<std::mutex> lk(aiocbp->lock);
     58             aiocbp->ret = ret;
     59             if (aiocbp->ret == -1) {
     60                 aiocbp->error = errno;
     61             }
     62             aiocbp->queued = false;
     63         }
     64         aiocbp->cv.notify_all();
     65     }
     66 }
     67 
     68 int aio_add(struct aiocb *aiocbp) {
     69     CHECK(!aiocbp->queued);
     70     aiocbp->queued = true;
     71     {
     72         std::unique_lock<std::mutex> lk(gLock);
     73         gWorkQueue.push_front(aiocbp);
     74     }
     75     gWait.notify_one();
     76     return 0;
     77 }
     78 
     79 } // end anonymous namespace
     80 
     81 aiocb::aiocb() {
     82     this->ret = 0;
     83     this->queued = false;
     84     {
     85         std::unique_lock<std::mutex> lk(gLock);
     86         if (gAiocbRefcount == 0) {
     87             CHECK(gWorkQueue.size() == 0);
     88             CHECK(gSuspended);
     89             gSuspended = false;
     90             gWorkerThread = std::thread(work_func, nullptr);
     91         }
     92         gAiocbRefcount++;
     93     }
     94 }
     95 
     96 aiocb::~aiocb() {
     97     CHECK(!this->queued);
     98     {
     99         std::unique_lock<std::mutex> lk(gLock);
    100         CHECK(!gSuspended);
    101         if (gAiocbRefcount == 1) {
    102             CHECK(gWorkQueue.size() == 0);
    103             gSuspended = true;
    104             lk.unlock();
    105             gWait.notify_one();
    106             gWorkerThread.join();
    107             lk.lock();
    108         }
    109         gAiocbRefcount--;
    110     }
    111 }
    112 
    113 int aio_read(struct aiocb *aiocbp) {
    114     aiocbp->read = true;
    115     return aio_add(aiocbp);
    116 }
    117 
    118 int aio_write(struct aiocb *aiocbp) {
    119     aiocbp->read = false;
    120     return aio_add(aiocbp);
    121 }
    122 
    123 int aio_error(const struct aiocb *aiocbp) {
    124     return aiocbp->error;
    125 }
    126 
    127 ssize_t aio_return(struct aiocb *aiocbp) {
    128     return aiocbp->ret;
    129 }
    130 
    131 int aio_suspend(struct aiocb *aiocbp[], int n,
    132         const struct timespec *) {
    133     for (int i = 0; i < n; i++) {
    134         {
    135             std::unique_lock<std::mutex> lk(aiocbp[i]->lock);
    136             aiocbp[i]->cv.wait(lk, [aiocbp, i]{return !aiocbp[i]->queued;});
    137         }
    138     }
    139     return 0;
    140 }
    141 
    142 void aio_prepare(struct aiocb *aiocbp, void* buf, size_t count, off_t offset) {
    143     aiocbp->aio_buf = buf;
    144     aiocbp->aio_offset = offset;
    145     aiocbp->aio_nbytes = count;
    146 }
    147