1 /****************************************************************************** 2 * 3 * Copyright (C) 2014 Google, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at: 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 ******************************************************************************/ 18 19 #define LOG_TAG "osi_thread" 20 21 #include <assert.h> 22 #include <errno.h> 23 #include <pthread.h> 24 #include <string.h> 25 #include <sys/prctl.h> 26 #include <sys/types.h> 27 #include <utils/Log.h> 28 29 #include "fixed_queue.h" 30 #include "reactor.h" 31 #include "semaphore.h" 32 #include "thread.h" 33 34 struct thread_t { 35 pthread_t pthread; 36 pid_t tid; 37 char name[THREAD_NAME_MAX + 1]; 38 reactor_t *reactor; 39 fixed_queue_t *work_queue; 40 }; 41 42 struct start_arg { 43 thread_t *thread; 44 semaphore_t *start_sem; 45 int error; 46 }; 47 48 typedef struct { 49 thread_fn func; 50 void *context; 51 } work_item_t; 52 53 static void *run_thread(void *start_arg); 54 static void work_queue_read_cb(void *context); 55 56 static const size_t WORK_QUEUE_CAPACITY = 128; 57 58 thread_t *thread_new(const char *name) { 59 assert(name != NULL); 60 61 // Start is on the stack, but we use a semaphore, so it's safe 62 thread_t *ret = calloc(1, sizeof(thread_t)); 63 if (!ret) 64 goto error; 65 66 ret->reactor = reactor_new(); 67 if (!ret->reactor) 68 goto error; 69 70 ret->work_queue = fixed_queue_new(WORK_QUEUE_CAPACITY); 71 if (!ret->work_queue) 72 goto error; 73 74 struct start_arg start; 75 start.start_sem = semaphore_new(0); 76 if (!start.start_sem) 77 goto error; 78 79 strncpy(ret->name, name, THREAD_NAME_MAX); 80 start.thread = ret; 81 start.error = 0; 82 pthread_create(&ret->pthread, NULL, run_thread, &start); 83 semaphore_wait(start.start_sem); 84 semaphore_free(start.start_sem); 85 if (start.error) 86 goto error; 87 return ret; 88 89 error:; 90 if (ret) { 91 fixed_queue_free(ret->work_queue, free); 92 reactor_free(ret->reactor); 93 } 94 free(ret); 95 return NULL; 96 } 97 98 void thread_free(thread_t *thread) { 99 if (!thread) 100 return; 101 102 thread_stop(thread); 103 pthread_join(thread->pthread, NULL); 104 fixed_queue_free(thread->work_queue, free); 105 reactor_free(thread->reactor); 106 free(thread); 107 } 108 109 bool thread_post(thread_t *thread, thread_fn func, void *context) { 110 assert(thread != NULL); 111 assert(func != NULL); 112 113 // TODO(sharvil): if the current thread == |thread| and we've run out 114 // of queue space, we should abort this operation, otherwise we'll 115 // deadlock. 116 117 // Queue item is freed either when the queue itself is destroyed 118 // or when the item is removed from the queue for dispatch. 119 work_item_t *item = (work_item_t *)malloc(sizeof(work_item_t)); 120 if (!item) { 121 ALOGE("%s unable to allocate memory: %s", __func__, strerror(errno)); 122 return false; 123 } 124 item->func = func; 125 item->context = context; 126 fixed_queue_enqueue(thread->work_queue, item); 127 return true; 128 } 129 130 void thread_stop(thread_t *thread) { 131 assert(thread != NULL); 132 reactor_stop(thread->reactor); 133 } 134 135 const char *thread_name(const thread_t *thread) { 136 assert(thread != NULL); 137 return thread->name; 138 } 139 140 static void *run_thread(void *start_arg) { 141 assert(start_arg != NULL); 142 143 struct start_arg *start = start_arg; 144 thread_t *thread = start->thread; 145 146 assert(thread != NULL); 147 148 if (prctl(PR_SET_NAME, (unsigned long)thread->name) == -1) { 149 ALOGE("%s unable to set thread name: %s", __func__, strerror(errno)); 150 start->error = errno; 151 semaphore_post(start->start_sem); 152 return NULL; 153 } 154 thread->tid = gettid(); 155 156 semaphore_post(start->start_sem); 157 158 reactor_object_t work_queue_object; 159 work_queue_object.context = thread->work_queue; 160 work_queue_object.fd = fixed_queue_get_dequeue_fd(thread->work_queue); 161 work_queue_object.interest = REACTOR_INTEREST_READ; 162 work_queue_object.read_ready = work_queue_read_cb; 163 164 reactor_register(thread->reactor, &work_queue_object); 165 reactor_start(thread->reactor); 166 167 // Make sure we dispatch all queued work items before exiting the thread. 168 // This allows a caller to safely tear down by enqueuing a teardown 169 // work item and then joining the thread. 170 size_t count = 0; 171 work_item_t *item = fixed_queue_try_dequeue(thread->work_queue); 172 while (item && count <= WORK_QUEUE_CAPACITY) { 173 item->func(item->context); 174 free(item); 175 item = fixed_queue_try_dequeue(thread->work_queue); 176 ++count; 177 } 178 179 if (count > WORK_QUEUE_CAPACITY) 180 ALOGD("%s growing event queue on shutdown.", __func__); 181 182 return NULL; 183 } 184 185 static void work_queue_read_cb(void *context) { 186 assert(context != NULL); 187 188 fixed_queue_t *queue = (fixed_queue_t *)context; 189 work_item_t *item = fixed_queue_dequeue(queue); 190 item->func(item->context); 191 free(item); 192 } 193