Home | History | Annotate | Download | only in libfec
      1 /*
      2  * Copyright (C) 2015 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #include "fec_private.h"
     18 
     19 struct process_info {
     20     int id;
     21     fec_handle *f;
     22     uint8_t *buf;
     23     size_t count;
     24     uint64_t offset;
     25     read_func func;
     26     ssize_t rc;
     27     size_t errors;
     28 };
     29 
     30 /* thread function  */
     31 static void * __process(void *cookie)
     32 {
     33     process_info *p = static_cast<process_info *>(cookie);
     34 
     35     debug("thread %d: [%" PRIu64 ", %" PRIu64 ")", p->id, p->offset,
     36         p->offset + p->count);
     37 
     38     p->rc = p->func(p->f, p->buf, p->count, p->offset, &p->errors);
     39     return p;
     40 }
     41 
     42 /* launches a maximum number of threads to process a read */
     43 ssize_t process(fec_handle *f, uint8_t *buf, size_t count, uint64_t offset,
     44         read_func func)
     45 {
     46     check(f);
     47     check(buf)
     48     check(func);
     49 
     50     if (count == 0) {
     51         return 0;
     52     }
     53 
     54     int threads = sysconf(_SC_NPROCESSORS_ONLN);
     55 
     56     if (threads < WORK_MIN_THREADS) {
     57         threads = WORK_MIN_THREADS;
     58     } else if (threads > WORK_MAX_THREADS) {
     59         threads = WORK_MAX_THREADS;
     60     }
     61 
     62     uint64_t start = (offset / FEC_BLOCKSIZE) * FEC_BLOCKSIZE;
     63     size_t blocks = fec_div_round_up(count, FEC_BLOCKSIZE);
     64 
     65     size_t count_per_thread = fec_div_round_up(blocks, threads) * FEC_BLOCKSIZE;
     66     size_t max_threads = fec_div_round_up(count, count_per_thread);
     67 
     68     if ((size_t)threads > max_threads) {
     69         threads = (int)max_threads;
     70     }
     71 
     72     size_t left = count;
     73     uint64_t pos = offset;
     74     uint64_t end = start + count_per_thread;
     75 
     76     debug("%d threads, %zu bytes per thread (total %zu)", threads,
     77         count_per_thread, count);
     78 
     79     std::vector<pthread_t> handles;
     80     process_info info[threads];
     81     ssize_t rc = 0;
     82 
     83     /* start threads to process queue */
     84     for (int i = 0; i < threads; ++i) {
     85         check(left > 0);
     86 
     87         info[i].id = i;
     88         info[i].f = f;
     89         info[i].buf = &buf[pos - offset];
     90         info[i].count = (size_t)(end - pos);
     91         info[i].offset = pos;
     92         info[i].func = func;
     93         info[i].rc = -1;
     94         info[i].errors = 0;
     95 
     96         if (info[i].count > left) {
     97             info[i].count = left;
     98         }
     99 
    100         pthread_t thread;
    101 
    102         if (pthread_create(&thread, NULL, __process, &info[i]) != 0) {
    103             error("failed to create thread: %s", strerror(errno));
    104             rc = -1;
    105         } else {
    106             handles.push_back(thread);
    107         }
    108 
    109         pos = end;
    110         end  += count_per_thread;
    111         left -= info[i].count;
    112     }
    113 
    114     check(left == 0);
    115 
    116     ssize_t nread = 0;
    117 
    118     /* wait for all threads to complete */
    119     for (auto thread : handles) {
    120         process_info *p = NULL;
    121 
    122         if (pthread_join(thread, (void **)&p) != 0) {
    123             error("failed to join thread: %s", strerror(errno));
    124             rc = -1;
    125         } else if (!p || p->rc == -1) {
    126             rc = -1;
    127         } else {
    128             nread += p->rc;
    129             f->errors += p->errors;
    130         }
    131     }
    132 
    133     if (rc == -1) {
    134         errno = EIO;
    135         return -1;
    136     }
    137 
    138     return nread;
    139 }
    140