1 #!/usr/bin/env python 2 3 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 4 # Use of this source code is governed by a BSD-style license that can be 5 # found in the LICENSE file. 6 7 import glib 8 import logging 9 import random 10 11 DEFAULT_MAX_RANDOM_DELAY_MS = 10000 12 13 _instance = None 14 15 def get_instance(): 16 """ 17 Return the singleton instance of the TaskLoop class. 18 19 """ 20 global _instance 21 if _instance is None: 22 _instance = TaskLoop() 23 return _instance 24 25 26 class TaskLoop(object): 27 """ 28 The context to place asynchronous calls. 29 30 This is a wrapper around the GLIB mainloop interface, exposing methods to 31 place (delayed) asynchronous calls. In addition to wrapping around the GLIB 32 API, this provides switches to control how delays are incorporated in method 33 calls globally. 34 35 This class is meant to be a singleton. 36 Do not create an instance directly, use the module level function 37 get_instance() instead. 38 39 Running the TaskLoop is blocking for the caller. So use this class like so: 40 41 tl = task_loop.get_instance() 42 # Setup other things. 43 # Add initial tasks to tl to do stuff, post more tasks, and make the world a 44 # better place. 45 tl.start() 46 # This thread is now blocked. Some task should eventually call tl.stop() to 47 continue here. 48 49 @var ignore_delays: Flag to control if delayed tasks are posted immediately. 50 51 @var random_delays: Flag to control if arbitrary delays are inserted between 52 posted tasks. 53 54 @var max_random_delay_ms: When random_delays is True, the maximum delay 55 inserted between posted tasks. 56 57 """ 58 59 60 def __init__(self): 61 self._logger = logging.getLogger(__name__) 62 63 # Initialize properties 64 self._ignore_delays = False 65 self._random_delays = False 66 self._max_random_delay_ms = DEFAULT_MAX_RANDOM_DELAY_MS 67 68 # Get the mainloop so that tasks can be posted even before running the 69 # task loop. 70 self._mainloop = glib.MainLoop() 71 72 # Initialize dictionary to track posted tasks. 73 self._next_post_id = 0 74 self._posted_tasks = {} 75 76 77 @property 78 def ignore_delays(self): 79 """ 80 Boolean flag to control if delayed tasks are posted immediately. 81 82 If True, all tasks posted henceforth are immediately marked active 83 ignoring any delay requested. With this switch, all other delay related 84 switches are ignored. 85 86 """ 87 return self._ignore_delays 88 89 90 @ignore_delays.setter 91 def ignore_delays(self, value): 92 """ 93 Set |ignore_delays|. 94 95 @param value: Boolean value for the |ignore_delays| flag 96 97 """ 98 self._logger.debug('Turning %s delays ignored mode.', ('on' if value 99 else 'off')) 100 self._ignore_delays = value 101 102 103 @property 104 def random_delays(self): 105 """ 106 Boolean flag to control if random delays are inserted in posted tasks. 107 108 If True, arbitrary delays in range [0, |max_random_delay_ms|] are 109 inserted in all posted tasks henceforth, ignoring the actual delay 110 requested. 111 112 """ 113 return self._random_delays 114 115 116 @random_delays.setter 117 def random_delays(self, value): 118 """ 119 Set |random_delays|. 120 121 @param value: Boolean value for the random_delays flag. 122 123 """ 124 self._logger.debug('Turning %s random delays.', ('on' if value else 125 'off')) 126 self._random_delays = value 127 128 129 @property 130 def max_random_delay_ms(self): 131 """ 132 The maximum arbitrary delay inserted in posted tasks in milliseconds. 133 Type: int 134 135 """ 136 return self._max_random_delay_ms 137 138 139 @max_random_delay_ms.setter 140 def max_random_delay_ms(self, value): 141 """ 142 Set |max_random_delay_ms|. 143 144 @param value: Non-negative int value for |max_random_delay_ms|. Negative 145 values are clamped to 0. 146 147 """ 148 if value < 0: 149 self._logger.warning( 150 'Can not set max_random_delay_ms to negative value %s. ' 151 'Setting to 0 instead.', 152 value) 153 value = 0 154 self._logger.debug('Set max random delay to %d. Random delay is %s', 155 value, ('on' if self.random_delays else 'off')) 156 self._max_random_delay_ms = value 157 158 159 def start(self): 160 """ 161 Run the task loop. 162 163 This call is blocking. The thread that calls TaskLoop.start(...) becomes 164 the task loop itself and is blocked as such till TaskLoop.stop(...) is 165 called. 166 167 """ 168 self._logger.info('Task Loop is now processing tasks...') 169 self._mainloop.run() 170 171 172 def stop(self): 173 """ 174 Stop the task loop. 175 176 """ 177 self._logger.info('Task Loop quitting.') 178 self._mainloop.quit() 179 180 181 def post_repeated_task(self, callback, delay_ms=0): 182 """ 183 Post the given callback repeatedly forever until cancelled. 184 185 The posted callback must not expect any arguments. It likely does not 186 make sense to provide fixed data parameters to a repeated task. Use the 187 object reference to provide context. 188 189 In the |ignore_delays| mode, the task is reposted immediately after 190 dispatch. 191 In the |random_delays| mode, a new arbitrary delay is inserted before 192 each call to |callback|. 193 194 @param callback: The function to call repeatedly. |callback| must expect 195 an object reference as the only argument. The return value from 196 |callback| is ignored. 197 198 @param delay_ms: The delay between repeated calls to |callback|. The 199 first call is also delayed by this amount. Default: 0 200 201 @return: An integer ID that can be used to cancel the posted task. 202 203 """ 204 assert callback is not None 205 206 post_id = self._next_post_id 207 self._next_post_id += 1 208 209 next_delay_ms = self._next_delay_ms(delay_ms) 210 self._posted_tasks[post_id] = glib.timeout_add( 211 next_delay_ms, 212 TaskLoop._execute_repeated_task, 213 self, 214 post_id, 215 callback, 216 delay_ms) 217 return post_id 218 219 220 def post_task_after_delay(self, callback, delay_ms, *args, **kwargs): 221 """ 222 Post the given callback once to be dispatched after |delay_ms|. 223 224 @param callback: The function to call. The function may expect arbitrary 225 number of arguments, passed in as |*args| and |**kwargs|. The 226 return value from |callback| is ignored. 227 228 @param delay_ms: The delay before the call to |callback|. Default: 0 229 230 @return: An integer ID that can be used to cancel the posted task. 231 232 """ 233 assert callback is not None 234 post_id = self._next_post_id 235 self._next_post_id = self._next_post_id + 1 236 delay_ms = self._next_delay_ms(delay_ms) 237 self._posted_tasks[post_id] = glib.timeout_add(delay_ms, callback, 238 *args, **kwargs) 239 return post_id 240 241 242 def post_task(self, callback, *args, **kwargs): 243 """ 244 Post the given callback once. 245 246 In |random_delays| mode, this function is equivalent to 247 |post_task_after_delay|. 248 249 @param callback: The function to call. The function may expect arbitrary 250 number of arguments, passed in as |*args| and |**kwargs|. The 251 return value from |callback| is ignored. 252 253 @return: An integer ID that can be used to cancel the posted task. 254 255 """ 256 self._logger.debug('Task posted: %s', repr(callback)) 257 self._logger.debug('Arguments: %s, Keyword arguments: %s', 258 repr(args), repr(kwargs)) 259 return self.post_task_after_delay(callback, 0, *args, **kwargs) 260 261 262 def cancel_posted_task(self, post_id): 263 """ 264 Cancels a previously posted task that is yet to be dispatched. 265 266 @param post_id: The |post_id| of the task to cancel, as returned by one 267 of the functions that post a task. 268 269 @return: True if the posted task was removed. 270 271 """ 272 if post_id in self._posted_tasks: 273 retval = glib.source_remove(self._posted_tasks[post_id]) 274 if retval: 275 del self._posted_tasks[post_id] 276 return retval 277 else: 278 return False 279 280 281 def _next_delay_ms(self, user_delay_ms): 282 """ 283 Determine the actual delay to post the next task. 284 285 The actual delay posted may be different from the user requested delay 286 based on what mode we're in. 287 288 @param user_delay_ms: The delay requested by the user. 289 290 @return The actual delay to be posted. 291 292 """ 293 next_delay_ms = user_delay_ms 294 if self.ignore_delays: 295 next_delay_ms = 0 296 elif self.random_delays: 297 next_delay_ms = random.randint(0, self.max_random_delay_ms) 298 return next_delay_ms 299 300 301 def _execute_repeated_task(self, post_id, callback, delay_ms): 302 """ 303 A wrapper to repost an executed task, and return False. 304 305 We need this to be able to repost the task at arbitrary intervals. 306 307 @param post_id: The private post_id tracking this repeated task. 308 309 @param callback: The user callback that must be called. 310 311 @param delay_ms: The user requested delay between calls. 312 313 """ 314 retval = callback() 315 self._logger.debug('Ignored return value from repeated task: %s', 316 repr(retval)) 317 318 next_delay_ms = self._next_delay_ms(delay_ms) 319 self._posted_tasks[post_id] = glib.timeout_add( 320 next_delay_ms, 321 TaskLoop._execute_repeated_task, 322 self, 323 post_id, 324 callback, 325 delay_ms) 326 return False 327