# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import traceback

from app_yaml_helper import AppYamlHelper
from appengine_wrappers import (
    GetAppVersion, IsDeadlineExceededError, logservice)
from branch_utility import BranchUtility
from compiled_file_system import CompiledFileSystem
from data_source_registry import CreateDataSources
from environment import IsDevServer
from extensions_paths import EXAMPLES, PUBLIC_TEMPLATES, STATIC_DOCS
from file_system_util import CreateURLsFromPaths
from future import Gettable, Future
from github_file_system_provider import GithubFileSystemProvider
from host_file_system_provider import HostFileSystemProvider
from object_store_creator import ObjectStoreCreator
from render_servlet import RenderServlet
from server_instance import ServerInstance
from servlet import Servlet, Request, Response
from timer import Timer, TimerClosure


class _SingletonRenderServletDelegate(RenderServlet.Delegate):
  '''A RenderServlet delegate which always hands back the one ServerInstance
  it was constructed with, rather than creating a new one per request.
  '''
  def __init__(self, server_instance):
    self._server_instance = server_instance

  def CreateServerInstance(self):
    return self._server_instance

class _CronLogger(object):
  '''Wraps the logging.* methods to prefix them with 'cron' and flush
  immediately. The flushing is important because often these cron runs time
  out and we lose the logs.
  '''
  def info(self, msg, *args): self._log(logging.info, msg, args)
  def warning(self, msg, *args): self._log(logging.warning, msg, args)
  def error(self, msg, *args): self._log(logging.error, msg, args)

  def _log(self, logfn, msg, args):
    '''Logs |msg| (formatted lazily with |args| by the logging module) via
    |logfn|, then flushes the log service even if logging itself raised.
    '''
    try:
      logfn('cron: %s' % msg, *args)
    finally:
      logservice.flush()

_cronlog = _CronLogger()

def _RequestEachItem(title, items, request_callback):
  '''Runs a task |request_callback| named |title| for each item in |items|.
  |request_callback| must take an item and return a servlet response.
  Returns true if every item was successfully run, false if any return a
  non-200 response or raise an exception.

  |items| must support len(). An exception for which
  IsDeadlineExceededError() is true is logged and then re-raised, aborting
  the remaining items; any other exception is logged and counted as a
  failure. The summary line is always logged, even when aborting.
  '''
  _cronlog.info('%s: starting', title)
  success_count, failure_count = 0, 0
  timer = Timer()
  try:
    for i, item in enumerate(items):
      # Only ever called within the same iteration, so closing over the loop
      # variables |i| and |item| is safe here.
      def error_message(detail):
        return '%s: error rendering %s (%s of %s): %s' % (
            title, item, i + 1, len(items), detail)
      try:
        response = request_callback(item)
        if response.status == 200:
          success_count += 1
        else:
          _cronlog.error(error_message('response status %s' % response.status))
          failure_count += 1
      except Exception as e:
        _cronlog.error(error_message(traceback.format_exc()))
        failure_count += 1
        # There is no point carrying on once the deadline has passed.
        if IsDeadlineExceededError(e): raise
  finally:
    _cronlog.info('%s: rendered %s of %s with %s failures in %s',
        title, success_count, len(items), failure_count,
        timer.Stop().FormatElapsed())
  return success_count == len(items)

