# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import time
import traceback

from app_yaml_helper import AppYamlHelper
from appengine_wrappers import IsDeadlineExceededError, logservice, taskqueue
from branch_utility import BranchUtility
from compiled_file_system import CompiledFileSystem
from custom_logger import CustomLogger
from data_source_registry import CreateDataSources
from environment import GetAppVersion
from gcs_file_system_provider import CloudStorageFileSystemProvider
from github_file_system_provider import GithubFileSystemProvider
from host_file_system_provider import HostFileSystemProvider
from object_store_creator import ObjectStoreCreator
from render_refresher import RenderRefresher
from server_instance import ServerInstance
from servlet import Servlet, Request, Response
from timer import Timer


_log = CustomLogger('cron')


class CronServlet(Servlet):
  '''Servlet which runs a cron job.

  The job purges the default task queue and then enqueues one /_refresh task
  per refresh path of every data source, the content providers and the
  platform bundle, all pinned to a single commit of the host file system.
  '''
  def __init__(self, request, delegate_for_test=None):
    '''|request| is handed to the Servlet base class. |delegate_for_test|, if
    given, replaces the default CronServlet.Delegate.
    '''
    Servlet.__init__(self, request)
    self._delegate = delegate_for_test or CronServlet.Delegate()

  class Delegate(object):
    '''CronServlet's runtime dependencies. Override for testing.
    '''
    def CreateBranchUtility(self, object_store_creator):
      '''Returns BranchUtility.Create(|object_store_creator|).
      '''
      return BranchUtility.Create(object_store_creator)

    def CreateHostFileSystemProvider(self,
                                     object_store_creator,
                                     pinned_commit=None):
      '''Returns a HostFileSystemProvider constructed with |pinned_commit|.
      '''
      return HostFileSystemProvider(object_store_creator,
                                    pinned_commit=pinned_commit)

    def CreateGithubFileSystemProvider(self, object_store_creator):
      '''Returns a GithubFileSystemProvider for |object_store_creator|.
      '''
      return GithubFileSystemProvider(object_store_creator)

    def CreateGCSFileSystemProvider(self, object_store_creator):
      '''Returns a CloudStorageFileSystemProvider for |object_store_creator|.
      '''
      return CloudStorageFileSystemProvider(object_store_creator)

    def GetAppVersion(self):
      '''Returns the result of the module-level GetAppVersion().
      '''
      return GetAppVersion()

  def Get(self):
    '''Runs the cron job and returns the result of _GetImpl().

    If _GetImpl() raises, the exception is logged and swallowed, and this
    method falls through, implicitly returning None. The logs are flushed
    in every case.
    '''
    # Refreshes may time out, and if they do we need to make sure to flush the
    # logs before the process gets killed (Python gives us a couple of
    # seconds).
    #
    # So, manually flush logs at the end of the cron run. However, sometimes
    # even that isn't enough, which is why in this file we use _log and
    # make it flush the log every time it's used.
    logservice.AUTOFLUSH_ENABLED = False
    try:
      return self._GetImpl()
    except BaseException:
      _log.error('Caught top-level exception! %s', traceback.format_exc())
    finally:
      logservice.flush()

  def _GetImpl(self):
    '''Purges the default task queue and enqueues one refresh task per
    refresh path of each target.

    Returns Response.Ok('Success') if all tasks were enqueued, otherwise
    Response.InternalError('Failure'). Errors raised while collecting targets
    or enqueueing tasks are logged and reported through the returned
    Response rather than propagated. Errors raised by the setup steps before
    that (creating the server instance, purging the queue) do propagate.
    '''
    # Cron strategy:
    #
    # Collect all DataSources, the PlatformBundle, the ContentProviders, and
    # any other statically rendered contents (e.g. examples content),
    # and spin up taskqueue tasks which will refresh any cached data relevant
    # to these assets.
    #
    # TODO(rockot/kalman): At the moment examples are not actually refreshed
    # because they're too slow.

    _log.info('starting')

    server_instance = self._GetSafeServerInstance()
    master_fs = server_instance.host_file_system_provider.GetMaster()
    master_commit = master_fs.GetCommitID().Get()

    # This is the guy that would be responsible for refreshing the cache of
    # examples. Here for posterity, hopefully it will be added to the targets
    # below someday.
    render_refresher = RenderRefresher(server_instance, self._request)

    # Get the default taskqueue
    queue = taskqueue.Queue()

    # GAE documentation specifies that it's bad to add tasks to a queue
    # within one second of purging. We wait 2 seconds, because we like
    # to go the extra mile.
    queue.purge()
    time.sleep(2)

    success = True
    try:
      data_sources = CreateDataSources(server_instance)
      # list() keeps the concatenation valid even when items() returns a view
      # rather than a list.
      targets = (list(data_sources.items()) +
                 [('content_providers', server_instance.content_providers),
                  ('platform_bundle', server_instance.platform_bundle)])
      title = 'initializing %s parallel targets' % len(targets)
      _log.info(title)
      timer = Timer()
      for name, target in targets:
        for path in target.GetRefreshPaths():
          queue.add(taskqueue.Task(url='/_refresh/%s/%s' % (name, path),
                                   params={'commit': master_commit}))
      _log.info('%s took %s' % (title, timer.Stop().FormatElapsed()))
    except BaseException:
      # This should never actually happen (each cron step does its own
      # conservative error checking). The failure is reported through the
      # returned Response; a re-raise here used to be discarded by a return
      # statement in the finally clause, so it has been dropped.
      success = False
      _log.error('uncaught error: %s' % traceback.format_exc())

    _log.info('finished (%s)', 'success' if success else 'FAILED')
    if success:
      return Response.Ok('Success')
    return Response.InternalError('Failure')

  def _GetSafeServerInstance(self):
    '''Returns a ServerInstance with a host file system at a safe commit,
    meaning the last commit at which the currently running version of the
    server existed.
    '''
    # Read the version once so that the up-to-date check, the revision lookup
    # and the log message all agree.
    app_version = self._delegate.GetAppVersion()

    # IMPORTANT: Get a ServerInstance pinned to the most recent commit, not
    # HEAD. These cron jobs take a while and run very frequently such that
    # there is usually one running at any given time, and eventually a file
    # that we're dealing with will change underneath it, putting the server in
    # an undefined state.
    server_instance_near_head = self._CreateServerInstance(
        self._GetMostRecentCommit())

    app_yaml_handler = AppYamlHelper(
        server_instance_near_head.object_store_creator,
        server_instance_near_head.host_file_system_provider)

    if app_yaml_handler.IsUpToDate(app_version):
      return server_instance_near_head

    # The version in app.yaml is greater than the currently running app's.
    # The safe version is the one before it changed.
    # NOTE(review): the "- 1" assumes revisions are numeric - TODO confirm.
    safe_revision = (
        app_yaml_handler.GetFirstRevisionGreaterThan(app_version) - 1)

    _log.info('app version %s is out of date, safe is %s',
              app_version, safe_revision)

    return self._CreateServerInstance(safe_revision)

  def _GetMostRecentCommit(self):
    '''Gets the commit of the most recent patch submitted to the host file
    system. This is similar to HEAD but it's a concrete commit so won't
    change as the cron runs.
    '''
    head_instance = self._CreateServerInstance(None)
    head_fs = head_instance.host_file_system_provider.GetMaster()
    return head_fs.GetCommitID().Get()

  def _CreateServerInstance(self, commit):
    '''Creates a ServerInstance pinned to |commit|, or HEAD if None.
    NOTE: If passed None it's likely that during the cron run patches will be
    submitted at HEAD, which may change data underneath the cron run.
    '''
    delegate = self._delegate
    # Every provider below shares this one object store creator.
    object_store_creator = ObjectStoreCreator(start_empty=True)
    branch_utility = delegate.CreateBranchUtility(object_store_creator)
    host_file_system_provider = delegate.CreateHostFileSystemProvider(
        object_store_creator, pinned_commit=commit)
    github_file_system_provider = delegate.CreateGithubFileSystemProvider(
        object_store_creator)
    gcs_file_system_provider = delegate.CreateGCSFileSystemProvider(
        object_store_creator)
    return ServerInstance(object_store_creator,
                          CompiledFileSystem.Factory(object_store_creator),
                          branch_utility,
                          host_file_system_provider,
                          github_file_system_provider,
                          gcs_file_system_provider)