1 /****************************************************************************** 2 * 3 * Copyright (C) 2014 Google, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at: 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 ******************************************************************************/ 18 19 #define LOG_TAG "bt_osi_thread" 20 21 #include "osi/include/thread.h" 22 23 #include <assert.h> 24 #include <errno.h> 25 #include <malloc.h> 26 #include <pthread.h> 27 #include <string.h> 28 #include <sys/prctl.h> 29 #include <sys/resource.h> 30 #include <sys/types.h> 31 32 #include "osi/include/allocator.h" 33 #include "osi/include/compat.h" 34 #include "osi/include/fixed_queue.h" 35 #include "osi/include/log.h" 36 #include "osi/include/reactor.h" 37 #include "osi/include/semaphore.h" 38 39 struct thread_t { 40 bool is_joined; 41 pthread_t pthread; 42 pid_t tid; 43 char name[THREAD_NAME_MAX + 1]; 44 reactor_t *reactor; 45 fixed_queue_t *work_queue; 46 }; 47 48 struct start_arg { 49 thread_t *thread; 50 semaphore_t *start_sem; 51 int error; 52 }; 53 54 typedef struct { 55 thread_fn func; 56 void *context; 57 } work_item_t; 58 59 static void *run_thread(void *start_arg); 60 static void work_queue_read_cb(void *context); 61 62 static const size_t DEFAULT_WORK_QUEUE_CAPACITY = 128; 63 64 thread_t *thread_new_sized(const char *name, size_t work_queue_capacity) { 65 assert(name != NULL); 66 assert(work_queue_capacity != 0); 67 68 thread_t *ret = osi_calloc(sizeof(thread_t)); 69 70 ret->reactor = reactor_new(); 71 if (!ret->reactor) 72 goto error; 73 74 ret->work_queue = fixed_queue_new(work_queue_capacity); 75 if (!ret->work_queue) 76 goto error; 77 78 // Start is on the stack, but we use a semaphore, so it's safe 79 struct start_arg start; 80 start.start_sem = semaphore_new(0); 81 if (!start.start_sem) 82 goto error; 83 84 strncpy(ret->name, name, THREAD_NAME_MAX); 85 start.thread = ret; 86 start.error = 0; 87 pthread_create(&ret->pthread, NULL, run_thread, &start); 88 semaphore_wait(start.start_sem); 89 semaphore_free(start.start_sem); 90 91 if (start.error) 92 goto error; 93 94 return ret; 95 96 error:; 97 if (ret) { 98 fixed_queue_free(ret->work_queue, osi_free); 99 reactor_free(ret->reactor); 100 } 101 osi_free(ret); 102 return NULL; 103 } 104 105 thread_t *thread_new(const char *name) { 106 return thread_new_sized(name, DEFAULT_WORK_QUEUE_CAPACITY); 107 } 108 109 void thread_free(thread_t *thread) { 110 if (!thread) 111 return; 112 113 thread_stop(thread); 114 thread_join(thread); 115 116 fixed_queue_free(thread->work_queue, osi_free); 117 reactor_free(thread->reactor); 118 osi_free(thread); 119 } 120 121 void thread_join(thread_t *thread) { 122 assert(thread != NULL); 123 124 // TODO(zachoverflow): use a compare and swap when ready 125 if (!thread->is_joined) { 126 thread->is_joined = true; 127 pthread_join(thread->pthread, NULL); 128 } 129 } 130 131 bool thread_post(thread_t *thread, thread_fn func, void *context) { 132 assert(thread != NULL); 133 assert(func != NULL); 134 135 // TODO(sharvil): if the current thread == |thread| and we've run out 136 // of queue space, we should abort this operation, otherwise we'll 137 // deadlock. 138 139 // Queue item is freed either when the queue itself is destroyed 140 // or when the item is removed from the queue for dispatch. 141 work_item_t *item = (work_item_t *)osi_malloc(sizeof(work_item_t)); 142 item->func = func; 143 item->context = context; 144 fixed_queue_enqueue(thread->work_queue, item); 145 return true; 146 } 147 148 void thread_stop(thread_t *thread) { 149 assert(thread != NULL); 150 reactor_stop(thread->reactor); 151 } 152 153 bool thread_set_priority(thread_t *thread, int priority) { 154 if (!thread) 155 return false; 156 157 const int rc = setpriority(PRIO_PROCESS, thread->tid, priority); 158 if (rc < 0) { 159 LOG_ERROR(LOG_TAG, "%s unable to set thread priority %d for tid %d, error %d", 160 __func__, priority, thread->tid, rc); 161 return false; 162 } 163 164 return true; 165 } 166 167 bool thread_is_self(const thread_t *thread) { 168 assert(thread != NULL); 169 return !!pthread_equal(pthread_self(), thread->pthread); 170 } 171 172 reactor_t *thread_get_reactor(const thread_t *thread) { 173 assert(thread != NULL); 174 return thread->reactor; 175 } 176 177 const char *thread_name(const thread_t *thread) { 178 assert(thread != NULL); 179 return thread->name; 180 } 181 182 static void *run_thread(void *start_arg) { 183 assert(start_arg != NULL); 184 185 struct start_arg *start = start_arg; 186 thread_t *thread = start->thread; 187 188 assert(thread != NULL); 189 190 if (prctl(PR_SET_NAME, (unsigned long)thread->name) == -1) { 191 LOG_ERROR(LOG_TAG, "%s unable to set thread name: %s", __func__, strerror(errno)); 192 start->error = errno; 193 semaphore_post(start->start_sem); 194 return NULL; 195 } 196 thread->tid = gettid(); 197 198 LOG_WARN(LOG_TAG, "%s: thread id %d, thread name %s started", __func__, thread->tid, thread->name); 199 200 semaphore_post(start->start_sem); 201 202 int fd = fixed_queue_get_dequeue_fd(thread->work_queue); 203 void *context = thread->work_queue; 204 205 reactor_object_t *work_queue_object = reactor_register(thread->reactor, fd, context, work_queue_read_cb, NULL); 206 reactor_start(thread->reactor); 207 reactor_unregister(work_queue_object); 208 209 // Make sure we dispatch all queued work items before exiting the thread. 210 // This allows a caller to safely tear down by enqueuing a teardown 211 // work item and then joining the thread. 212 size_t count = 0; 213 work_item_t *item = fixed_queue_try_dequeue(thread->work_queue); 214 while (item && count <= fixed_queue_capacity(thread->work_queue)) { 215 item->func(item->context); 216 osi_free(item); 217 item = fixed_queue_try_dequeue(thread->work_queue); 218 ++count; 219 } 220 221 if (count > fixed_queue_capacity(thread->work_queue)) 222 LOG_DEBUG(LOG_TAG, "%s growing event queue on shutdown.", __func__); 223 224 LOG_WARN(LOG_TAG, "%s: thread id %d, thread name %s exited", __func__, thread->tid, thread->name); 225 return NULL; 226 } 227 228 static void work_queue_read_cb(void *context) { 229 assert(context != NULL); 230 231 fixed_queue_t *queue = (fixed_queue_t *)context; 232 work_item_t *item = fixed_queue_dequeue(queue); 233 item->func(item->context); 234 osi_free(item); 235 } 236