Home | History | Annotate | Download | only in scheduling
      1 /*
      2  * Copyright (C) 2017 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License
     15  */
     16 
     17 package com.android.voicemail.impl.scheduling;
     18 
     19 import android.annotation.TargetApi;
     20 import android.content.Context;
     21 import android.content.Intent;
     22 import android.os.Build.VERSION_CODES;
     23 import android.os.Bundle;
     24 import android.os.Handler;
     25 import android.os.HandlerThread;
     26 import android.os.Looper;
     27 import android.os.Message;
     28 import android.support.annotation.MainThread;
     29 import android.support.annotation.Nullable;
     30 import android.support.annotation.VisibleForTesting;
     31 import android.support.annotation.WorkerThread;
     32 import com.android.voicemail.impl.Assert;
     33 import com.android.voicemail.impl.NeededForTesting;
     34 import com.android.voicemail.impl.VvmLog;
     35 import com.android.voicemail.impl.scheduling.TaskQueue.NextTask;
     36 import java.util.List;
     37 
     38 /**
     39  * A singleton to queue and run {@link Task} with the {@link android.app.job.JobScheduler}. A task
     40  * is queued by sending a broadcast to {@link TaskReceiver}. The intent should contain enough
     41  * information in {@link Intent#getExtras()} to construct the task (see {@link
     42  * Tasks#createIntent(Context, Class)}).
     43  *
     44  * <p>The executor will only exist when {@link TaskSchedulerJobService} is running.
     45  *
     46  * <p>All tasks are ran in the background with a wakelock being held by the {@link
     47  * android.app.job.JobScheduler}, which is between {@link #onStartJob(Job, List)} and {@link
     48  * #finishJobAsync()}. The {@link TaskSchedulerJobService} also has a {@link TaskQueue}, but the
     49  * data is stored in the {@link android.app.job.JobScheduler} instead of the process memory, so if
     50  * the process is killed the queued tasks will be restored. If a new task is added, a new {@link
     51  * TaskSchedulerJobService} will be scheduled to run the task. If the job is already scheduled, the
     52  * new task will be pushed into the queue of the scheduled job. If the job is already running, the
     53  * job will be queued in process memory.
     54  *
     55  * <p>Only one task will be ran at a time, and same task cannot exist in the queue at the same time.
     56  * Refer to {@link TaskQueue} for queuing and execution order.
     57  *
     58  * <p>If there are still tasks in the queue but none are executable immediately, the service will
     59  * enter a "sleep", pushing all remaining task into a new job and end the current job.
     60  *
     61  * <p>The executor will be started when {@link TaskSchedulerJobService} is running, and stopped when
     62  * there are no more tasks in the queue or when the executor is put to sleep.
     63  *
     64  * <p>{@link android.app.job.JobScheduler} is not used directly due to:
     65  *
     66  * <ul>
     67  *   <li>The {@link android.telecom.PhoneAccountHandle} used to differentiate task can not be easily
     68  *       mapped into an integer for job id
     69  *   <li>A job cannot be mutated to store information such as retry count.
     70  * </ul>
     71  */
     72 @TargetApi(VERSION_CODES.O)
     73 final class TaskExecutor {
     74 
     75   /**
     76    * An entity that holds execution resources for the {@link TaskExecutor} to run, usually a {@link
     77    * android.app.job.JobService}.
     78    */
     79   interface Job {
     80 
     81     /**
     82      * Signals to Job to end and release its' resources. This is an asynchronous call and may not
     83      * take effect immediately.
     84      */
     85     @MainThread
     86     void finishAsync();
     87 
     88     /** Whether the call to {@link #finishAsync()} has actually taken effect. */
     89     @MainThread
     90     boolean isFinished();
     91   }
     92 
     93   private static final String TAG = "VvmTaskExecutor";
     94 
     95   private static final int READY_TOLERANCE_MILLISECONDS = 100;
     96 
     97   /**
     98    * Threshold to determine whether to do a short or long sleep when a task is scheduled in the
     99    * future.
    100    *
    101    * <p>A short sleep will continue the job and use {@link Handler#postDelayed(Runnable, long)} to
    102    * wait for the next task.
    103    *
    104    * <p>A long sleep will finish the job and schedule a new one. The exact execution time is
    105    * subjected to {@link android.app.job.JobScheduler} battery optimization, and is not exact.
    106    */
    107   private static final int SHORT_SLEEP_THRESHOLD_MILLISECONDS = 10_000;
    108   /**
    109    * When there are no more tasks to be run the service should be stopped. But when all tasks has
    110    * finished there might still be more tasks in the message queue waiting to be processed,
    111    * especially the ones submitted in {@link Task#onCompleted()}. Wait for a while before stopping
    112    * the service to make sure there are no pending messages.
    113    */
    114   private static final int STOP_DELAY_MILLISECONDS = 5_000;
    115 
    116   /** Interval between polling of whether the job is finished. */
    117   private static final int TERMINATE_POLLING_INTERVAL_MILLISECONDS = 1_000;
    118 
    119   // The thread to run tasks on
    120   private final WorkerThreadHandler workerThreadHandler;
    121 
    122   private static TaskExecutor instance;
    123 
    124   /**
    125    * Used by tests to turn task handling into a single threaded process by calling {@link
    126    * Handler#handleMessage(Message)} directly
    127    */
    128   private MessageSender messageSender = new MessageSender();
    129 
    130   private final MainThreadHandler mainThreadHandler;
    131 
    132   private final Context appContext;
    133 
    134   /** Main thread only, access through {@link #getTasks()} */
    135   private final TaskQueue tasks = new TaskQueue();
    136 
    137   private boolean isWorkerThreadBusy = false;
    138 
    139   private boolean isTerminating = false;
    140 
    141   private Job job;
    142 
    143   private final Runnable stopServiceWithDelay =
    144       new Runnable() {
    145         @MainThread
    146         @Override
    147         public void run() {
    148           VvmLog.i(TAG, "Stopping service");
    149           if (!isJobRunning() || isTerminating()) {
    150             VvmLog.e(TAG, "Service already stopped");
    151             return;
    152           }
    153           scheduleJobAndTerminate(0, true);
    154         }
    155       };
    156 
    157   /**
    158    * Reschedule the {@link TaskSchedulerJobService} and terminate the executor when the {@link Job}
    159    * is truly finished. If the job is still not finished, this runnable will requeue itself on the
    160    * main thread. The requeue is only expected to happen a few times.
    161    */
    162   private class JobFinishedPoller implements Runnable {
    163 
    164     private final long delayMillis;
    165     private final boolean isNewJob;
    166     private int invocationCounter = 0;
    167 
    168     JobFinishedPoller(long delayMillis, boolean isNewJob) {
    169       this.delayMillis = delayMillis;
    170       this.isNewJob = isNewJob;
    171     }
    172 
    173     @Override
    174     public void run() {
    175       // The job should be finished relatively quickly. Assert to make sure this assumption is true.
    176       Assert.isTrue(invocationCounter < 10);
    177       invocationCounter++;
    178       if (job.isFinished()) {
    179         VvmLog.i("JobFinishedPoller.run", "Job finished");
    180         if (!getTasks().isEmpty()) {
    181           TaskSchedulerJobService.scheduleJob(
    182               appContext, serializePendingTasks(), delayMillis, isNewJob);
    183           tasks.clear();
    184         }
    185         terminate();
    186         return;
    187       }
    188       VvmLog.w("JobFinishedPoller.run", "Job still running");
    189       mainThreadHandler.postDelayed(this, TERMINATE_POLLING_INTERVAL_MILLISECONDS);
    190     }
    191   };
    192 
    193   /** Should attempt to run the next task when a task has finished or been added. */
    194   private boolean taskAutoRunDisabledForTesting = false;
    195 
    196   /** Handles execution of the background task in teh worker thread. */
    197   @VisibleForTesting
    198   final class WorkerThreadHandler extends Handler {
    199 
    200     public WorkerThreadHandler(Looper looper) {
    201       super(looper);
    202     }
    203     @Override
    204     @WorkerThread
    205     public void handleMessage(Message msg) {
    206       Assert.isNotMainThread();
    207       Task task = (Task) msg.obj;
    208       try {
    209         VvmLog.i(TAG, "executing task " + task);
    210         task.onExecuteInBackgroundThread();
    211       } catch (Throwable throwable) {
    212         VvmLog.e(TAG, "Exception while executing task " + task + ":", throwable);
    213       }
    214 
    215       Message schedulerMessage = mainThreadHandler.obtainMessage();
    216       schedulerMessage.obj = task;
    217       messageSender.send(schedulerMessage);
    218     }
    219   }
    220 
    221   /** Handles completion of the background task in the main thread. */
    222   @VisibleForTesting
    223   final class MainThreadHandler extends Handler {
    224 
    225     public MainThreadHandler(Looper looper) {
    226       super(looper);
    227     }
    228 
    229     @Override
    230     @MainThread
    231     public void handleMessage(Message msg) {
    232       Assert.isMainThread();
    233       Task task = (Task) msg.obj;
    234       getTasks().remove(task);
    235       task.onCompleted();
    236       isWorkerThreadBusy = false;
    237       if (!isJobRunning() || isTerminating()) {
    238         // TaskExecutor was terminated when the task is running in background, don't need to run the
    239         // next task or terminate again
    240         return;
    241       }
    242       maybeRunNextTask();
    243     }
    244   }
    245 
    246   /** Starts a new TaskExecutor. May only be called by {@link TaskSchedulerJobService}. */
    247   @MainThread
    248   static void createRunningInstance(Context context) {
    249     Assert.isMainThread();
    250     Assert.isTrue(instance == null);
    251     instance = new TaskExecutor(context);
    252   }
    253 
    254   /** @return the currently running instance, or {@code null} if the executor is not running. */
    255   @MainThread
    256   @Nullable
    257   static TaskExecutor getRunningInstance() {
    258     return instance;
    259   }
    260 
    261   private TaskExecutor(Context context) {
    262     this.appContext = context.getApplicationContext();
    263     HandlerThread thread = new HandlerThread("VvmTaskExecutor");
    264     thread.start();
    265 
    266     workerThreadHandler = new WorkerThreadHandler(thread.getLooper());
    267     mainThreadHandler = new MainThreadHandler(Looper.getMainLooper());
    268   }
    269 
    270   @VisibleForTesting
    271   void terminate() {
    272     VvmLog.i(TAG, "terminated");
    273     Assert.isMainThread();
    274     job = null;
    275     workerThreadHandler.getLooper().quit();
    276     instance = null;
    277     TaskReceiver.resendDeferredBroadcasts(appContext);
    278   }
    279 
    280   @MainThread
    281   void addTask(Task task) {
    282     Assert.isMainThread();
    283     getTasks().add(task);
    284     VvmLog.i(TAG, task + " added");
    285     mainThreadHandler.removeCallbacks(stopServiceWithDelay);
    286     maybeRunNextTask();
    287   }
    288 
    289   @MainThread
    290   @VisibleForTesting
    291   TaskQueue getTasks() {
    292     Assert.isMainThread();
    293     return tasks;
    294   }
    295 
    296   @MainThread
    297   private void maybeRunNextTask() {
    298     Assert.isMainThread();
    299 
    300     if (isWorkerThreadBusy) {
    301       return;
    302     }
    303     if (taskAutoRunDisabledForTesting) {
    304       // If taskAutoRunDisabledForTesting is true, runNextTask() must be explicitly called
    305       // to run the next task.
    306       return;
    307     }
    308 
    309     runNextTask();
    310   }
    311 
    312   @VisibleForTesting
    313   @MainThread
    314   void runNextTask() {
    315     Assert.isMainThread();
    316     if (getTasks().isEmpty()) {
    317       prepareStop();
    318       return;
    319     }
    320     NextTask nextTask = getTasks().getNextTask(READY_TOLERANCE_MILLISECONDS);
    321 
    322     if (nextTask.task != null) {
    323       nextTask.task.onBeforeExecute();
    324       Message message = workerThreadHandler.obtainMessage();
    325       message.obj = nextTask.task;
    326       isWorkerThreadBusy = true;
    327       messageSender.send(message);
    328       return;
    329     }
    330     VvmLog.i(TAG, "minimal wait time:" + nextTask.minimalWaitTimeMillis);
    331     if (!taskAutoRunDisabledForTesting && nextTask.minimalWaitTimeMillis != null) {
    332       // No tasks are currently ready. Sleep until the next one should be.
    333       // If a new task is added during the sleep the service will wake immediately.
    334       sleep(nextTask.minimalWaitTimeMillis);
    335     }
    336   }
    337 
    338   @MainThread
    339   private void sleep(long timeMillis) {
    340     VvmLog.i(TAG, "sleep for " + timeMillis + " millis");
    341     if (timeMillis < SHORT_SLEEP_THRESHOLD_MILLISECONDS) {
    342       mainThreadHandler.postDelayed(
    343           new Runnable() {
    344             @Override
    345             public void run() {
    346               maybeRunNextTask();
    347             }
    348           },
    349           timeMillis);
    350       return;
    351     }
    352     scheduleJobAndTerminate(timeMillis, false);
    353   }
    354 
    355   private List<Bundle> serializePendingTasks() {
    356     return getTasks().toBundles();
    357   }
    358 
    359   private void prepareStop() {
    360     VvmLog.i(
    361         TAG,
    362         "no more tasks, stopping service if no task are added in "
    363             + STOP_DELAY_MILLISECONDS
    364             + " millis");
    365     mainThreadHandler.postDelayed(stopServiceWithDelay, STOP_DELAY_MILLISECONDS);
    366   }
    367 
    368   @NeededForTesting
    369   static class MessageSender {
    370 
    371     public void send(Message message) {
    372       message.sendToTarget();
    373     }
    374   }
    375 
    376   @NeededForTesting
    377   void setTaskAutoRunDisabledForTest(boolean value) {
    378     taskAutoRunDisabledForTesting = value;
    379   }
    380 
    381   @NeededForTesting
    382   void setMessageSenderForTest(MessageSender sender) {
    383     messageSender = sender;
    384   }
    385 
    386   /**
    387    * The {@link TaskSchedulerJobService} has started and all queued task should be executed in the
    388    * worker thread.
    389    */
    390   @MainThread
    391   public void onStartJob(Job job, List<Bundle> pendingTasks) {
    392     VvmLog.i(TAG, "onStartJob");
    393     this.job = job;
    394     tasks.fromBundles(appContext, pendingTasks);
    395     maybeRunNextTask();
    396   }
    397 
    398   /**
    399    * The {@link TaskSchedulerJobService} is being terminated by the system (timeout or network
    400    * lost). A new job will be queued to resume all pending tasks. The current unfinished job may be
    401    * ran again.
    402    */
    403   @MainThread
    404   public void onStopJob() {
    405     VvmLog.e(TAG, "onStopJob");
    406     if (isJobRunning() && !isTerminating()) {
    407       scheduleJobAndTerminate(0, true);
    408     }
    409   }
    410 
    411   /**
    412    * Send all pending tasks and schedule a new {@link TaskSchedulerJobService}. The current executor
    413    * will start the termination process, but restarted when the scheduled job runs in the future.
    414    *
    415    * @param delayMillis the delay before stating the job, see {@link
    416    *     android.app.job.JobInfo.Builder#setMinimumLatency(long)}. This must be 0 if {@code
    417    *     isNewJob} is true.
    418    * @param isNewJob a new job will be requested to run immediately, bypassing all requirements.
    419    */
    420   @MainThread
    421   @VisibleForTesting
    422   void scheduleJobAndTerminate(long delayMillis, boolean isNewJob) {
    423     Assert.isMainThread();
    424     finishJobAsync();
    425     mainThreadHandler.post(new JobFinishedPoller(delayMillis, isNewJob));
    426   }
    427 
    428   /**
    429    * Whether the TaskExecutor is still terminating. {@link TaskReceiver} should defer all new task
    430    * until {@link #getRunningInstance()} returns {@code null} so a new job can be started. {@link
    431    * #scheduleJobAndTerminate(long, boolean)} does not run immediately because the job can only be
    432    * scheduled after the main thread has returned. The TaskExecutor will be in a intermediate state
    433    * between scheduleJobAndTerminate() and terminate(). In this state, {@link #getRunningInstance()}
    434    * returns non-null because it has not been fully stopped yet, but the TaskExecutor cannot do
    435    * anything. A new job should not be scheduled either because the current job might still be
    436    * running.
    437    */
    438   @MainThread
    439   public boolean isTerminating() {
    440     return isTerminating;
    441   }
    442 
    443   /**
    444    * Signals {@link TaskSchedulerJobService} the current session of tasks has finished, and the wake
    445    * lock can be released. Note: this only takes effect after the main thread has been returned. If
    446    * a new job need to be scheduled, it should be posted on the main thread handler instead of
    447    * calling directly.
    448    */
    449   @MainThread
    450   private void finishJobAsync() {
    451     Assert.isTrue(!isTerminating());
    452     Assert.isMainThread();
    453     VvmLog.i(TAG, "finishing Job");
    454     job.finishAsync();
    455     isTerminating = true;
    456     mainThreadHandler.removeCallbacks(stopServiceWithDelay);
    457   }
    458 
    459   private boolean isJobRunning() {
    460     return job != null;
    461   }
    462 }
    463