Home | History | Annotate | Download | only in server2
      1 # Copyright 2013 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import time
      6 import traceback
      7 
      8 from app_yaml_helper import AppYamlHelper
      9 from appengine_wrappers import IsDeadlineExceededError, logservice, taskqueue
     10 from branch_utility import BranchUtility
     11 from compiled_file_system import CompiledFileSystem
     12 from custom_logger import CustomLogger
     13 from data_source_registry import CreateDataSources
     14 from environment import GetAppVersion
     15 from gcs_file_system_provider import CloudStorageFileSystemProvider
     16 from github_file_system_provider import GithubFileSystemProvider
     17 from host_file_system_provider import HostFileSystemProvider
     18 from object_store_creator import ObjectStoreCreator
     19 from render_refresher import RenderRefresher
     20 from server_instance import ServerInstance
     21 from servlet import Servlet, Request, Response
     22 from timer import Timer
     23 
     24 
# Logger used for everything in this file. Per the note in CronServlet.Get it
# is meant to flush on every use, so that messages are not lost when a cron
# run is killed after timing out.
_log = CustomLogger('cron')
     26 
     27 
     28 class CronServlet(Servlet):
     29   '''Servlet which runs a cron job.
     30   '''
     31   def __init__(self, request, delegate_for_test=None):
     32     Servlet.__init__(self, request)
     33     self._delegate = delegate_for_test or CronServlet.Delegate()
     34 
     35   class Delegate(object):
     36     '''CronServlet's runtime dependencies. Override for testing.
     37     '''
     38     def CreateBranchUtility(self, object_store_creator):
     39       return BranchUtility.Create(object_store_creator)
     40 
     41     def CreateHostFileSystemProvider(self,
     42                                      object_store_creator,
     43                                      pinned_commit=None):
     44       return HostFileSystemProvider(object_store_creator,
     45                                     pinned_commit=pinned_commit)
     46 
     47     def CreateGithubFileSystemProvider(self, object_store_creator):
     48       return GithubFileSystemProvider(object_store_creator)
     49 
     50     def CreateGCSFileSystemProvider(self, object_store_creator):
     51       return CloudStorageFileSystemProvider(object_store_creator)
     52 
     53     def GetAppVersion(self):
     54       return GetAppVersion()
     55 
     56   def Get(self):
     57     # Refreshes may time out, and if they do we need to make sure to flush the
     58     # logs before the process gets killed (Python gives us a couple of
     59     # seconds).
     60     #
     61     # So, manually flush logs at the end of the cron run. However, sometimes
     62     # even that isn't enough, which is why in this file we use _log and
     63     # make it flush the log every time its used.
     64     logservice.AUTOFLUSH_ENABLED = False
     65     try:
     66       return self._GetImpl()
     67     except BaseException:
     68       _log.error('Caught top-level exception! %s', traceback.format_exc())
     69     finally:
     70       logservice.flush()
     71 
     72   def _GetImpl(self):
     73     # Cron strategy:
     74     #
     75     # Collect all DataSources, the PlatformBundle, the ContentProviders, and
     76     # any other statically renderered contents (e.g. examples content),
     77     # and spin up taskqueue tasks which will refresh any cached data relevant
     78     # to these assets.
     79     #
     80     # TODO(rockot/kalman): At the moment examples are not actually refreshed
     81     # because they're too slow.
     82 
     83     _log.info('starting')
     84 
     85     server_instance = self._GetSafeServerInstance()
     86     master_fs = server_instance.host_file_system_provider.GetMaster()
     87     master_commit = master_fs.GetCommitID().Get()
     88 
     89     # This is the guy that would be responsible for refreshing the cache of
     90     # examples. Here for posterity, hopefully it will be added to the targets
     91     # below someday.
     92     render_refresher = RenderRefresher(server_instance, self._request)
     93 
     94     # Get the default taskqueue
     95     queue = taskqueue.Queue()
     96 
     97     # GAE documentation specifies that it's bad to add tasks to a queue
     98     # within one second of purging. We wait 2 seconds, because we like
     99     # to go the extra mile.
    100     queue.purge()
    101     time.sleep(2)
    102 
    103     success = True
    104     try:
    105       data_sources = CreateDataSources(server_instance)
    106       targets = (data_sources.items() +
    107                  [('content_providers', server_instance.content_providers),
    108                   ('platform_bundle', server_instance.platform_bundle)])
    109       title = 'initializing %s parallel targets' % len(targets)
    110       _log.info(title)
    111       timer = Timer()
    112       for name, target in targets:
    113         refresh_paths = target.GetRefreshPaths()
    114         for path in refresh_paths:
    115           queue.add(taskqueue.Task(url='/_refresh/%s/%s' % (name, path),
    116                                    params={'commit': master_commit}))
    117       _log.info('%s took %s' % (title, timer.Stop().FormatElapsed()))
    118     except:
    119       # This should never actually happen (each cron step does its own
    120       # conservative error checking), so re-raise no matter what it is.
    121       _log.error('uncaught error: %s' % traceback.format_exc())
    122       success = False
    123       raise
    124     finally:
    125       _log.info('finished (%s)', 'success' if success else 'FAILED')
    126       return (Response.Ok('Success') if success else
    127               Response.InternalError('Failure'))
    128 
    129   def _GetSafeServerInstance(self):
    130     '''Returns a ServerInstance with a host file system at a safe commit,
    131     meaning the last commit that the current running version of the server
    132     existed.
    133     '''
    134     delegate = self._delegate
    135 
    136     # IMPORTANT: Get a ServerInstance pinned to the most recent commit, not
    137     # HEAD. These cron jobs take a while and run very frequently such that
    138     # there is usually one running at any given time, and eventually a file
    139     # that we're dealing with will change underneath it, putting the server in
    140     # an undefined state.
    141     server_instance_near_head = self._CreateServerInstance(
    142         self._GetMostRecentCommit())
    143 
    144     app_yaml_handler = AppYamlHelper(
    145         server_instance_near_head.object_store_creator,
    146         server_instance_near_head.host_file_system_provider)
    147 
    148     if app_yaml_handler.IsUpToDate(delegate.GetAppVersion()):
    149       return server_instance_near_head
    150 
    151     # The version in app.yaml is greater than the currently running app's.
    152     # The safe version is the one before it changed.
    153     safe_revision = app_yaml_handler.GetFirstRevisionGreaterThan(
    154         delegate.GetAppVersion()) - 1
    155 
    156     _log.info('app version %s is out of date, safe is %s',
    157         delegate.GetAppVersion(), safe_revision)
    158 
    159     return self._CreateServerInstance(safe_revision)
    160 
    161   def _GetMostRecentCommit(self):
    162     '''Gets the commit of the most recent patch submitted to the host file
    163     system. This is similar to HEAD but it's a concrete commit so won't
    164     change as the cron runs.
    165     '''
    166     head_fs = (
    167         self._CreateServerInstance(None).host_file_system_provider.GetMaster())
    168     return head_fs.GetCommitID().Get()
    169 
    170   def _CreateServerInstance(self, commit):
    171     '''Creates a ServerInstance pinned to |commit|, or HEAD if None.
    172     NOTE: If passed None it's likely that during the cron run patches will be
    173     submitted at HEAD, which may change data underneath the cron run.
    174     '''
    175     object_store_creator = ObjectStoreCreator(start_empty=True)
    176     branch_utility = self._delegate.CreateBranchUtility(object_store_creator)
    177     host_file_system_provider = self._delegate.CreateHostFileSystemProvider(
    178         object_store_creator, pinned_commit=commit)
    179     github_file_system_provider = self._delegate.CreateGithubFileSystemProvider(
    180         object_store_creator)
    181     gcs_file_system_provider = self._delegate.CreateGCSFileSystemProvider(
    182         object_store_creator)
    183     return ServerInstance(object_store_creator,
    184                           CompiledFileSystem.Factory(object_store_creator),
    185                           branch_utility,
    186                           host_file_system_provider,
    187                           github_file_system_provider,
    188                           gcs_file_system_provider)
    189