Home | History | Annotate | Download | only in services
      1 #!/usr/bin/env python
      2 # Copyright (c) 2006-2008 Mitch Garnaat http://garnaat.org/
      3 #
      4 # Permission is hereby granted, free of charge, to any person obtaining a
      5 # copy of this software and associated documentation files (the
      6 # "Software"), to deal in the Software without restriction, including
      7 # without limitation the rights to use, copy, modify, merge, publish, dis-
      8 # tribute, sublicense, and/or sell copies of the Software, and to permit
      9 # persons to whom the Software is furnished to do so, subject to the fol-
     10 # lowing conditions:
     11 #
     12 # The above copyright notice and this permission notice shall be included
     13 # in all copies or substantial portions of the Software.
     14 #
     15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
     16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
     17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
     18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 
     19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
     21 # IN THE SOFTWARE.
     22 from optparse import OptionParser
     23 from boto.services.servicedef import ServiceDef
     24 from boto.services.submit import Submitter
     25 from boto.services.result import ResultProcessor
     26 import boto
     27 import sys, os
     28 from boto.compat import StringIO
     29 
     30 class BS(object):
     31 
     32     Usage = "usage: %prog [options] config_file command"
     33 
     34     Commands = {'reset' : 'Clear input queue and output bucket',
     35                 'submit' : 'Submit local files to the service',
     36                 'start' : 'Start the service',
     37                 'status' : 'Report on the status of the service buckets and queues',
     38                 'retrieve' : 'Retrieve output generated by a batch',
     39                 'batches' : 'List all batches stored in current output_domain'}
     40     
     41     def __init__(self):
     42         self.service_name = None
     43         self.parser = OptionParser(usage=self.Usage)
     44         self.parser.add_option("--help-commands", action="store_true", dest="help_commands",
     45                                help="provides help on the available commands")
     46         self.parser.add_option("-a", "--access-key", action="store", type="string",
     47                                help="your AWS Access Key")
     48         self.parser.add_option("-s", "--secret-key", action="store", type="string",
     49                                help="your AWS Secret Access Key")
     50         self.parser.add_option("-p", "--path", action="store", type="string", dest="path",
     51                                help="the path to local directory for submit and retrieve")
     52         self.parser.add_option("-k", "--keypair", action="store", type="string", dest="keypair",
     53                                help="the SSH keypair used with launched instance(s)")
     54         self.parser.add_option("-l", "--leave", action="store_true", dest="leave",
     55                                help="leave the files (don't retrieve) files during retrieve command")
     56         self.parser.set_defaults(leave=False)
     57         self.parser.add_option("-n", "--num-instances", action="store", type="string", dest="num_instances",
     58                                help="the number of launched instance(s)")
     59         self.parser.set_defaults(num_instances=1)
     60         self.parser.add_option("-i", "--ignore-dirs", action="append", type="string", dest="ignore",
     61                                help="directories that should be ignored by submit command")
     62         self.parser.add_option("-b", "--batch-id", action="store", type="string", dest="batch",
     63                                help="batch identifier required by the retrieve command")
     64 
     65     def print_command_help(self):
     66         print('\nCommands:')
     67         for key in self.Commands.keys():
     68             print('  %s\t\t%s' % (key, self.Commands[key]))
     69 
     70     def do_reset(self):
     71         iq = self.sd.get_obj('input_queue')
     72         if iq:
     73             print('clearing out input queue')
     74             i = 0
     75             m = iq.read()
     76             while m:
     77                 i += 1
     78                 iq.delete_message(m)
     79                 m = iq.read()
     80             print('deleted %d messages' % i)
     81         ob = self.sd.get_obj('output_bucket')
     82         ib = self.sd.get_obj('input_bucket')
     83         if ob:
     84             if ib and ob.name == ib.name:
     85                 return
     86             print('delete generated files in output bucket')
     87             i = 0
     88             for k in ob:
     89                 i += 1
     90                 k.delete()
     91             print('deleted %d keys' % i)
     92 
     93     def do_submit(self):
     94         if not self.options.path:
     95             self.parser.error('No path provided')
     96         if not os.path.exists(self.options.path):
     97             self.parser.error('Invalid path (%s)' % self.options.path)
     98         s = Submitter(self.sd)
     99         t = s.submit_path(self.options.path, None, self.options.ignore, None,
    100                           None, True, self.options.path)
    101         print('A total of %d files were submitted' % t[1])
    102         print('Batch Identifier: %s' % t[0])
    103 
    104     def do_start(self):
    105         ami_id = self.sd.get('ami_id')
    106         instance_type = self.sd.get('instance_type', 'm1.small')
    107         security_group = self.sd.get('security_group', 'default')
    108         if not ami_id:
    109             self.parser.error('ami_id option is required when starting the service')
    110         ec2 = boto.connect_ec2()
    111         if not self.sd.has_section('Credentials'):
    112             self.sd.add_section('Credentials')
    113             self.sd.set('Credentials', 'aws_access_key_id', ec2.aws_access_key_id)
    114             self.sd.set('Credentials', 'aws_secret_access_key', ec2.aws_secret_access_key)
    115         s = StringIO()
    116         self.sd.write(s)
    117         rs = ec2.get_all_images([ami_id])
    118         img = rs[0]
    119         r = img.run(user_data=s.getvalue(), key_name=self.options.keypair,
    120                     max_count=self.options.num_instances,
    121                     instance_type=instance_type,
    122                     security_groups=[security_group])
    123         print('Starting AMI: %s' % ami_id)
    124         print('Reservation %s contains the following instances:' % r.id)
    125         for i in r.instances:
    126             print('\t%s' % i.id)
    127 
    128     def do_status(self):
    129         iq = self.sd.get_obj('input_queue')
    130         if iq:
    131             print('The input_queue (%s) contains approximately %s messages' % (iq.id, iq.count()))
    132         ob = self.sd.get_obj('output_bucket')
    133         ib = self.sd.get_obj('input_bucket')
    134         if ob:
    135             if ib and ob.name == ib.name:
    136                 return
    137             total = 0
    138             for k in ob:
    139                 total += 1
    140             print('The output_bucket (%s) contains %d keys' % (ob.name, total))
    141 
    142     def do_retrieve(self):
    143         if not self.options.path:
    144             self.parser.error('No path provided')
    145         if not os.path.exists(self.options.path):
    146             self.parser.error('Invalid path (%s)' % self.options.path)
    147         if not self.options.batch:
    148             self.parser.error('batch identifier is required for retrieve command')
    149         s = ResultProcessor(self.options.batch, self.sd)
    150         s.get_results(self.options.path, get_file=(not self.options.leave))
    151 
    152     def do_batches(self):
    153         d = self.sd.get_obj('output_domain')
    154         if d:
    155             print('Available Batches:')
    156             rs = d.query("['type'='Batch']")
    157             for item in rs:
    158                 print('  %s' % item.name)
    159         else:
    160             self.parser.error('No output_domain specified for service')
    161             
    162     def main(self):
    163         self.options, self.args = self.parser.parse_args()
    164         if self.options.help_commands:
    165             self.print_command_help()
    166             sys.exit(0)
    167         if len(self.args) != 2:
    168             self.parser.error("config_file and command are required")
    169         self.config_file = self.args[0]
    170         self.sd = ServiceDef(self.config_file)
    171         self.command = self.args[1]
    172         if hasattr(self, 'do_%s' % self.command):
    173             method = getattr(self, 'do_%s' % self.command)
    174             method()
    175         else:
    176             self.parser.error('command (%s) not recognized' % self.command)
    177 
    178 if __name__ == "__main__":
    179     bs = BS()
    180     bs.main()
    181