Home | History | Annotate | Download | only in contrib
      1 """A simple script to backfill tko_task_references table with throttling."""
      2 
      3 from __future__ import absolute_import
      4 from __future__ import division
      5 from __future__ import print_function
      6 
      7 import argparse
      8 import collections
      9 import contextlib
     10 import logging
     11 import time
     12 
     13 import MySQLdb
     14 
     15 
     16 class BackfillException(Exception):
     17   pass
     18 
     19 
     20 def _parse_args():
     21   parser = argparse.ArgumentParser(
     22       description=__doc__)
     23   parser.add_argument('--host', required=True, help='mysql server host')
     24   parser.add_argument('--user', required=True, help='mysql server user')
     25   parser.add_argument('--password', required=True, help='mysql server password')
     26   parser.add_argument('--dryrun', action='store_true', default=False)
     27   parser.add_argument(
     28       '--num-iterations',
     29       default=None,
     30       type=int,
     31       help='If set, total number of iterations. Default is no limit.',
     32   )
     33   parser.add_argument(
     34       '--batch-size',
     35       default=1000,
     36       help='Number of tko_jobs rows to read in one iteration',
     37   )
     38   parser.add_argument(
     39       '--sleep-seconds',
     40       type=int,
     41       default=1,
     42       help='Time to sleep between iterations',
     43   )
     44 
     45   args = parser.parse_args()
     46   if args.dryrun:
     47     if not args.num_iterations:
     48       logging.info('DRYRUN: Limiting to 5 iterations in dryrun mode.')
     49       args.num_iterations = 5
     50   return args
     51 
     52 
     53 
     54 @contextlib.contextmanager
     55 def _mysql_connection(args):
     56   conn = MySQLdb.connect(user=args.user, host=args.host, passwd=args.password)
     57   with _mysql_cursor(conn) as c:
     58     c.execute('USE chromeos_autotest_db;')
     59   try:
     60     yield conn
     61   finally:
     62     conn.close()
     63 
     64 
     65 @contextlib.contextmanager
     66 def _autocommit(conn):
     67   try:
     68     yield conn
     69   except:
     70     conn.rollback()
     71   else:
     72     conn.commit()
     73 
     74 
     75 @contextlib.contextmanager
     76 def _mysql_cursor(conn):
     77   c = conn.cursor()
     78   try:
     79     yield c
     80   finally:
     81     c.close()
     82 
     83 
     84 def _latest_unfilled_job_idx(conn):
     85   with _mysql_cursor(conn) as c:
     86     c.execute("""
     87 SELECT tko_job_idx
     88 FROM tko_task_references
     89 ORDER BY tko_job_idx
     90 LIMIT 1
     91 ;""")
     92     r = c.fetchall()
     93     if r:
     94       return str(long(r[0][0]) - 1)
     95   logging.debug('tko_task_references is empty.'
     96                ' Grabbing the latest tko_job_idx to fill.')
     97   with _mysql_cursor(conn) as c:
     98     c.execute("""
     99 SELECT job_idx
    100 FROM tko_jobs
    101 ORDER BY job_idx DESC
    102 LIMIT 1
    103 ;""")
    104     r = c.fetchall()
    105     if r:
    106       return r[0][0]
    107   return None
    108 
    109 
    110 _TKOTaskReference = collections.namedtuple(
    111     '_TKOTaskReference',
    112     ['tko_job_idx', 'task_reference', 'parent_task_reference'],
    113 )
    114 
# SQL templates. Each one has named %(...)s placeholders.

