1 /* 2 * Copyright (C) 2016 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include <android-base/logging.h> 18 #include <memory> 19 #include <pthread.h> 20 #include <queue> 21 #include <thread> 22 #include <unistd.h> 23 24 #include "PosixAsyncIO.h" 25 26 namespace { 27 28 std::thread gWorkerThread; 29 std::deque<struct aiocb*> gWorkQueue; 30 bool gSuspended = true; 31 int gAiocbRefcount = 0; 32 std::mutex gLock; 33 std::condition_variable gWait; 34 35 void work_func(void *) { 36 pthread_setname_np(pthread_self(), "AsyncIO work"); 37 while (true) { 38 struct aiocb *aiocbp; 39 { 40 std::unique_lock<std::mutex> lk(gLock); 41 gWait.wait(lk, []{return gWorkQueue.size() > 0 || gSuspended;}); 42 if (gSuspended) 43 return; 44 aiocbp = gWorkQueue.back(); 45 gWorkQueue.pop_back(); 46 } 47 CHECK(aiocbp->queued); 48 int ret; 49 if (aiocbp->read) { 50 ret = TEMP_FAILURE_RETRY(pread(aiocbp->aio_fildes, 51 aiocbp->aio_buf, aiocbp->aio_nbytes, aiocbp->aio_offset)); 52 } else { 53 ret = TEMP_FAILURE_RETRY(pwrite(aiocbp->aio_fildes, 54 aiocbp->aio_buf, aiocbp->aio_nbytes, aiocbp->aio_offset)); 55 } 56 { 57 std::unique_lock<std::mutex> lk(aiocbp->lock); 58 aiocbp->ret = ret; 59 if (aiocbp->ret == -1) { 60 aiocbp->error = errno; 61 } 62 aiocbp->queued = false; 63 } 64 aiocbp->cv.notify_all(); 65 } 66 } 67 68 int aio_add(struct aiocb *aiocbp) { 69 CHECK(!aiocbp->queued); 70 aiocbp->queued = true; 71 { 72 std::unique_lock<std::mutex> lk(gLock); 73 gWorkQueue.push_front(aiocbp); 74 } 75 gWait.notify_one(); 76 return 0; 77 } 78 79 } // end anonymous namespace 80 81 aiocb::aiocb() { 82 this->ret = 0; 83 this->queued = false; 84 { 85 std::unique_lock<std::mutex> lk(gLock); 86 if (gAiocbRefcount == 0) { 87 CHECK(gWorkQueue.size() == 0); 88 CHECK(gSuspended); 89 gSuspended = false; 90 gWorkerThread = std::thread(work_func, nullptr); 91 } 92 gAiocbRefcount++; 93 } 94 } 95 96 aiocb::~aiocb() { 97 CHECK(!this->queued); 98 { 99 std::unique_lock<std::mutex> lk(gLock); 100 CHECK(!gSuspended); 101 if (gAiocbRefcount == 1) { 102 CHECK(gWorkQueue.size() == 0); 103 gSuspended = true; 104 lk.unlock(); 105 gWait.notify_one(); 106 gWorkerThread.join(); 107 lk.lock(); 108 } 109 gAiocbRefcount--; 110 } 111 } 112 113 int aio_read(struct aiocb *aiocbp) { 114 aiocbp->read = true; 115 return aio_add(aiocbp); 116 } 117 118 int aio_write(struct aiocb *aiocbp) { 119 aiocbp->read = false; 120 return aio_add(aiocbp); 121 } 122 123 int aio_error(const struct aiocb *aiocbp) { 124 return aiocbp->error; 125 } 126 127 ssize_t aio_return(struct aiocb *aiocbp) { 128 return aiocbp->ret; 129 } 130 131 int aio_suspend(struct aiocb *aiocbp[], int n, 132 const struct timespec *) { 133 for (int i = 0; i < n; i++) { 134 { 135 std::unique_lock<std::mutex> lk(aiocbp[i]->lock); 136 aiocbp[i]->cv.wait(lk, [aiocbp, i]{return !aiocbp[i]->queued;}); 137 } 138 } 139 return 0; 140 } 141 142 void aio_prepare(struct aiocb *aiocbp, void* buf, size_t count, off_t offset) { 143 aiocbp->aio_buf = buf; 144 aiocbp->aio_offset = offset; 145 aiocbp->aio_nbytes = count; 146 } 147