/*
 *  Copyright (c) 2017 The WebM project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */

#include <assert.h>

#include "vp9/encoder/vp9_encoder.h"
#include "vp9/encoder/vp9_ethread.h"
#include "vp9/encoder/vp9_multi_thread.h"

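// Pops the next job from the job queue of the given tile column, advancing
// the queue head under the tile's mutex. Returns a pointer to the dequeued
// JobNode, or NULL when the queue for this tile is empty.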
void *vp9_enc_grp_get_next_job(MultiThreadHandle *multi_thread_ctxt,
                               int tile_id) {
  RowMTInfo *row_mt_info;
  JobQueueHandle *job_queue_hdl = NULL;
  void *next = NULL;
  JobNode *job_info = NULL;
#if CONFIG_MULTITHREAD
  pthread_mutex_t *mutex_handle = NULL;
#endif

  row_mt_info = (RowMTInfo *)(&multi_thread_ctxt->row_mt_info[tile_id]);
  job_queue_hdl = (JobQueueHandle *)&row_mt_info->job_queue_hdl;
#if CONFIG_MULTITHREAD
  mutex_handle = &row_mt_info->job_mutex;
#endif

// Lock the mutex for queue access.
#if CONFIG_MULTITHREAD
  pthread_mutex_lock(mutex_handle);
#endif
  next = job_queue_hdl->next;
  if (NULL != next) {
    JobQueue *job_queue = (JobQueue *)next;
    job_info = &job_queue->job_info;
    // Update the next job in the queue.
    job_queue_hdl->next = job_queue->next;
    job_queue_hdl->num_jobs_acquired++;
  }

#if CONFIG_MULTITHREAD
  pthread_mutex_unlock(mutex_handle);
#endif

  return job_info;
}

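// Allocates the row multi-threading resources for the current frame size and
// tile configuration: the shared job queue (one segment per tile column), one
// job mutex per tile column, per-tile row synchronization memory and, when
// adaptive_rd_thresh_row_mt is enabled, per-row RD threshold factors.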
void vp9_row_mt_mem_alloc(VP9_COMP *cpi) {
  struct VP9Common *cm = &cpi->common;
  MultiThreadHandle *multi_thread_ctxt = &cpi->multi_thread_ctxt;
  int tile_row, tile_col;
  const int tile_cols = 1 << cm->log2_tile_cols;
  const int tile_rows = 1 << cm->log2_tile_rows;
  const int sb_rows = mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2;
  int jobs_per_tile_col, total_jobs;

  jobs_per_tile_col = VPXMAX(cm->mb_rows, sb_rows);
  // Calculate the total number of jobs
  total_jobs = jobs_per_tile_col * tile_cols;

  multi_thread_ctxt->allocated_tile_cols = tile_cols;
  multi_thread_ctxt->allocated_tile_rows = tile_rows;
  multi_thread_ctxt->allocated_vert_unit_rows = jobs_per_tile_col;

  multi_thread_ctxt->job_queue =
      (JobQueue *)vpx_memalign(32, total_jobs * sizeof(JobQueue));
  // Defensive check (added): report the failure through the encoder's error
  // handler rather than dereferencing a NULL job queue later.
  if (!multi_thread_ctxt->job_queue)
    vpx_internal_error(&cm->error, VPX_CODEC_MEM_ERROR,
                       "Failed to allocate job_queue");

#if CONFIG_MULTITHREAD
  // Create a mutex for each tile column.
  for (tile_col = 0; tile_col < tile_cols; tile_col++) {
    RowMTInfo *row_mt_info = &multi_thread_ctxt->row_mt_info[tile_col];
    pthread_mutex_init(&row_mt_info->job_mutex, NULL);
  }
#endif

  // Allocate memory for row based multi-threading
  for (tile_col = 0; tile_col < tile_cols; tile_col++) {
    TileDataEnc *this_tile = &cpi->tile_data[tile_col];
    vp9_row_mt_sync_mem_alloc(&this_tile->row_mt_sync, cm, jobs_per_tile_col);
    if (cpi->sf.adaptive_rd_thresh_row_mt) {
      const int sb_rows =
          (mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2) + 1;
      int i;
      this_tile->row_base_thresh_freq_fact =
          (int *)vpx_calloc(sb_rows * BLOCK_SIZES * MAX_MODES,
                            sizeof(*(this_tile->row_base_thresh_freq_fact)));
      for (i = 0; i < sb_rows * BLOCK_SIZES * MAX_MODES; i++)
        this_tile->row_base_thresh_freq_fact[i] = RD_THRESH_INIT_FACT;
    }
  }

  // Assign the sync pointer of tile row zero for every tile row > 0
  for (tile_row = 1; tile_row < tile_rows; tile_row++) {
    for (tile_col = 0; tile_col < tile_cols; tile_col++) {
      TileDataEnc *this_tile = &cpi->tile_data[tile_row * tile_cols + tile_col];
      TileDataEnc *this_col_tile = &cpi->tile_data[tile_col];
      this_tile->row_mt_sync = this_col_tile->row_mt_sync;
    }
  }

  // Calculate the number of vertical units in the given tile row
  for (tile_row = 0; tile_row < tile_rows; tile_row++) {
    TileDataEnc *this_tile = &cpi->tile_data[tile_row * tile_cols];
    TileInfo *tile_info = &this_tile->tile_info;
    multi_thread_ctxt->num_tile_vert_sbs[tile_row] =
        get_num_vert_units(*tile_info, MI_BLOCK_SIZE_LOG2);
  }
}

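// Releases the resources allocated by vp9_row_mt_mem_alloc(): the job queue,
// the per-tile-column mutexes, the row synchronization memory and the per-row
// RD threshold factors.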
void vp9_row_mt_mem_dealloc(VP9_COMP *cpi) {
  MultiThreadHandle *multi_thread_ctxt = &cpi->multi_thread_ctxt;
  int tile_col;
#if CONFIG_MULTITHREAD
  int tile_row;
#endif

  // Deallocate memory for job queue
  if (multi_thread_ctxt->job_queue) vpx_free(multi_thread_ctxt->job_queue);

#if CONFIG_MULTITHREAD
  // Destroy mutex for each tile
  for (tile_col = 0; tile_col < multi_thread_ctxt->allocated_tile_cols;
       tile_col++) {
    RowMTInfo *row_mt_info = &multi_thread_ctxt->row_mt_info[tile_col];
    if (row_mt_info) pthread_mutex_destroy(&row_mt_info->job_mutex);
  }
#endif

  // Free row based multi-threading sync memory
  for (tile_col = 0; tile_col < multi_thread_ctxt->allocated_tile_cols;
       tile_col++) {
    TileDataEnc *this_tile = &cpi->tile_data[tile_col];
    vp9_row_mt_sync_mem_dealloc(&this_tile->row_mt_sync);
  }

#if CONFIG_MULTITHREAD
  for (tile_row = 0; tile_row < multi_thread_ctxt->allocated_tile_rows;
       tile_row++) {
    for (tile_col = 0; tile_col < multi_thread_ctxt->allocated_tile_cols;
         tile_col++) {
      TileDataEnc *this_tile =
          &cpi->tile_data[tile_row * multi_thread_ctxt->allocated_tile_cols +
                          tile_col];
      if (cpi->sf.adaptive_rd_thresh_row_mt) {
        if (this_tile->row_base_thresh_freq_fact != NULL) {
          vpx_free(this_tile->row_base_thresh_freq_fact);
          this_tile->row_base_thresh_freq_fact = NULL;
        }
      }
    }
  }
#endif
}

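// Per-frame initialization of the tile data used by row multi-threading:
// resets the per-row synchronization state (cur_col) and the first pass stats
// of each tile column.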
void vp9_multi_thread_tile_init(VP9_COMP *cpi) {
  VP9_COMMON *const cm = &cpi->common;
  const int tile_cols = 1 << cm->log2_tile_cols;
  const int sb_rows = mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2;
  int i;

  for (i = 0; i < tile_cols; i++) {
    TileDataEnc *this_tile = &cpi->tile_data[i];
    int jobs_per_tile_col = cpi->oxcf.pass == 1 ? cm->mb_rows : sb_rows;

    // Initialize cur_col to -1 for all rows.
    memset(this_tile->row_mt_sync.cur_col, -1,
           sizeof(*this_tile->row_mt_sync.cur_col) * jobs_per_tile_col);
    vp9_zero(this_tile->fp_data);
    this_tile->fp_data.image_data_start_row = INVALID_ROW;
  }
}

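// Assigns tile columns to worker threads in round-robin order, so that the
// workers are spread evenly across the available tile columns.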
void vp9_assign_tile_to_thread(MultiThreadHandle *multi_thread_ctxt,
                               int tile_cols, int num_workers) {
  int tile_id = 0;
  int i;

  // Allocate the threads to the tiles in round-robin order.
  for (i = 0; i < num_workers; i++) {
    multi_thread_ctxt->thread_id_to_tile_id[i] = tile_id++;
    if (tile_id == tile_cols) tile_id = 0;
  }
}

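// Returns the number of jobs that have not yet been acquired from the job
// queue of the given tile column. The count is read under the tile's mutex.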
int vp9_get_job_queue_status(MultiThreadHandle *multi_thread_ctxt,
                             int cur_tile_id) {
  RowMTInfo *row_mt_info;
  JobQueueHandle *job_queue_hndl;
#if CONFIG_MULTITHREAD
  pthread_mutex_t *mutex;
#endif
  int num_jobs_remaining;

  row_mt_info = &multi_thread_ctxt->row_mt_info[cur_tile_id];
  job_queue_hndl = &row_mt_info->job_queue_hdl;
#if CONFIG_MULTITHREAD
  mutex = &row_mt_info->job_mutex;
#endif

#if CONFIG_MULTITHREAD
  pthread_mutex_lock(mutex);
#endif
  num_jobs_remaining =
      multi_thread_ctxt->jobs_per_tile_col - job_queue_hndl->num_jobs_acquired;
#if CONFIG_MULTITHREAD
  pthread_mutex_unlock(mutex);
#endif

  return num_jobs_remaining;
}

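// Builds the job queue for the given job type. The queue is a contiguous
// array of JobQueue nodes, one segment of jobs_per_tile_col nodes per tile
// column, with each node linked to the next one in its segment. For encode
// jobs the tile_row_id of a node advances once all vertical SB rows of the
// current tile row have been covered.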
void vp9_prepare_job_queue(VP9_COMP *cpi, JOB_TYPE job_type) {
  VP9_COMMON *const cm = &cpi->common;
  MultiThreadHandle *multi_thread_ctxt = &cpi->multi_thread_ctxt;
  JobQueue *job_queue = multi_thread_ctxt->job_queue;
  const int tile_cols = 1 << cm->log2_tile_cols;
  int job_row_num, jobs_per_tile, jobs_per_tile_col, total_jobs;
  const int sb_rows = mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2;
  int tile_col, i;

  jobs_per_tile_col = (job_type != ENCODE_JOB) ? cm->mb_rows : sb_rows;
  total_jobs = jobs_per_tile_col * tile_cols;

  multi_thread_ctxt->jobs_per_tile_col = jobs_per_tile_col;
  // memset the entire job queue buffer to zero
  memset(job_queue, 0, total_jobs * sizeof(JobQueue));

  // Job queue preparation
  for (tile_col = 0; tile_col < tile_cols; tile_col++) {
    RowMTInfo *tile_ctxt = &multi_thread_ctxt->row_mt_info[tile_col];
    JobQueue *job_queue_curr, *job_queue_temp;
    int tile_row = 0;

    tile_ctxt->job_queue_hdl.next = (void *)job_queue;
    tile_ctxt->job_queue_hdl.num_jobs_acquired = 0;

    job_queue_curr = job_queue;
    job_queue_temp = job_queue;

    // Loop over all the vertical rows.
    for (job_row_num = 0, jobs_per_tile = 0; job_row_num < jobs_per_tile_col;
         job_row_num++, jobs_per_tile++) {
      job_queue_curr->job_info.vert_unit_row_num = job_row_num;
      job_queue_curr->job_info.tile_col_id = tile_col;
      job_queue_curr->job_info.tile_row_id = tile_row;
      job_queue_curr->next = (void *)(job_queue_temp + 1);
      job_queue_curr = ++job_queue_temp;

      if (ENCODE_JOB == job_type) {
        if (jobs_per_tile >=
            multi_thread_ctxt->num_tile_vert_sbs[tile_row] - 1) {
          tile_row++;
          jobs_per_tile = -1;
        }
      }
    }

    // Set the last pointer to NULL
    job_queue_curr -= 1;
    job_queue_curr->next = (void *)NULL;

    // Move to the next tile
    job_queue += jobs_per_tile_col;
  }

  for (i = 0; i < cpi->num_workers; i++) {
    EncWorkerData *thread_data;
    thread_data = &cpi->tile_thr_data[i];
    thread_data->thread_id = i;

    for (tile_col = 0; tile_col < tile_cols; tile_col++)
      thread_data->tile_completion_status[tile_col] = 0;
  }
}

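// Checks the processing status of all tile columns and switches *cur_tile_id
// to the tile column with the most jobs still pending. Returns 1 when every
// tile column has been fully processed, 0 otherwise.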
int vp9_get_tiles_proc_status(MultiThreadHandle *multi_thread_ctxt,
                              int *tile_completion_status, int *cur_tile_id,
                              int tile_cols) {
  int tile_col;
  int tile_id = -1;  // Stores the tile ID with the least processing done.
  int max_num_jobs_remaining = 0;
  int num_jobs_remaining;

  // Mark the current tile as complete to avoid rechecking it in the loop.
  tile_completion_status[*cur_tile_id] = 1;
  // Check the status of all the tiles.
  for (tile_col = 0; tile_col < tile_cols; tile_col++) {
    if (tile_completion_status[tile_col] == 0) {
      num_jobs_remaining =
          vp9_get_job_queue_status(multi_thread_ctxt, tile_col);
      // Mark the completion to avoid checks during future switches across
      // tiles.
      if (num_jobs_remaining == 0) tile_completion_status[tile_col] = 1;
      if (num_jobs_remaining > max_num_jobs_remaining) {
        max_num_jobs_remaining = num_jobs_remaining;
        tile_id = tile_col;
      }
    }
  }

  if (-1 == tile_id) {
    return 1;
  } else {
    // Update the current tile ID to the next tile to be processed, which is
    // the least processed tile.
    *cur_tile_id = tile_id;
    return 0;
  }
}