# Reads up to batch_size rows of tko_jobs with job_idx at or below
# latest_job_idx, highest job_idx first.
_SQL_SELECT_TASK_REFERENCES = """
SELECT job_idx, afe_job_id, afe_parent_job_id
FROM tko_jobs
WHERE job_idx <= %(latest_job_idx)s
ORDER BY job_idx DESC
LIMIT %(batch_size)s
;"""
# Inserts rows into tko_task_references; `values` is the already rendered,
# comma separated list of value tuples.
_SQL_INSERT_TASK_REFERENCES = """
INSERT INTO tko_task_references(reference_type, tko_job_idx, task_id, parent_task_id)
VALUES %(values)s
;"""
# Looks up the tko_task_references rows for a single tko_job_idx.
_SQL_SELECT_TASK_REFERENCE = """
SELECT tko_job_idx FROM tko_task_references WHERE tko_job_idx = %(tko_job_idx)s
;"""
    129 
    130 
    131 def _compute_task_references(conn, latest_job_idx, batch_size):
    132   with _mysql_cursor(conn) as c:
    133     sql = _SQL_SELECT_TASK_REFERENCES % {
    134         'latest_job_idx': latest_job_idx,
    135         'batch_size': batch_size,
    136     }
    137     c.execute(sql)
    138     rs = c.fetchall()
    139     if rs is None:
    140       return []
    141 
    142     return [_TKOTaskReference(r[0], r[1], r[2]) for r in rs]
    143 
    144 
    145 def _insert_task_references(conn, task_references, dryrun):
    146   values = ', '.join([
    147       '("afe", %s, "%s", "%s")' %
    148       (tr.tko_job_idx, tr.task_reference, tr.parent_task_reference)
    149       for tr in task_references
    150   ])
    151   sql = _SQL_INSERT_TASK_REFERENCES % {'values': values}
    152   if dryrun:
    153     if len(sql) < 200:
    154       sql_log = sql
    155     else:
    156       sql_log = '%s... [SNIP] ...%s' % (sql[:150], sql[-49:])
    157     logging.debug('Would have run: %s', sql_log)
    158   with _autocommit(conn) as conn:
    159     with _mysql_cursor(conn) as c:
    160       c.execute(sql)
    161 
    162 
    163 def _verify_task_references(conn, task_references):
    164   # Just verify that the last one was inserted.
    165   if not task_references:
    166     return
    167   tko_job_idx = task_references[-1].tko_job_idx
    168   sql = _SQL_SELECT_TASK_REFERENCE % {'tko_job_idx': tko_job_idx}
    169   with _mysql_cursor(conn) as c:
    170     c.execute(sql)
    171     r = c.fetchall()
    172     if not r or r[0][0] != tko_job_idx:
    173       raise BackfillException(
    174           'Failed to insert task reference for tko_job_id %s' % tko_job_idx)
    175 
    176 
    177 def _next_job_idx(task_references):
    178   return str(long(task_references[-1].tko_job_idx) - 1)
    179 
    180 def main():
    181   logging.basicConfig(level=logging.DEBUG)
    182   args = _parse_args()
    183   with _mysql_connection(args) as conn:
    184     tko_job_idx = _latest_unfilled_job_idx(conn)
    185     if tko_job_idx is None:
    186       raise BackfillException('Failed to get last unfilled tko_job_idx')
    187     logging.info('First tko_job_idx to fill: %s', tko_job_idx)
    188 
    189   while True:
    190     logging.info('####################################')
    191     logging.info('Start backfilling from tko_job_idx: %s', tko_job_idx)
    192 
    193     task_references = ()
    194     with _mysql_connection(args) as conn:
    195       task_references = _compute_task_references(
    196           conn, tko_job_idx, args.batch_size)
    197     if not task_references:
    198       logging.info('No more unfilled task references. All done!')
    199       break
    200 
    201     logging.info(
    202         'Inserting %d task references. tko_job_ids: %d...%d',
    203         len(task_references),
    204         task_references[0].tko_job_idx,
    205         task_references[-1].tko_job_idx,
    206     )
    207     with _mysql_connection(args) as conn:
    208       _insert_task_references(conn, task_references, args.dryrun)
    209     if not args.dryrun:
    210       with _mysql_connection(args) as conn:
    211         _verify_task_references(conn, task_references)
    212 
    213     tko_job_idx = _next_job_idx(task_references)
    214 
    215     if args.num_iterations is not None:
    216       args.num_iterations -= 1
    217       if args.num_iterations <= 0:
    218         break
    219       logging.info('%d more iterations left', args.num_iterations)
    220     logging.info('Iteration done. Sleeping for %d seconds', args.sleep_seconds)
    221     time.sleep(args.sleep_seconds)
    222 
    223 
# Run the backfill only when executed as a script, not when imported.
if __name__ == '__main__':
  main()
    226