Home | History | Annotate | Download | only in server2
      1 # Copyright 2013 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import posixpath
      7 import traceback
      8 
      9 from app_yaml_helper import AppYamlHelper
     10 from appengine_wrappers import IsDeadlineExceededError, logservice
     11 from branch_utility import BranchUtility
     12 from compiled_file_system import CompiledFileSystem
     13 from data_source_registry import CreateDataSources
     14 from environment import GetAppVersion, IsDevServer
     15 from extensions_paths import EXAMPLES, PUBLIC_TEMPLATES, STATIC_DOCS
     16 from file_system_util import CreateURLsFromPaths
     17 from future import Future
     18 from gcs_file_system_provider import CloudStorageFileSystemProvider
     19 from github_file_system_provider import GithubFileSystemProvider
     20 from host_file_system_provider import HostFileSystemProvider
     21 from object_store_creator import ObjectStoreCreator
     22 from render_servlet import RenderServlet
     23 from server_instance import ServerInstance
     24 from servlet import Servlet, Request, Response
     25 from special_paths import SITE_VERIFICATION_FILE
     26 from timer import Timer, TimerClosure
     27 
     28 
     29 class _SingletonRenderServletDelegate(RenderServlet.Delegate):
     30   def __init__(self, server_instance):
     31     self._server_instance = server_instance
     32 
     33   def CreateServerInstance(self):
     34     return self._server_instance
     35 
     36 class _CronLogger(object):
     37   '''Wraps the logging.* methods to prefix them with 'cron' and flush
     38   immediately. The flushing is important because often these cron runs time
     39   out and we lose the logs.
     40   '''
     41   def info(self, msg, *args):    self._log(logging.info, msg, args)
     42   def warning(self, msg, *args): self._log(logging.warning, msg, args)
     43   def error(self, msg, *args):   self._log(logging.error, msg, args)
     44 
     45   def _log(self, logfn, msg, args):
     46     try:
     47       logfn('cron: %s' % msg, *args)
     48     finally:
     49       logservice.flush()
     50 