class CronServlet(Servlet):
  '''Servlet which runs a cron job.
  '''
  def __init__(self, request, delegate_for_test=None):
    Servlet.__init__(self, request)
    self._delegate = delegate_for_test or CronServlet.Delegate()

  class Delegate(object):
    '''CronServlet's runtime dependencies. Override for testing.
    '''
    def CreateBranchUtility(self, object_store_creator):
      return BranchUtility.Create(object_store_creator)

    def CreateHostFileSystemProvider(self,
                                     object_store_creator,
                                     max_trunk_revision=None):
      return HostFileSystemProvider(object_store_creator,
                                    max_trunk_revision=max_trunk_revision)

    def CreateGithubFileSystemProvider(self, object_store_creator):
      return GithubFileSystemProvider(object_store_creator)

    def GetAppVersion(self):
      return GetAppVersion()

  def Get(self):
    '''Runs the cron job and returns its servlet response.

    Always returns a Response: if anything escapes from _GetImpl() it is
    logged and reported as an internal error rather than propagated.
    '''
    # Crons often time out, and if they do we need to make sure to flush the
    # logs before the process gets killed (Python gives us a couple of
    # seconds).
    #
    # So, manually flush logs at the end of the cron run. However, sometimes
    # even that isn't enough, which is why in this file we use _cronlog and
    # make it flush the log every time it's used.
    logservice.AUTOFLUSH_ENABLED = False
    try:
      return self._GetImpl()
    except BaseException:
      _cronlog.error('Caught top-level exception! %s', traceback.format_exc())
      # Report the failure explicitly rather than implicitly returning None,
      # which is not a valid servlet response.
      return Response.InternalError('Failure')
    finally:
      logservice.flush()

  def _GetImpl(self):
    '''Does the actual cron work: starts every hand-written Cron target,
    renders the public templates, static docs and (off the dev server) the
    examples, then resolves the Cron targets.

    Returns Response.Ok if every step succeeded, otherwise
    Response.InternalError. Failing to create the ServerInstance happens
    before any of that and raises to the caller.
    '''
    # Cron strategy:
    #
    # Find all public template files and static files, and render them. Most of
    # the time these won't have changed since the last cron run, so it's a
    # little wasteful, but hopefully rendering is really fast (if it isn't we
    # have a problem).
    _cronlog.info('starting')

    # This is returned every time RenderServlet wants to create a new
    # ServerInstance.
    #
    # TODO(kalman): IMPORTANT. This sometimes throws an exception, breaking
    # everything. Need retry logic at the fetcher level.
    server_instance = self._GetSafeServerInstance()
    trunk_fs = server_instance.host_file_system_provider.GetTrunk()

    def render(path):
      '''Renders |path| through a RenderServlet backed by |server_instance|,
      reusing the host and headers of the cron request.
      '''
      request = Request(path, self._request.host, self._request.headers)
      delegate = _SingletonRenderServletDelegate(server_instance)
      return RenderServlet(request, delegate).Get()

    def request_files_in_dir(path, prefix=''):
      '''Requests every file found under |path| in this host file system, with
      a request prefix of |prefix|.
      '''
      files = [name for name, _ in CreateURLsFromPaths(trunk_fs, path, prefix)]
      return _RequestEachItem(path, files, render)

    # One boolean per cron step; the run succeeds only if all are true.
    results = []

    try:
      # Start running the hand-written Cron methods first; they can be run in
      # parallel. They are resolved at the end.
      def run_cron_for_future(target):
        '''Starts |target|.Cron() and returns a Future which, when resolved,
        waits for it and logs how long initializing and resolving took.
        '''
        title = target.__class__.__name__
        future, init_timer = TimerClosure(target.Cron)
        assert isinstance(future, Future), (
            '%s.Cron() did not return a Future' % title)
        def resolve():
          resolve_timer = Timer()
          try:
            future.Get()
          except Exception as e:
            _cronlog.error('%s: error %s' % (title, traceback.format_exc()))
            results.append(False)
            if IsDeadlineExceededError(e): raise
          finally:
            resolve_timer.Stop()
            _cronlog.info('%s took %s: %s to initialize and %s to resolve' %
                (title,
                 init_timer.With(resolve_timer).FormatElapsed(),
                 init_timer.FormatElapsed(),
                 resolve_timer.FormatElapsed()))
        return Future(delegate=Gettable(resolve))

      # list() so that the concatenation also works where values() returns a
      # view rather than a list.
      targets = (list(CreateDataSources(server_instance).values()) +
                 [server_instance.content_providers])
      title = 'initializing %s parallel Cron targets' % len(targets)
      _cronlog.info(title)
      timer = Timer()
      try:
        cron_futures = [run_cron_for_future(target) for target in targets]
      finally:
        _cronlog.info('%s took %s' % (title, timer.Stop().FormatElapsed()))

      # Rendering the public templates will also pull in all of the private
      # templates.
      results.append(request_files_in_dir(PUBLIC_TEMPLATES))

      # Rendering the public templates will have pulled in the .js and
      # manifest.json files (for listing examples on the API reference pages),
      # but there are still images, CSS, etc.
      results.append(request_files_in_dir(STATIC_DOCS, prefix='static'))

      # Samples are too expensive to run on the dev server, where there is no
      # parallel fetch.
      if not IsDevServer():
        # Fetch each individual sample file.
        results.append(request_files_in_dir(EXAMPLES,
                                            prefix='extensions/examples'))

        # Fetch the zip file of each example (contains all the individual
        # files).
        example_zips = []
        for root, _, files in trunk_fs.Walk(EXAMPLES):
          example_zips.extend(
              root + '.zip' for name in files if name == 'manifest.json')
        results.append(_RequestEachItem(
            'example zips',
            example_zips,
            lambda path: render('extensions/examples/' + path)))

      # Resolve the hand-written Cron method futures.
      title = 'resolving %s parallel Cron targets' % len(targets)
      _cronlog.info(title)
      timer = Timer()
      try:
        for future in cron_futures:
          future.Get()
      finally:
        _cronlog.info('%s took %s' % (title, timer.Stop().FormatElapsed()))

    except:
      # This should never actually happen (each cron step does its own
      # conservative error checking). Deliberately catch everything, including
      # deadline errors: record the failure and log it. There is no re-raise
      # because the return in the finally clause below would discard the
      # exception anyway; the caller sees an InternalError response instead.
      results.append(False)
      _cronlog.error('uncaught error: %s' % traceback.format_exc())
    finally:
      success = all(results)
      _cronlog.info('finished (%s)', 'success' if success else 'FAILED')
      return (Response.Ok('Success') if success else
              Response.InternalError('Failure'))

  def _GetSafeServerInstance(self):
    '''Returns a ServerInstance with a host file system at a safe revision,
    meaning the last revision that the current running version of the server
    existed.
    '''
    # Read the running version once so that the comparison, the revision
    # lookup and the log message below all agree.
    app_version = self._delegate.GetAppVersion()

    # IMPORTANT: Get a ServerInstance pinned to the most recent revision, not
    # HEAD. These cron jobs take a while and run very frequently such that
    # there is usually one running at any given time, and eventually a file
    # that we're dealing with will change underneath it, putting the server in
    # an undefined state.
    server_instance_near_head = self._CreateServerInstance(
        self._GetMostRecentRevision())

    app_yaml_handler = AppYamlHelper(
        server_instance_near_head.object_store_creator,
        server_instance_near_head.host_file_system_provider)

    if app_yaml_handler.IsUpToDate(app_version):
      return server_instance_near_head

    # The version in app.yaml is greater than the currently running app's.
    # The safe version is the one before it changed.
    safe_revision = app_yaml_handler.GetFirstRevisionGreaterThan(
        app_version) - 1

    _cronlog.info('app version %s is out of date, safe is %s',
        app_version, safe_revision)

    return self._CreateServerInstance(safe_revision)

  def _GetMostRecentRevision(self):
    '''Gets the revision of the most recent patch submitted to the host file
    system. This is similar to HEAD but it's a concrete revision so won't
    change as the cron runs.
    '''
    head_fs = (
        self._CreateServerInstance(None).host_file_system_provider.GetTrunk())
    return head_fs.Stat('').version

  def _CreateServerInstance(self, revision):
    '''Creates a ServerInstance pinned to |revision|, or HEAD if None.
    NOTE: If passed None it's likely that during the cron run patches will be
    submitted at HEAD, which may change data underneath the cron run.
    '''
    object_store_creator = ObjectStoreCreator(start_empty=True)
    branch_utility = self._delegate.CreateBranchUtility(object_store_creator)
    host_file_system_provider = self._delegate.CreateHostFileSystemProvider(
        object_store_creator, max_trunk_revision=revision)
    github_file_system_provider = self._delegate.CreateGithubFileSystemProvider(
        object_store_creator)
    return ServerInstance(object_store_creator,
                          CompiledFileSystem.Factory(object_store_creator),
                          branch_utility,
                          host_file_system_provider,
                          github_file_system_provider)