Home | History | Annotate | Download | only in tasks
      1 /*
      2  * Copyright (C) 2011 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package vogar.tasks;
     18 
     19 import java.util.ArrayList;
     20 import java.util.Collection;
     21 import java.util.Iterator;
     22 import java.util.LinkedList;
     23 import java.util.List;
     24 import java.util.concurrent.ExecutorService;
     25 import java.util.concurrent.TimeUnit;
     26 import vogar.Console;
     27 import vogar.Result;
     28 import vogar.util.Threads;
     29 
     30 /**
     31  * A set of tasks to execute.
     32  */
     33 public final class TaskQueue {
     34     private static final int FOREVER = 60 * 60 * 24 * 28; // four weeks
     35     private final Console console;
     36     private int runningTasks;
     37     private int runningActions;
     38     private int maxConcurrentActions;
     39     private final LinkedList<Task> tasks = new LinkedList<Task>();
     40     private final LinkedList<Task> runnableActions = new LinkedList<Task>();
     41     private final LinkedList<Task> runnableTasks = new LinkedList<Task>();
     42     private final List<Task> failedTasks = new ArrayList<Task>();
     43 
     44     public TaskQueue(Console console, int maxConcurrentActions) {
     45         this.console = console;
     46         this.maxConcurrentActions = maxConcurrentActions;
     47     }
     48 
     49     /**
     50      * Adds a task to the queue.
     51      */
     52     public synchronized void enqueue(Task task) {
     53         tasks.add(task);
     54     }
     55 
     56     public void enqueueAll(Collection<Task> tasks) {
     57         this.tasks.addAll(tasks);
     58     }
     59 
     60     public synchronized List<Task> getTasks() {
     61         return new ArrayList<Task>(tasks);
     62     }
     63 
     64     public void runTasks() {
     65         promoteBlockedTasks();
     66 
     67         ExecutorService runners = Threads.threadPerCpuExecutor(console, "TaskQueue");
     68         for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
     69             runners.execute(new Runnable() {
     70                 @Override public void run() {
     71                     while (runOneTask()) {
     72                     }
     73                 }
     74             });
     75         }
     76 
     77         runners.shutdown();
     78         try {
     79             runners.awaitTermination(FOREVER, TimeUnit.SECONDS);
     80         } catch (InterruptedException e) {
     81             throw new AssertionError();
     82         }
     83     }
     84 
     85     public void printTasks() {
     86         if (!console.isVerbose()) {
     87             return;
     88         }
     89 
     90         int i = 0;
     91         for (Task task : tasks) {
     92             StringBuilder message = new StringBuilder()
     93                     .append("Task ").append(i++).append(": ").append(task);
     94             for (Task blocker : task.tasksThatMustFinishFirst) {
     95                 message.append("\n  depends on completed task: ").append(blocker);
     96             }
     97             for (Task blocker : task.tasksThatMustFinishSuccessfullyFirst) {
     98                 message.append("\n  depends on successful task: ").append(blocker);
     99             }
    100             console.verbose(message.toString());
    101         }
    102     }
    103 
    104     public boolean hasFailedTasks() {
    105         return !failedTasks.isEmpty();
    106     }
    107 
    108     public void printProblemTasks() {
    109         for (Task task : failedTasks) {
    110             String message = "Failed task: " + task + " " + task.result;
    111             if (task.thrown != null) {
    112                 console.info(message, task.thrown);
    113             } else {
    114                 console.info(message);
    115             }
    116         }
    117         if (!console.isVerbose()) {
    118             return;
    119         }
    120         for (Task task : tasks) {
    121             StringBuilder message = new StringBuilder()
    122                     .append("Failed to execute task: ").append(task);
    123             for (Task blocker : task.tasksThatMustFinishFirst) {
    124                 if (blocker.result == null) {
    125                     message.append("\n  blocked by unexecuted task: ").append(blocker);
    126                 }
    127             }
    128             for (Task blocker : task.tasksThatMustFinishSuccessfullyFirst) {
    129                 if (blocker.result == null) {
    130                     message.append("\n  blocked by unexecuted task: ").append(blocker);
    131                 } else if (blocker.result != Result.SUCCESS) {
    132                     message.append("\n  blocked by unsuccessful task: ").append(blocker);
    133                 }
    134             }
    135             console.verbose(message.toString());
    136         }
    137     }
    138 
    139     private boolean runOneTask() {
    140         Task task = takeTask();
    141         if (task == null) {
    142             return false;
    143         }
    144         String threadName = Thread.currentThread().getName();
    145         Thread.currentThread().setName(task.toString());
    146         try {
    147             task.run(console);
    148         } finally {
    149             doneTask(task);
    150             Thread.currentThread().setName(threadName);
    151         }
    152         return true;
    153     }
    154 
    155     private synchronized Task takeTask() {
    156         while (true) {
    157             Task task = null;
    158             if (runningActions < maxConcurrentActions) {
    159                 task = runnableActions.poll();
    160             }
    161             if (task == null) {
    162                 task = runnableTasks.poll();
    163             }
    164 
    165             if (task != null) {
    166                 runningTasks++;
    167                 if (task.isAction()) {
    168                     runningActions++;
    169                 }
    170                 return task;
    171             }
    172 
    173             if (isExhausted()) {
    174                 return null;
    175             }
    176 
    177             try {
    178                 wait();
    179             } catch (InterruptedException e) {
    180                 throw new AssertionError();
    181             }
    182         }
    183     }
    184 
    185     private synchronized void doneTask(Task task) {
    186         if (task.result != Result.SUCCESS) {
    187             failedTasks.add(task);
    188         }
    189         runningTasks--;
    190         if (task.isAction()) {
    191             runningActions--;
    192         }
    193         promoteBlockedTasks();
    194         if (isExhausted()) {
    195             notifyAll();
    196         }
    197     }
    198 
    199     private synchronized void promoteBlockedTasks() {
    200         for (Iterator<Task> it = tasks.iterator(); it.hasNext(); ) {
    201             Task potentiallyUnblocked = it.next();
    202             if (potentiallyUnblocked.isRunnable()) {
    203                 it.remove();
    204                 if (potentiallyUnblocked.isAction()) {
    205                     runnableActions.add(potentiallyUnblocked);
    206                 } else {
    207                     runnableTasks.add(potentiallyUnblocked);
    208                 }
    209                 notifyAll();
    210             }
    211         }
    212     }
    213 
    214     /**
    215      * Returns true if there are no tasks to run and no tasks currently running.
    216      */
    217     private boolean isExhausted() {
    218         return runnableTasks.isEmpty() && runnableActions.isEmpty() && runningTasks == 0;
    219     }
    220 }
    221