# Module-wide logger used by the cron code below; every message logged through
# it is prefixed with 'cron: ' and flushed immediately.
_cronlog = _CronLogger()
     52 
     53 def _RequestEachItem(title, items, request_callback):
     54   '''Runs a task |request_callback| named |title| for each item in |items|.
     55   |request_callback| must take an item and return a servlet response.
     56   Returns true if every item was successfully run, false if any return a
     57   non-200 response or raise an exception.
     58   '''
     59   _cronlog.info('%s: starting', title)
     60   success_count, failure_count = 0, 0
     61   timer = Timer()
     62   try:
     63     for i, item in enumerate(items):
     64       def error_message(detail):
     65         return '%s: error rendering %s (%s of %s): %s' % (
     66             title, item, i + 1, len(items), detail)
     67       try:
     68         response = request_callback(item)
     69         if response.status == 200:
     70           success_count += 1
     71         else:
     72           _cronlog.error(error_message('response status %s' % response.status))
     73           failure_count += 1
     74       except Exception as e:
     75         _cronlog.error(error_message(traceback.format_exc()))
     76         failure_count += 1
     77         if IsDeadlineExceededError(e): raise
     78   finally:
     79     _cronlog.info('%s: rendered %s of %s with %s failures in %s',
     80         title, success_count, len(items), failure_count,
     81         timer.Stop().FormatElapsed())
     82   return success_count == len(items)
     83 
     84 class CronServlet(Servlet):
     85   '''Servlet which runs a cron job.
     86   '''
     87   def __init__(self, request, delegate_for_test=None):
     88     Servlet.__init__(self, request)
     89     self._delegate = delegate_for_test or CronServlet.Delegate()
     90 
     91   class Delegate(object):
     92     '''CronServlet's runtime dependencies. Override for testing.
     93     '''
     94     def CreateBranchUtility(self, object_store_creator):
     95       return BranchUtility.Create(object_store_creator)
     96 
     97     def CreateHostFileSystemProvider(self,
     98                                      object_store_creator,
     99                                      max_trunk_revision=None):
    100       return HostFileSystemProvider(object_store_creator,
    101                                     max_trunk_revision=max_trunk_revision)
    102 
    103     def CreateGithubFileSystemProvider(self, object_store_creator):
    104       return GithubFileSystemProvider(object_store_creator)
    105 
    106     def CreateGCSFileSystemProvider(self, object_store_creator):
    107       return CloudStorageFileSystemProvider(object_store_creator)
    108 
    109     def GetAppVersion(self):
    110       return GetAppVersion()
    111 
    112   def Get(self):
    113     # Crons often time out, and if they do we need to make sure to flush the
    114     # logs before the process gets killed (Python gives us a couple of
    115     # seconds).
    116     #
    117     # So, manually flush logs at the end of the cron run. However, sometimes
    118     # even that isn't enough, which is why in this file we use _cronlog and
    119     # make it flush the log every time its used.
    120     logservice.AUTOFLUSH_ENABLED = False
    121     try:
    122       return self._GetImpl()
    123     except BaseException:
    124       _cronlog.error('Caught top-level exception! %s', traceback.format_exc())
    125     finally:
    126       logservice.flush()
    127 
    128   def _GetImpl(self):
    129     # Cron strategy:
    130     #
    131     # Find all public template files and static files, and render them. Most of
    132     # the time these won't have changed since the last cron run, so it's a
    133     # little wasteful, but hopefully rendering is really fast (if it isn't we
    134     # have a problem).
    135     _cronlog.info('starting')
    136 
    137     # This is returned every time RenderServlet wants to create a new
    138     # ServerInstance.
    139     #
    140     # TODO(kalman): IMPORTANT. This sometimes throws an exception, breaking
    141     # everything. Need retry logic at the fetcher level.
    142     server_instance = self._GetSafeServerInstance()
    143     trunk_fs = server_instance.host_file_system_provider.GetTrunk()
    144 
    145     def render(path):
    146       request = Request(path, self._request.host, self._request.headers)
    147       delegate = _SingletonRenderServletDelegate(server_instance)
    148       return RenderServlet(request, delegate).Get()
    149 
    150     def request_files_in_dir(path, prefix='', strip_ext=None):
    151       '''Requests every file found under |path| in this host file system, with
    152       a request prefix of |prefix|. |strip_ext| is an optional list of file
    153       extensions that should be stripped from paths before requesting.
    154       '''
    155       def maybe_strip_ext(name):
    156         if name == SITE_VERIFICATION_FILE or not strip_ext:
    157           return name
    158         base, ext = posixpath.splitext(name)
    159         return base if ext in strip_ext else name
    160       files = [maybe_strip_ext(name)
    161                for name, _ in CreateURLsFromPaths(trunk_fs, path, prefix)]
    162       return _RequestEachItem(path, files, render)
    163 
    164     results = []
    165 
    166     try:
    167       # Start running the hand-written Cron methods first; they can be run in
    168       # parallel. They are resolved at the end.
    169       def run_cron_for_future(target):
    170         title = target.__class__.__name__
    171         future, init_timer = TimerClosure(target.Cron)
    172         assert isinstance(future, Future), (
    173             '%s.Cron() did not return a Future' % title)
    174         def resolve():
    175           resolve_timer = Timer()
    176           try:
    177             future.Get()
    178           except Exception as e:
    179             _cronlog.error('%s: error %s' % (title, traceback.format_exc()))
    180             results.append(False)
    181             if IsDeadlineExceededError(e): raise
    182           finally:
    183             resolve_timer.Stop()
    184             _cronlog.info('%s took %s: %s to initialize and %s to resolve' %
    185                 (title,
    186                  init_timer.With(resolve_timer).FormatElapsed(),
    187                  init_timer.FormatElapsed(),
    188                  resolve_timer.FormatElapsed()))
    189         return Future(callback=resolve)
    190 
    191       targets = (CreateDataSources(server_instance).values() +
    192                  [server_instance.content_providers,
    193                   server_instance.api_models])
    194       title = 'initializing %s parallel Cron targets' % len(targets)
    195       _cronlog.info(title)
    196       timer = Timer()
    197       try:
    198         cron_futures = [run_cron_for_future(target) for target in targets]
    199       finally:
    200         _cronlog.info('%s took %s' % (title, timer.Stop().FormatElapsed()))
    201 
    202       # Samples are too expensive to run on the dev server, where there is no
    203       # parallel fetch.
    204       #
    205       # XXX(kalman): Currently samples are *always* too expensive to fetch, so
    206       # disabling them for now. It won't break anything so long as we're still
    207       # not enforcing that everything gets cached for normal instances.
    208       if False:  # should be "not IsDevServer()":
    209         # Fetch each individual sample file.
    210         results.append(request_files_in_dir(EXAMPLES,
    211                                             prefix='extensions/examples'))
    212 
    213       # Resolve the hand-written Cron method futures.
    214       title = 'resolving %s parallel Cron targets' % len(targets)
    215       _cronlog.info(title)
    216       timer = Timer()
    217       try:
    218         for future in cron_futures:
    219           future.Get()
    220       finally:
    221         _cronlog.info('%s took %s' % (title, timer.Stop().FormatElapsed()))
    222 
    223     except:
    224       results.append(False)
    225       # This should never actually happen (each cron step does its own
    226       # conservative error checking), so re-raise no matter what it is.
    227       _cronlog.error('uncaught error: %s' % traceback.format_exc())
    228       raise
    229     finally:
    230       success = all(results)
    231       _cronlog.info('finished (%s)', 'success' if success else 'FAILED')
    232       return (Response.Ok('Success') if success else
    233               Response.InternalError('Failure'))
    234 
    235   def _GetSafeServerInstance(self):
    236     '''Returns a ServerInstance with a host file system at a safe revision,
    237     meaning the last revision that the current running version of the server
    238     existed.
    239     '''
    240     delegate = self._delegate
    241 
    242     # IMPORTANT: Get a ServerInstance pinned to the most recent revision, not
    243     # HEAD. These cron jobs take a while and run very frequently such that
    244     # there is usually one running at any given time, and eventually a file
    245     # that we're dealing with will change underneath it, putting the server in
    246     # an undefined state.
    247     server_instance_near_head = self._CreateServerInstance(
    248         self._GetMostRecentRevision())
    249 
    250     app_yaml_handler = AppYamlHelper(
    251         server_instance_near_head.object_store_creator,
    252         server_instance_near_head.host_file_system_provider)
    253 
    254     if app_yaml_handler.IsUpToDate(delegate.GetAppVersion()):
    255       return server_instance_near_head
    256 
    257     # The version in app.yaml is greater than the currently running app's.
    258     # The safe version is the one before it changed.
    259     safe_revision = app_yaml_handler.GetFirstRevisionGreaterThan(
    260         delegate.GetAppVersion()) - 1
    261 
    262     _cronlog.info('app version %s is out of date, safe is %s',
    263         delegate.GetAppVersion(), safe_revision)
    264 
    265     return self._CreateServerInstance(safe_revision)
    266 
    267   def _GetMostRecentRevision(self):
    268     '''Gets the revision of the most recent patch submitted to the host file
    269     system. This is similar to HEAD but it's a concrete revision so won't
    270     change as the cron runs.
    271     '''
    272     head_fs = (
    273         self._CreateServerInstance(None).host_file_system_provider.GetTrunk())
    274     return head_fs.Stat('').version
    275 
    276   def _CreateServerInstance(self, revision):
    277     '''Creates a ServerInstance pinned to |revision|, or HEAD if None.
    278     NOTE: If passed None it's likely that during the cron run patches will be
    279     submitted at HEAD, which may change data underneath the cron run.
    280     '''
    281     object_store_creator = ObjectStoreCreator(start_empty=True)
    282     branch_utility = self._delegate.CreateBranchUtility(object_store_creator)
    283     host_file_system_provider = self._delegate.CreateHostFileSystemProvider(
    284         object_store_creator, max_trunk_revision=revision)
    285     github_file_system_provider = self._delegate.CreateGithubFileSystemProvider(
    286         object_store_creator)
    287     gcs_file_system_provider = self._delegate.CreateGCSFileSystemProvider(
    288         object_store_creator)
    289     return ServerInstance(object_store_creator,
    290                           CompiledFileSystem.Factory(object_store_creator),
    291                           branch_utility,
    292                           host_file_system_provider,
    293                           github_file_system_provider,
    294                           gcs_file_system_provider)
    295