Home | History | Annotate | Download | only in sqs
      1 # Copyright (c) 2006-2009 Mitch Garnaat http://garnaat.org/
      2 #
      3 # Permission is hereby granted, free of charge, to any person obtaining a
      4 # copy of this software and associated documentation files (the
      5 # "Software"), to deal in the Software without restriction, including
      6 # without limitation the rights to use, copy, modify, merge, publish, dis-
      7 # tribute, sublicense, and/or sell copies of the Software, and to permit
      8 # persons to whom the Software is furnished to do so, subject to the fol-
      9 # lowing conditions:
     10 #
     11 # The above copyright notice and this permission notice shall be included
     12 # in all copies or substantial portions of the Software.
     13 #
     14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
     15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
     16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
     17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
     18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
     20 # IN THE SOFTWARE.
     21 
     22 """
     23 Represents an SQS Queue
     24 """
     25 from boto.compat import urllib
     26 from boto.sqs.message import Message
     27 
     28 
     29 class Queue(object):
     30 
     31     def __init__(self, connection=None, url=None, message_class=Message):
     32         self.connection = connection
     33         self.url = url
     34         self.message_class = message_class
     35         self.visibility_timeout = None
     36 
     37     def __repr__(self):
     38         return 'Queue(%s)' % self.url
     39 
     40     def _id(self):
     41         if self.url:
     42             val = urllib.parse.urlparse(self.url)[2]
     43         else:
     44             val = self.url
     45         return val
     46     id = property(_id)
     47 
     48     def _name(self):
     49         if self.url:
     50             val = urllib.parse.urlparse(self.url)[2].split('/')[2]
     51         else:
     52             val = self.url
     53         return  val
     54     name = property(_name)
     55 
     56     def _arn(self):
     57         parts = self.id.split('/')
     58         return 'arn:aws:sqs:%s:%s:%s' % (
     59             self.connection.region.name, parts[1], parts[2])
     60     arn = property(_arn)
     61 
     62     def startElement(self, name, attrs, connection):
     63         return None
     64 
     65     def endElement(self, name, value, connection):
     66         if name == 'QueueUrl':
     67             self.url = value
     68         elif name == 'VisibilityTimeout':
     69             self.visibility_timeout = int(value)
     70         else:
     71             setattr(self, name, value)
     72 
     73     def set_message_class(self, message_class):
     74         """
     75         Set the message class that should be used when instantiating
     76         messages read from the queue.  By default, the class
     77         :class:`boto.sqs.message.Message` is used but this can be overriden
     78         with any class that behaves like a message.
     79 
     80         :type message_class: Message-like class
     81         :param message_class:  The new Message class
     82         """
     83         self.message_class = message_class
     84 
     85     def get_attributes(self, attributes='All'):
     86         """
     87         Retrieves attributes about this queue object and returns
     88         them in an Attribute instance (subclass of a Dictionary).
     89 
     90         :type attributes: string
     91         :param attributes: String containing one of:
     92                            ApproximateNumberOfMessages,
     93                            ApproximateNumberOfMessagesNotVisible,
     94                            VisibilityTimeout,
     95                            CreatedTimestamp,
     96                            LastModifiedTimestamp,
     97                            Policy
     98                            ReceiveMessageWaitTimeSeconds
     99         :rtype: Attribute object
    100         :return: An Attribute object which is a mapping type holding the
    101                  requested name/value pairs
    102         """
    103         return self.connection.get_queue_attributes(self, attributes)
    104 
    105     def set_attribute(self, attribute, value):
    106         """
    107         Set a new value for an attribute of the Queue.
    108 
    109         :type attribute: String
    110         :param attribute: The name of the attribute you want to set.  The
    111                            only valid value at this time is: VisibilityTimeout
    112         :type value: int
    113         :param value: The new value for the attribute.
    114             For VisibilityTimeout the value must be an
    115             integer number of seconds from 0 to 86400.
    116 
    117         :rtype: bool
    118         :return: True if successful, otherwise False.
    119         """
    120         return self.connection.set_queue_attribute(self, attribute, value)
    121 
    122     def get_timeout(self):
    123         """
    124         Get the visibility timeout for the queue.
    125 
    126         :rtype: int
    127         :return: The number of seconds as an integer.
    128         """
    129         a = self.get_attributes('VisibilityTimeout')
    130         return int(a['VisibilityTimeout'])
    131 
    132     def set_timeout(self, visibility_timeout):
    133         """
    134         Set the visibility timeout for the queue.
    135 
    136         :type visibility_timeout: int
    137         :param visibility_timeout: The desired timeout in seconds
    138         """
    139         retval = self.set_attribute('VisibilityTimeout', visibility_timeout)
    140         if retval:
    141             self.visibility_timeout = visibility_timeout
    142         return retval
    143 
    144     def add_permission(self, label, aws_account_id, action_name):
    145         """
    146         Add a permission to a queue.
    147 
    148         :type label: str or unicode
    149         :param label: A unique identification of the permission you are setting.
    150             Maximum of 80 characters ``[0-9a-zA-Z_-]``
    151             Example, AliceSendMessage
    152 
    153         :type aws_account_id: str or unicode
    154         :param principal_id: The AWS account number of the principal who
    155             will be given permission.  The principal must have an AWS account,
    156             but does not need to be signed up for Amazon SQS. For information
    157             about locating the AWS account identification.
    158 
    159         :type action_name: str or unicode
    160         :param action_name: The action.  Valid choices are:
    161             SendMessage|ReceiveMessage|DeleteMessage|
    162             ChangeMessageVisibility|GetQueueAttributes|*
    163 
    164         :rtype: bool
    165         :return: True if successful, False otherwise.
    166 
    167         """
    168         return self.connection.add_permission(self, label, aws_account_id,
    169                                               action_name)
    170 
    171     def remove_permission(self, label):
    172         """
    173         Remove a permission from a queue.
    174 
    175         :type label: str or unicode
    176         :param label: The unique label associated with the permission
    177             being removed.
    178 
    179         :rtype: bool
    180         :return: True if successful, False otherwise.
    181         """
    182         return self.connection.remove_permission(self, label)
    183 
    184     def read(self, visibility_timeout=None, wait_time_seconds=None,
    185              message_attributes=None):
    186         """
    187         Read a single message from the queue.
    188 
    189         :type visibility_timeout: int
    190         :param visibility_timeout: The timeout for this message in seconds
    191 
    192         :type wait_time_seconds: int
    193         :param wait_time_seconds: The duration (in seconds) for which the call
    194             will wait for a message to arrive in the queue before returning.
    195             If a message is available, the call will return sooner than
    196             wait_time_seconds.
    197 
    198         :type message_attributes: list
    199         :param message_attributes: The name(s) of additional message
    200             attributes to return. The default is to return no additional
    201             message attributes. Use ``['All']`` or ``['.*']`` to return all.
    202 
    203         :rtype: :class:`boto.sqs.message.Message`
    204         :return: A single message or None if queue is empty
    205         """
    206         rs = self.get_messages(1, visibility_timeout,
    207                                wait_time_seconds=wait_time_seconds,
    208                                message_attributes=message_attributes)
    209         if len(rs) == 1:
    210             return rs[0]
    211         else:
    212             return None
    213 
    214     def write(self, message, delay_seconds=None):
    215         """
    216         Add a single message to the queue.
    217 
    218         :type message: Message
    219         :param message: The message to be written to the queue
    220 
    221         :rtype: :class:`boto.sqs.message.Message`
    222         :return: The :class:`boto.sqs.message.Message` object that was written.
    223         """
    224         new_msg = self.connection.send_message(self,
    225             message.get_body_encoded(), delay_seconds=delay_seconds,
    226             message_attributes=message.message_attributes)
    227         message.id = new_msg.id
    228         message.md5 = new_msg.md5
    229         return message
    230 
    231     def write_batch(self, messages):
    232         """
    233         Delivers up to 10 messages in a single request.
    234 
    235         :type messages: List of lists.
    236         :param messages: A list of lists or tuples.  Each inner
    237             tuple represents a single message to be written
    238             and consists of and ID (string) that must be unique
    239             within the list of messages, the message body itself
    240             which can be a maximum of 64K in length, an
    241             integer which represents the delay time (in seconds)
    242             for the message (0-900) before the message will
    243             be delivered to the queue, and an optional dict of
    244             message attributes like those passed to ``send_message``
    245             in the connection class.
    246         """
    247         return self.connection.send_message_batch(self, messages)
    248 
    249     def new_message(self, body='', **kwargs):
    250         """
    251         Create new message of appropriate class.
    252 
    253         :type body: message body
    254         :param body: The body of the newly created message (optional).
    255 
    256         :rtype: :class:`boto.sqs.message.Message`
    257         :return: A new Message object
    258         """
    259         m = self.message_class(self, body, **kwargs)
    260         m.queue = self
    261         return m
    262 
    263     # get a variable number of messages, returns a list of messages
    264     def get_messages(self, num_messages=1, visibility_timeout=None,
    265                      attributes=None, wait_time_seconds=None,
    266                      message_attributes=None):
    267         """
    268         Get a variable number of messages.
    269 
    270         :type num_messages: int
    271         :param num_messages: The maximum number of messages to read from
    272             the queue.
    273 
    274         :type visibility_timeout: int
    275         :param visibility_timeout: The VisibilityTimeout for the messages read.
    276 
    277         :type attributes: str
    278         :param attributes: The name of additional attribute to return
    279             with response or All if you want all attributes.  The
    280             default is to return no additional attributes.  Valid
    281             values: All SenderId SentTimestamp ApproximateReceiveCount
    282             ApproximateFirstReceiveTimestamp
    283 
    284         :type wait_time_seconds: int
    285         :param wait_time_seconds: The duration (in seconds) for which the call
    286             will wait for a message to arrive in the queue before returning.
    287             If a message is available, the call will return sooner than
    288             wait_time_seconds.
    289 
    290         :type message_attributes: list
    291         :param message_attributes: The name(s) of additional message
    292             attributes to return. The default is to return no additional
    293             message attributes. Use ``['All']`` or ``['.*']`` to return all.
    294 
    295         :rtype: list
    296         :return: A list of :class:`boto.sqs.message.Message` objects.
    297         """
    298         return self.connection.receive_message(
    299             self, number_messages=num_messages,
    300             visibility_timeout=visibility_timeout, attributes=attributes,
    301             wait_time_seconds=wait_time_seconds,
    302             message_attributes=message_attributes)
    303 
    304     def delete_message(self, message):
    305         """
    306         Delete a message from the queue.
    307 
    308         :type message: :class:`boto.sqs.message.Message`
    309         :param message: The :class:`boto.sqs.message.Message` object to delete.
    310 
    311         :rtype: bool
    312         :return: True if successful, False otherwise
    313         """
    314         return self.connection.delete_message(self, message)
    315 
    316     def delete_message_batch(self, messages):
    317         """
    318         Deletes a list of messages in a single request.
    319 
    320         :type messages: List of :class:`boto.sqs.message.Message` objects.
    321         :param messages: A list of message objects.
    322         """
    323         return self.connection.delete_message_batch(self, messages)
    324 
    325     def change_message_visibility_batch(self, messages):
    326         """
    327         A batch version of change_message_visibility that can act
    328         on up to 10 messages at a time.
    329 
    330         :type messages: List of tuples.
    331         :param messages: A list of tuples where each tuple consists
    332             of a :class:`boto.sqs.message.Message` object and an integer
    333             that represents the new visibility timeout for that message.
    334         """
    335         return self.connection.change_message_visibility_batch(self, messages)
    336 
    337     def delete(self):
    338         """
    339         Delete the queue.
    340         """
    341         return self.connection.delete_queue(self)
    342 
    343     def purge(self):
    344         """
    345         Purge all messages in the queue.
    346         """
    347         return self.connection.purge_queue(self)
    348 
    349     def clear(self, page_size=10, vtimeout=10):
    350         """Deprecated utility function to remove all messages from a queue"""
    351         return self.purge()
    352 
    353     def count(self, page_size=10, vtimeout=10):
    354         """
    355         Utility function to count the number of messages in a queue.
    356         Note: This function now calls GetQueueAttributes to obtain
    357         an 'approximate' count of the number of messages in a queue.
    358         """
    359         a = self.get_attributes('ApproximateNumberOfMessages')
    360         return int(a['ApproximateNumberOfMessages'])
    361 
    362     def count_slow(self, page_size=10, vtimeout=10):
    363         """
    364         Deprecated.  This is the old 'count' method that actually counts
    365         the messages by reading them all.  This gives an accurate count but
    366         is very slow for queues with non-trivial number of messasges.
    367         Instead, use get_attributes('ApproximateNumberOfMessages') to take
    368         advantage of the new SQS capability.  This is retained only for
    369         the unit tests.
    370         """
    371         n = 0
    372         l = self.get_messages(page_size, vtimeout)
    373         while l:
    374             for m in l:
    375                 n += 1
    376             l = self.get_messages(page_size, vtimeout)
    377         return n
    378 
    379     def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'):
    380         """Utility function to dump the messages in a queue to a file
    381         NOTE: Page size must be < 10 else SQS errors"""
    382         fp = open(file_name, 'wb')
    383         n = 0
    384         l = self.get_messages(page_size, vtimeout)
    385         while l:
    386             for m in l:
    387                 fp.write(m.get_body())
    388                 if sep:
    389                     fp.write(sep)
    390                 n += 1
    391             l = self.get_messages(page_size, vtimeout)
    392         fp.close()
    393         return n
    394 
    395     def save_to_file(self, fp, sep='\n'):
    396         """
    397         Read all messages from the queue and persist them to file-like object.
    398         Messages are written to the file and the 'sep' string is written
    399         in between messages.  Messages are deleted from the queue after
    400         being written to the file.
    401         Returns the number of messages saved.
    402         """
    403         n = 0
    404         m = self.read()
    405         while m:
    406             n += 1
    407             fp.write(m.get_body())
    408             if sep:
    409                 fp.write(sep)
    410             self.delete_message(m)
    411             m = self.read()
    412         return n
    413 
    414     def save_to_filename(self, file_name, sep='\n'):
    415         """
    416         Read all messages from the queue and persist them to local file.
    417         Messages are written to the file and the 'sep' string is written
    418         in between messages.  Messages are deleted from the queue after
    419         being written to the file.
    420         Returns the number of messages saved.
    421         """
    422         fp = open(file_name, 'wb')
    423         n = self.save_to_file(fp, sep)
    424         fp.close()
    425         return n
    426 
    427     # for backwards compatibility
    428     save = save_to_filename
    429 
    430     def save_to_s3(self, bucket):
    431         """
    432         Read all messages from the queue and persist them to S3.
    433         Messages are stored in the S3 bucket using a naming scheme of::
    434 
    435             <queue_id>/<message_id>
    436 
    437         Messages are deleted from the queue after being saved to S3.
    438         Returns the number of messages saved.
    439         """
    440         n = 0
    441         m = self.read()
    442         while m:
    443             n += 1
    444             key = bucket.new_key('%s/%s' % (self.id, m.id))
    445             key.set_contents_from_string(m.get_body())
    446             self.delete_message(m)
    447             m = self.read()
    448         return n
    449 
    450     def load_from_s3(self, bucket, prefix=None):
    451         """
    452         Load messages previously saved to S3.
    453         """
    454         n = 0
    455         if prefix:
    456             prefix = '%s/' % prefix
    457         else:
    458             prefix = '%s/' % self.id[1:]
    459         rs = bucket.list(prefix=prefix)
    460         for key in rs:
    461             n += 1
    462             m = self.new_message(key.get_contents_as_string())
    463             self.write(m)
    464         return n
    465 
    466     def load_from_file(self, fp, sep='\n'):
    467         """Utility function to load messages from a file-like object to a queue"""
    468         n = 0
    469         body = ''
    470         l = fp.readline()
    471         while l:
    472             if l == sep:
    473                 m = Message(self, body)
    474                 self.write(m)
    475                 n += 1
    476                 print('writing message %d' % n)
    477                 body = ''
    478             else:
    479                 body = body + l
    480             l = fp.readline()
    481         return n
    482 
    483     def load_from_filename(self, file_name, sep='\n'):
    484         """Utility function to load messages from a local filename to a queue"""
    485         fp = open(file_name, 'rb')
    486         n = self.load_from_file(fp, sep)
    487         fp.close()
    488         return n
    489 
    490     # for backward compatibility
    491     load = load_from_filename
    492 
    493