Home | History | Annotate | Download | only in tasks
      1 /*
      2  * Copyright (C) 2011 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package vogar.tasks;
     18 
     19 import java.util.ArrayList;
     20 import java.util.Collection;
     21 import java.util.Iterator;
     22 import java.util.LinkedList;
     23 import java.util.List;
     24 import java.util.concurrent.ExecutorService;
     25 import java.util.concurrent.TimeUnit;
     26 import vogar.Console;
     27 import vogar.Result;
     28 import vogar.util.Threads;
     29 
     30 /**
     31  * A set of tasks to execute.
     32  */
     33 public final class TaskQueue {
     34     private static final int FOREVER = 60 * 60 * 24 * 28; // four weeks
     35     private final Console console;
     36     private int runningTasks;
     37     private int runningActions;
     38     private int maxConcurrentActions;
     39     private final LinkedList<Task> tasks = new LinkedList<Task>();
     40     private final LinkedList<Task> runnableActions = new LinkedList<Task>();
     41     private final LinkedList<Task> runnableTasks = new LinkedList<Task>();
     42     private final List<Task> failedTasks = new ArrayList<Task>();
     43 
     44     public TaskQueue(Console console, int maxConcurrentActions) {
     45         this.console = console;
     46         this.maxConcurrentActions = maxConcurrentActions;
     47     }
     48 
     49     /**
     50      * Adds a task to the queue.
     51      */
     52     public synchronized void enqueue(Task task) {
     53         tasks.add(task);
     54     }
     55 
     56     public void enqueueAll(Collection<Task> tasks) {
     57         this.tasks.addAll(tasks);
     58     }
     59 
     60     public synchronized List<Task> getTasks() {
     61         return new ArrayList<Task>(tasks);
     62     }
     63 
     64     public void runTasks() {
     65         promoteBlockedTasks();
     66 
     67         ExecutorService runners = Threads.threadPerCpuExecutor(console, "TaskQueue");
     68         for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
     69             runners.execute(new Runnable() {
     70                 @Override public void run() {
     71                     while (runOneTask()) {
     72                     }
     73                 }
     74             });
     75         }
     76 
     77         runners.shutdown();
     78         try {
     79             runners.awaitTermination(FOREVER, TimeUnit.SECONDS);
     80         } catch (InterruptedException e) {
     81             throw new AssertionError();
     82         }
     83     }
     84 
     85     public void printTasks() {
     86         if (!console.isVerbose()) {
     87             return;
     88         }
     89 
     90         int i = 0;
     91         for (Task task : tasks) {
     92             StringBuilder message = new StringBuilder()
     93                     .append("Task ").append(i++).append(": ").append(task);
     94             for (Task blocker : task.tasksThatMustFinishFirst) {
     95                 message.append("\n  depends on completed task: ").append(blocker);
     96             }
     97             for (Task blocker : task.tasksThatMustFinishSuccessfullyFirst) {
     98                 message.append("\n  depends on successful task: ").append(blocker);
     99             }
    100             console.verbose(message.toString());
    101         }
    102     }
    103 
    104     public void printProblemTasks() {
    105         for (Task task : failedTasks) {
    106             String message = "Failed task: " + task + " " + task.result;
    107             if (task.thrown != null) {
    108                 console.info(message, task.thrown);
    109             } else {
    110                 console.info(message);
    111             }
    112         }
    113         if (!console.isVerbose()) {
    114             return;
    115         }
    116         for (Task task : tasks) {
    117             StringBuilder message = new StringBuilder()
    118                     .append("Failed to execute task: ").append(task);
    119             for (Task blocker : task.tasksThatMustFinishFirst) {
    120                 if (blocker.result == null) {
    121                     message.append("\n  blocked by unexecuted task: ").append(blocker);
    122                 }
    123             }
    124             for (Task blocker : task.tasksThatMustFinishSuccessfullyFirst) {
    125                 if (blocker.result == null) {
    126                     message.append("\n  blocked by unexecuted task: ").append(blocker);
    127                 } else if (blocker.result != Result.SUCCESS) {
    128                     message.append("\n  blocked by unsuccessful task: ").append(blocker);
    129                 }
    130             }
    131             console.verbose(message.toString());
    132         }
    133     }
    134 
    135     private boolean runOneTask() {
    136         Task task = takeTask();
    137         if (task == null) {
    138             return false;
    139         }
    140         String threadName = Thread.currentThread().getName();
    141         Thread.currentThread().setName(task.toString());
    142         try {
    143             task.run(console);
    144         } finally {
    145             doneTask(task);
    146             Thread.currentThread().setName(threadName);
    147         }
    148         return true;
    149     }
    150 
    151     private synchronized Task takeTask() {
    152         while (true) {
    153             Task task = null;
    154             if (runningActions < maxConcurrentActions) {
    155                 task = runnableActions.poll();
    156             }
    157             if (task == null) {
    158                 task = runnableTasks.poll();
    159             }
    160 
    161             if (task != null) {
    162                 runningTasks++;
    163                 if (task.isAction()) {
    164                     runningActions++;
    165                 }
    166                 return task;
    167             }
    168 
    169             if (isExhausted()) {
    170                 return null;
    171             }
    172 
    173             try {
    174                 wait();
    175             } catch (InterruptedException e) {
    176                 throw new AssertionError();
    177             }
    178         }
    179     }
    180 
    181     private synchronized void doneTask(Task task) {
    182         if (task.result != Result.SUCCESS) {
    183             failedTasks.add(task);
    184         }
    185         runningTasks--;
    186         if (task.isAction()) {
    187             runningActions--;
    188         }
    189         promoteBlockedTasks();
    190         if (isExhausted()) {
    191             notifyAll();
    192         }
    193     }
    194 
    195     private synchronized void promoteBlockedTasks() {
    196         for (Iterator<Task> it = tasks.iterator(); it.hasNext(); ) {
    197             Task potentiallyUnblocked = it.next();
    198             if (potentiallyUnblocked.isRunnable()) {
    199                 it.remove();
    200                 if (potentiallyUnblocked.isAction()) {
    201                     runnableActions.add(potentiallyUnblocked);
    202                 } else {
    203                     runnableTasks.add(potentiallyUnblocked);
    204                 }
    205                 notifyAll();
    206             }
    207         }
    208     }
    209 
    210     /**
    211      * Returns true if there are no tasks to run and no tasks currently running.
    212      */
    213     private boolean isExhausted() {
    214         return runnableTasks.isEmpty() && runnableActions.isEmpty() && runningTasks == 0;
    215     }
    216 }
    217