Home | History | Annotate | Download | only in src
      1 /******************************************************************************
      2  *
      3  *  Copyright (C) 2014 Google, Inc.
      4  *
      5  *  Licensed under the Apache License, Version 2.0 (the "License");
      6  *  you may not use this file except in compliance with the License.
      7  *  You may obtain a copy of the License at:
      8  *
      9  *  http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  *  Unless required by applicable law or agreed to in writing, software
     12  *  distributed under the License is distributed on an "AS IS" BASIS,
     13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  *  See the License for the specific language governing permissions and
     15  *  limitations under the License.
     16  *
     17  ******************************************************************************/
     18 
     19 #define LOG_TAG "osi_thread"
     20 
     21 #include <assert.h>
     22 #include <errno.h>
     23 #include <pthread.h>
     24 #include <string.h>
     25 #include <sys/prctl.h>
     26 #include <sys/types.h>
     27 #include <utils/Log.h>
     28 
     29 #include "fixed_queue.h"
     30 #include "reactor.h"
     31 #include "semaphore.h"
     32 #include "thread.h"
     33 
     34 struct thread_t {
     35   pthread_t pthread;
     36   pid_t tid;
     37   char name[THREAD_NAME_MAX + 1];
     38   reactor_t *reactor;
     39   fixed_queue_t *work_queue;
     40 };
     41 
     42 struct start_arg {
     43   thread_t *thread;
     44   semaphore_t *start_sem;
     45   int error;
     46 };
     47 
     48 typedef struct {
     49   thread_fn func;
     50   void *context;
     51 } work_item_t;
     52 
     // Forward declarations of the file-local callbacks defined further down.
     static void work_queue_read_cb(void *context);
     static void *run_thread(void *start_arg);

     // Capacity passed to fixed_queue_new() for every thread's work queue; also
     // used as the bound on how many items run_thread() drains at shutdown.
     static const size_t WORK_QUEUE_CAPACITY = 128;
     57 
     58 thread_t *thread_new(const char *name) {
     59   assert(name != NULL);
     60 
     61   // Start is on the stack, but we use a semaphore, so it's safe
     62   thread_t *ret = calloc(1, sizeof(thread_t));
     63   if (!ret)
     64     goto error;
     65 
     66   ret->reactor = reactor_new();
     67   if (!ret->reactor)
     68     goto error;
     69 
     70   ret->work_queue = fixed_queue_new(WORK_QUEUE_CAPACITY);
     71   if (!ret->work_queue)
     72     goto error;
     73 
     74   struct start_arg start;
     75   start.start_sem = semaphore_new(0);
     76   if (!start.start_sem)
     77     goto error;
     78 
     79   strncpy(ret->name, name, THREAD_NAME_MAX);
     80   start.thread = ret;
     81   start.error = 0;
     82   pthread_create(&ret->pthread, NULL, run_thread, &start);
     83   semaphore_wait(start.start_sem);
     84   semaphore_free(start.start_sem);
     85   if (start.error)
     86     goto error;
     87   return ret;
     88 
     89 error:;
     90   if (ret) {
     91     fixed_queue_free(ret->work_queue, free);
     92     reactor_free(ret->reactor);
     93   }
     94   free(ret);
     95   return NULL;
     96 }
     97 
     98 void thread_free(thread_t *thread) {
     99   if (!thread)
    100     return;
    101 
    102   thread_stop(thread);
    103   pthread_join(thread->pthread, NULL);
    104   fixed_queue_free(thread->work_queue, free);
    105   reactor_free(thread->reactor);
    106   free(thread);
    107 }
    108 
    109 bool thread_post(thread_t *thread, thread_fn func, void *context) {
    110   assert(thread != NULL);
    111   assert(func != NULL);
    112 
    113   // TODO(sharvil): if the current thread == |thread| and we've run out
    114   // of queue space, we should abort this operation, otherwise we'll
    115   // deadlock.
    116 
    117   // Queue item is freed either when the queue itself is destroyed
    118   // or when the item is removed from the queue for dispatch.
    119   work_item_t *item = (work_item_t *)malloc(sizeof(work_item_t));
    120   if (!item) {
    121     ALOGE("%s unable to allocate memory: %s", __func__, strerror(errno));
    122     return false;
    123   }
    124   item->func = func;
    125   item->context = context;
    126   fixed_queue_enqueue(thread->work_queue, item);
    127   return true;
    128 }
    129 
    130 void thread_stop(thread_t *thread) {
    131   assert(thread != NULL);
    132   reactor_stop(thread->reactor);
    133 }
    134 
    135 const char *thread_name(const thread_t *thread) {
    136   assert(thread != NULL);
    137   return thread->name;
    138 }
    139 
    140 static void *run_thread(void *start_arg) {
    141   assert(start_arg != NULL);
    142 
    143   struct start_arg *start = start_arg;
    144   thread_t *thread = start->thread;
    145 
    146   assert(thread != NULL);
    147 
    148   if (prctl(PR_SET_NAME, (unsigned long)thread->name) == -1) {
    149     ALOGE("%s unable to set thread name: %s", __func__, strerror(errno));
    150     start->error = errno;
    151     semaphore_post(start->start_sem);
    152     return NULL;
    153   }
    154   thread->tid = gettid();
    155 
    156   semaphore_post(start->start_sem);
    157 
    158   reactor_object_t work_queue_object;
    159   work_queue_object.context = thread->work_queue;
    160   work_queue_object.fd = fixed_queue_get_dequeue_fd(thread->work_queue);
    161   work_queue_object.interest = REACTOR_INTEREST_READ;
    162   work_queue_object.read_ready = work_queue_read_cb;
    163 
    164   reactor_register(thread->reactor, &work_queue_object);
    165   reactor_start(thread->reactor);
    166 
    167   // Make sure we dispatch all queued work items before exiting the thread.
    168   // This allows a caller to safely tear down by enqueuing a teardown
    169   // work item and then joining the thread.
    170   size_t count = 0;
    171   work_item_t *item = fixed_queue_try_dequeue(thread->work_queue);
    172   while (item && count <= WORK_QUEUE_CAPACITY) {
    173     item->func(item->context);
    174     free(item);
    175     item = fixed_queue_try_dequeue(thread->work_queue);
    176     ++count;
    177   }
    178 
    179   if (count > WORK_QUEUE_CAPACITY)
    180     ALOGD("%s growing event queue on shutdown.", __func__);
    181 
    182   return NULL;
    183 }
    184 
    185 static void work_queue_read_cb(void *context) {
    186   assert(context != NULL);
    187 
    188   fixed_queue_t *queue = (fixed_queue_t *)context;
    189   work_item_t *item = fixed_queue_dequeue(queue);
    190   item->func(item->context);
    191   free(item);
    192 }
    193