1 /* 2 * Copyright (C) 2014 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include "base/time_utils.h" 18 #include "common_runtime_test.h" 19 #include "task_processor.h" 20 #include "thread_pool.h" 21 #include "thread-inl.h" 22 23 namespace art { 24 namespace gc { 25 26 class TaskProcessorTest : public CommonRuntimeTest { 27 public: 28 }; 29 30 class RecursiveTask : public HeapTask { 31 public: 32 RecursiveTask(TaskProcessor* task_processor, Atomic<size_t>* counter, size_t max_recursion) 33 : HeapTask(NanoTime() + MsToNs(10)), task_processor_(task_processor), counter_(counter), 34 max_recursion_(max_recursion) { 35 } 36 virtual void Run(Thread* self) OVERRIDE { 37 if (max_recursion_ > 0) { 38 task_processor_->AddTask(self, 39 new RecursiveTask(task_processor_, counter_, max_recursion_ - 1)); 40 counter_->FetchAndAddSequentiallyConsistent(1U); 41 } 42 } 43 44 private: 45 TaskProcessor* const task_processor_; 46 Atomic<size_t>* const counter_; 47 const size_t max_recursion_; 48 }; 49 50 class WorkUntilDoneTask : public SelfDeletingTask { 51 public: 52 WorkUntilDoneTask(TaskProcessor* task_processor, Atomic<bool>* done_running) 53 : task_processor_(task_processor), done_running_(done_running) { 54 } 55 virtual void Run(Thread* self) OVERRIDE { 56 task_processor_->RunAllTasks(self); 57 done_running_->StoreSequentiallyConsistent(true); 58 } 59 60 private: 61 TaskProcessor* const task_processor_; 62 Atomic<bool>* done_running_; 63 }; 64 65 TEST_F(TaskProcessorTest, Interrupt) { 66 ThreadPool thread_pool("task processor test", 1U); 67 Thread* const self = Thread::Current(); 68 TaskProcessor task_processor; 69 static constexpr size_t kRecursion = 10; 70 Atomic<bool> done_running(false); 71 Atomic<size_t> counter(0); 72 task_processor.AddTask(self, new RecursiveTask(&task_processor, &counter, kRecursion)); 73 task_processor.Start(self); 74 // Add a task which will wait until interrupted to the thread pool. 75 thread_pool.AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running)); 76 thread_pool.StartWorkers(self); 77 ASSERT_FALSE(done_running); 78 // Wait until all the tasks are done, but since we didn't interrupt, done_running should be 0. 79 while (counter.LoadSequentiallyConsistent() != kRecursion) { 80 usleep(10); 81 } 82 ASSERT_FALSE(done_running); 83 task_processor.Stop(self); 84 thread_pool.Wait(self, true, false); 85 // After the interrupt and wait, the WorkUntilInterruptedTasktask should have terminated and 86 // set done_running_ to true. 87 ASSERT_TRUE(done_running.LoadSequentiallyConsistent()); 88 89 // Test that we finish remaining tasks before returning from RunTasksUntilInterrupted. 90 counter.StoreSequentiallyConsistent(0); 91 done_running.StoreSequentiallyConsistent(false); 92 // Self interrupt before any of the other tasks run, but since we added them we should keep on 93 // working until all the tasks are completed. 94 task_processor.Stop(self); 95 task_processor.AddTask(self, new RecursiveTask(&task_processor, &counter, kRecursion)); 96 thread_pool.AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running)); 97 thread_pool.StartWorkers(self); 98 thread_pool.Wait(self, true, false); 99 ASSERT_TRUE(done_running.LoadSequentiallyConsistent()); 100 ASSERT_EQ(counter.LoadSequentiallyConsistent(), kRecursion); 101 } 102 103 class TestOrderTask : public HeapTask { 104 public: 105 TestOrderTask(uint64_t expected_time, size_t expected_counter, size_t* counter) 106 : HeapTask(expected_time), expected_counter_(expected_counter), counter_(counter) { 107 } 108 virtual void Run(Thread* thread ATTRIBUTE_UNUSED) OVERRIDE { 109 ASSERT_EQ(*counter_, expected_counter_); 110 ++*counter_; 111 } 112 113 private: 114 const size_t expected_counter_; 115 size_t* const counter_; 116 }; 117 118 TEST_F(TaskProcessorTest, Ordering) { 119 static const size_t kNumTasks = 25; 120 const uint64_t current_time = NanoTime(); 121 Thread* const self = Thread::Current(); 122 TaskProcessor task_processor; 123 task_processor.Stop(self); 124 size_t counter = 0; 125 std::vector<std::pair<uint64_t, size_t>> orderings; 126 for (size_t i = 0; i < kNumTasks; ++i) { 127 orderings.push_back(std::make_pair(current_time + MsToNs(10U * i), i)); 128 } 129 for (size_t i = 0; i < kNumTasks; ++i) { 130 std::swap(orderings[i], orderings[(i * 87654231 + 12345) % orderings.size()]); 131 } 132 for (const auto& pair : orderings) { 133 auto* task = new TestOrderTask(pair.first, pair.second, &counter); 134 task_processor.AddTask(self, task); 135 } 136 ThreadPool thread_pool("task processor test", 1U); 137 Atomic<bool> done_running(false); 138 // Add a task which will wait until interrupted to the thread pool. 139 thread_pool.AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running)); 140 ASSERT_FALSE(done_running.LoadSequentiallyConsistent()); 141 thread_pool.StartWorkers(self); 142 thread_pool.Wait(self, true, false); 143 ASSERT_TRUE(done_running.LoadSequentiallyConsistent()); 144 ASSERT_EQ(counter, kNumTasks); 145 } 146 147 } // namespace gc 148 } // namespace art 149