Home | History | Annotate | Download | only in server2
      1 # Copyright 2013 The Chromium Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import traceback
      7 
      8 from app_yaml_helper import AppYamlHelper
      9 from appengine_wrappers import (
     10     GetAppVersion, IsDeadlineExceededError, logservice)
     11 from branch_utility import BranchUtility
     12 from compiled_file_system import CompiledFileSystem
     13 from data_source_registry import CreateDataSources
     14 from environment import IsDevServer
     15 from extensions_paths import EXAMPLES, PUBLIC_TEMPLATES, STATIC_DOCS
     16 from file_system_util import CreateURLsFromPaths
     17 from future import Gettable, Future
     18 from github_file_system_provider import GithubFileSystemProvider
     19 from host_file_system_provider import HostFileSystemProvider
     20 from object_store_creator import ObjectStoreCreator
     21 from render_servlet import RenderServlet
     22 from server_instance import ServerInstance
     23 from servlet import Servlet, Request, Response
     24 from timer import Timer, TimerClosure
     25 
     26 
     27 class _SingletonRenderServletDelegate(RenderServlet.Delegate):
     28   def __init__(self, server_instance):
     29     self._server_instance = server_instance
     30 
     31   def CreateServerInstance(self):
     32     return self._server_instance
     33 
     34 class _CronLogger(object):
     35   '''Wraps the logging.* methods to prefix them with 'cron' and flush
     36   immediately. The flushing is important because often these cron runs time
     37   out and we lose the logs.
     38   '''
     39   def info(self, msg, *args):    self._log(logging.info, msg, args)
     40   def warning(self, msg, *args): self._log(logging.warning, msg, args)
     41   def error(self, msg, *args):   self._log(logging.error, msg, args)
     42 
     43   def _log(self, logfn, msg, args):
     44     try:
     45       logfn('cron: %s' % msg, *args)
     46     finally:
     47       logservice.flush()
     48 
     49 _cronlog = _CronLogger()
     50 
     51 def _RequestEachItem(title, items, request_callback):
     52   '''Runs a task |request_callback| named |title| for each item in |items|.
     53   |request_callback| must take an item and return a servlet response.
     54   Returns true if every item was successfully run, false if any return a
     55   non-200 response or raise an exception.
     56   '''
     57   _cronlog.info('%s: starting', title)
     58   success_count, failure_count = 0, 0
     59   timer = Timer()
     60   try:
     61     for i, item in enumerate(items):
     62       def error_message(detail):
     63         return '%s: error rendering %s (%s of %s): %s' % (
     64             title, item, i + 1, len(items), detail)
     65       try:
     66         response = request_callback(item)
     67         if response.status == 200:
     68           success_count += 1
     69         else:
     70           _cronlog.error(error_message('response status %s' % response.status))
     71           failure_count += 1
     72       except Exception as e:
     73         _cronlog.error(error_message(traceback.format_exc()))
     74         failure_count += 1
     75         if IsDeadlineExceededError(e): raise
     76   finally:
     77     _cronlog.info('%s: rendered %s of %s with %s failures in %s',
     78         title, success_count, len(items), failure_count,
     79         timer.Stop().FormatElapsed())
     80   return success_count == len(items)
     81 
     82 class CronServlet(Servlet):
     83   '''Servlet which runs a cron job.
     84   '''
     85   def __init__(self, request, delegate_for_test=None):
     86     Servlet.__init__(self, request)
     87     self._delegate = delegate_for_test or CronServlet.Delegate()
     88 
     89   class Delegate(object):
     90     '''CronServlet's runtime dependencies. Override for testing.
     91     '''
     92     def CreateBranchUtility(self, object_store_creator):
     93       return BranchUtility.Create(object_store_creator)
     94 
     95     def CreateHostFileSystemProvider(self,
     96                                      object_store_creator,
     97                                      max_trunk_revision=None):
     98       return HostFileSystemProvider(object_store_creator,
     99                                     max_trunk_revision=max_trunk_revision)
    100 
    101     def CreateGithubFileSystemProvider(self, object_store_creator):
    102       return GithubFileSystemProvider(object_store_creator)
    103 
    104     def GetAppVersion(self):
    105       return GetAppVersion()
    106 
    107   def Get(self):
    108     # Crons often time out, and if they do we need to make sure to flush the
    109     # logs before the process gets killed (Python gives us a couple of
    110     # seconds).
    111     #
    112     # So, manually flush logs at the end of the cron run. However, sometimes
    113     # even that isn't enough, which is why in this file we use _cronlog and
    114     # make it flush the log every time its used.
    115     logservice.AUTOFLUSH_ENABLED = False
    116     try:
    117       return self._GetImpl()
    118     except BaseException:
    119       _cronlog.error('Caught top-level exception! %s', traceback.format_exc())
    120     finally:
    121       logservice.flush()
    122 
    123   def _GetImpl(self):
    124     # Cron strategy:
    125     #
    126     # Find all public template files and static files, and render them. Most of
    127     # the time these won't have changed since the last cron run, so it's a
    128     # little wasteful, but hopefully rendering is really fast (if it isn't we
    129     # have a problem).
    130     _cronlog.info('starting')
    131 
    132     # This is returned every time RenderServlet wants to create a new
    133     # ServerInstance.
    134     #
    135     # TODO(kalman): IMPORTANT. This sometimes throws an exception, breaking
    136     # everything. Need retry logic at the fetcher level.
    137     server_instance = self._GetSafeServerInstance()
    138     trunk_fs = server_instance.host_file_system_provider.GetTrunk()
    139 
    140     def render(path):
    141       request = Request(path, self._request.host, self._request.headers)
    142       delegate = _SingletonRenderServletDelegate(server_instance)
    143       return RenderServlet(request, delegate).Get()
    144 
    145     def request_files_in_dir(path, prefix=''):
    146       '''Requests every file found under |path| in this host file system, with
    147       a request prefix of |prefix|.
    148       '''
    149       files = [name for name, _ in CreateURLsFromPaths(trunk_fs, path, prefix)]
    150       return _RequestEachItem(path, files, render)
    151 
    152     results = []
    153 
    154     try:
    155       # Start running the hand-written Cron methods first; they can be run in
    156       # parallel. They are resolved at the end.
    157       def run_cron_for_future(target):
    158         title = target.__class__.__name__
    159         future, init_timer = TimerClosure(target.Cron)
    160         assert isinstance(future, Future), (
    161             '%s.Cron() did not return a Future' % title)
    162         def resolve():
    163           resolve_timer = Timer()
    164           try:
    165             future.Get()
    166           except Exception as e:
    167             _cronlog.error('%s: error %s' % (title, traceback.format_exc()))
    168             results.append(False)
    169             if IsDeadlineExceededError(e): raise
    170           finally:
    171             resolve_timer.Stop()
    172             _cronlog.info('%s took %s: %s to initialize and %s to resolve' %
    173                 (title,
    174                  init_timer.With(resolve_timer).FormatElapsed(),
    175                  init_timer.FormatElapsed(),
    176                  resolve_timer.FormatElapsed()))
    177         return Future(delegate=Gettable(resolve))
    178 
    179       targets = (CreateDataSources(server_instance).values() +
    180                  [server_instance.content_providers])
    181       title = 'initializing %s parallel Cron targets' % len(targets)
    182       _cronlog.info(title)
    183       timer = Timer()
    184       try:
    185         cron_futures = [run_cron_for_future(target) for target in targets]
    186       finally:
    187         _cronlog.info('%s took %s' % (title, timer.Stop().FormatElapsed()))
    188 
    189       # Rendering the public templates will also pull in all of the private
    190       # templates.
    191       results.append(request_files_in_dir(PUBLIC_TEMPLATES))
    192 
    193       # Rendering the public templates will have pulled in the .js and
    194       # manifest.json files (for listing examples on the API reference pages),
    195       # but there are still images, CSS, etc.
    196       results.append(request_files_in_dir(STATIC_DOCS, prefix='static'))
    197 
    198       # Samples are too expensive to run on the dev server, where there is no
    199       # parallel fetch.
    200       if not IsDevServer():
    201         # Fetch each individual sample file.
    202         results.append(request_files_in_dir(EXAMPLES,
    203                                             prefix='extensions/examples'))
    204 
    205         # Fetch the zip file of each example (contains all the individual
    206         # files).
    207         example_zips = []
    208         for root, _, files in trunk_fs.Walk(EXAMPLES):
    209           example_zips.extend(
    210               root + '.zip' for name in files if name == 'manifest.json')
    211         results.append(_RequestEachItem(
    212             'example zips',
    213             example_zips,
    214             lambda path: render('extensions/examples/' + path)))
    215 
    216       # Resolve the hand-written Cron method futures.
    217       title = 'resolving %s parallel Cron targets' % len(targets)
    218       _cronlog.info(title)
    219       timer = Timer()
    220       try:
    221         for future in cron_futures:
    222           future.Get()
    223       finally:
    224         _cronlog.info('%s took %s' % (title, timer.Stop().FormatElapsed()))
    225 
    226     except:
    227       results.append(False)
    228       # This should never actually happen (each cron step does its own
    229       # conservative error checking), so re-raise no matter what it is.
    230       _cronlog.error('uncaught error: %s' % traceback.format_exc())
    231       raise
    232     finally:
    233       success = all(results)
    234       _cronlog.info('finished (%s)', 'success' if success else 'FAILED')
    235       return (Response.Ok('Success') if success else
    236               Response.InternalError('Failure'))
    237 
    238   def _GetSafeServerInstance(self):
    239     '''Returns a ServerInstance with a host file system at a safe revision,
    240     meaning the last revision that the current running version of the server
    241     existed.
    242     '''
    243     delegate = self._delegate
    244 
    245     # IMPORTANT: Get a ServerInstance pinned to the most recent revision, not
    246     # HEAD. These cron jobs take a while and run very frequently such that
    247     # there is usually one running at any given time, and eventually a file
    248     # that we're dealing with will change underneath it, putting the server in
    249     # an undefined state.
    250     server_instance_near_head = self._CreateServerInstance(
    251         self._GetMostRecentRevision())
    252 
    253     app_yaml_handler = AppYamlHelper(
    254         server_instance_near_head.object_store_creator,
    255         server_instance_near_head.host_file_system_provider)
    256 
    257     if app_yaml_handler.IsUpToDate(delegate.GetAppVersion()):
    258       return server_instance_near_head
    259 
    260     # The version in app.yaml is greater than the currently running app's.
    261     # The safe version is the one before it changed.
    262     safe_revision = app_yaml_handler.GetFirstRevisionGreaterThan(
    263         delegate.GetAppVersion()) - 1
    264 
    265     _cronlog.info('app version %s is out of date, safe is %s',
    266         delegate.GetAppVersion(), safe_revision)
    267 
    268     return self._CreateServerInstance(safe_revision)
    269 
    270   def _GetMostRecentRevision(self):
    271     '''Gets the revision of the most recent patch submitted to the host file
    272     system. This is similar to HEAD but it's a concrete revision so won't
    273     change as the cron runs.
    274     '''
    275     head_fs = (
    276         self._CreateServerInstance(None).host_file_system_provider.GetTrunk())
    277     return head_fs.Stat('').version
    278 
    279   def _CreateServerInstance(self, revision):
    280     '''Creates a ServerInstance pinned to |revision|, or HEAD if None.
    281     NOTE: If passed None it's likely that during the cron run patches will be
    282     submitted at HEAD, which may change data underneath the cron run.
    283     '''
    284     object_store_creator = ObjectStoreCreator(start_empty=True)
    285     branch_utility = self._delegate.CreateBranchUtility(object_store_creator)
    286     host_file_system_provider = self._delegate.CreateHostFileSystemProvider(
    287         object_store_creator, max_trunk_revision=revision)
    288     github_file_system_provider = self._delegate.CreateGithubFileSystemProvider(
    289         object_store_creator)
    290     return ServerInstance(object_store_creator,
    291                           CompiledFileSystem.Factory(object_store_creator),
    292                           branch_utility,
    293                           host_file_system_provider,
    294                           github_file_system_provider)
    295