1 # Copyright (c) 2006-2009 Mitch Garnaat http://garnaat.org/ 2 # 3 # Permission is hereby granted, free of charge, to any person obtaining a 4 # copy of this software and associated documentation files (the 5 # "Software"), to deal in the Software without restriction, including 6 # without limitation the rights to use, copy, modify, merge, publish, dis- 7 # tribute, sublicense, and/or sell copies of the Software, and to permit 8 # persons to whom the Software is furnished to do so, subject to the fol- 9 # lowing conditions: 10 # 11 # The above copyright notice and this permission notice shall be included 12 # in all copies or substantial portions of the Software. 13 # 14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- 16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT 17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 20 # IN THE SOFTWARE. 21 22 """ 23 Represents an SQS Queue 24 """ 25 from boto.compat import urllib 26 from boto.sqs.message import Message 27 28 29 class Queue(object): 30 31 def __init__(self, connection=None, url=None, message_class=Message): 32 self.connection = connection 33 self.url = url 34 self.message_class = message_class 35 self.visibility_timeout = None 36 37 def __repr__(self): 38 return 'Queue(%s)' % self.url 39 40 def _id(self): 41 if self.url: 42 val = urllib.parse.urlparse(self.url)[2] 43 else: 44 val = self.url 45 return val 46 id = property(_id) 47 48 def _name(self): 49 if self.url: 50 val = urllib.parse.urlparse(self.url)[2].split('/')[2] 51 else: 52 val = self.url 53 return val 54 name = property(_name) 55 56 def _arn(self): 57 parts = self.id.split('/') 58 return 'arn:aws:sqs:%s:%s:%s' % ( 59 self.connection.region.name, parts[1], parts[2]) 60 arn = property(_arn) 61 62 def startElement(self, name, attrs, connection): 63 return None 64 65 def endElement(self, name, value, connection): 66 if name == 'QueueUrl': 67 self.url = value 68 elif name == 'VisibilityTimeout': 69 self.visibility_timeout = int(value) 70 else: 71 setattr(self, name, value) 72 73 def set_message_class(self, message_class): 74 """ 75 Set the message class that should be used when instantiating 76 messages read from the queue. By default, the class 77 :class:`boto.sqs.message.Message` is used but this can be overriden 78 with any class that behaves like a message. 79 80 :type message_class: Message-like class 81 :param message_class: The new Message class 82 """ 83 self.message_class = message_class 84 85 def get_attributes(self, attributes='All'): 86 """ 87 Retrieves attributes about this queue object and returns 88 them in an Attribute instance (subclass of a Dictionary). 89 90 :type attributes: string 91 :param attributes: String containing one of: 92 ApproximateNumberOfMessages, 93 ApproximateNumberOfMessagesNotVisible, 94 VisibilityTimeout, 95 CreatedTimestamp, 96 LastModifiedTimestamp, 97 Policy 98 ReceiveMessageWaitTimeSeconds 99 :rtype: Attribute object 100 :return: An Attribute object which is a mapping type holding the 101 requested name/value pairs 102 """ 103 return self.connection.get_queue_attributes(self, attributes) 104 105 def set_attribute(self, attribute, value): 106 """ 107 Set a new value for an attribute of the Queue. 108 109 :type attribute: String 110 :param attribute: The name of the attribute you want to set. The 111 only valid value at this time is: VisibilityTimeout 112 :type value: int 113 :param value: The new value for the attribute. 114 For VisibilityTimeout the value must be an 115 integer number of seconds from 0 to 86400. 116 117 :rtype: bool 118 :return: True if successful, otherwise False. 119 """ 120 return self.connection.set_queue_attribute(self, attribute, value) 121 122 def get_timeout(self): 123 """ 124 Get the visibility timeout for the queue. 125 126 :rtype: int 127 :return: The number of seconds as an integer. 128 """ 129 a = self.get_attributes('VisibilityTimeout') 130 return int(a['VisibilityTimeout']) 131 132 def set_timeout(self, visibility_timeout): 133 """ 134 Set the visibility timeout for the queue. 135 136 :type visibility_timeout: int 137 :param visibility_timeout: The desired timeout in seconds 138 """ 139 retval = self.set_attribute('VisibilityTimeout', visibility_timeout) 140 if retval: 141 self.visibility_timeout = visibility_timeout 142 return retval 143 144 def add_permission(self, label, aws_account_id, action_name): 145 """ 146 Add a permission to a queue. 147 148 :type label: str or unicode 149 :param label: A unique identification of the permission you are setting. 150 Maximum of 80 characters ``[0-9a-zA-Z_-]`` 151 Example, AliceSendMessage 152 153 :type aws_account_id: str or unicode 154 :param principal_id: The AWS account number of the principal who 155 will be given permission. The principal must have an AWS account, 156 but does not need to be signed up for Amazon SQS. For information 157 about locating the AWS account identification. 158 159 :type action_name: str or unicode 160 :param action_name: The action. Valid choices are: 161 SendMessage|ReceiveMessage|DeleteMessage| 162 ChangeMessageVisibility|GetQueueAttributes|* 163 164 :rtype: bool 165 :return: True if successful, False otherwise. 166 167 """ 168 return self.connection.add_permission(self, label, aws_account_id, 169 action_name) 170 171 def remove_permission(self, label): 172 """ 173 Remove a permission from a queue. 174 175 :type label: str or unicode 176 :param label: The unique label associated with the permission 177 being removed. 178 179 :rtype: bool 180 :return: True if successful, False otherwise. 181 """ 182 return self.connection.remove_permission(self, label) 183 184 def read(self, visibility_timeout=None, wait_time_seconds=None, 185 message_attributes=None): 186 """ 187 Read a single message from the queue. 188 189 :type visibility_timeout: int 190 :param visibility_timeout: The timeout for this message in seconds 191 192 :type wait_time_seconds: int 193 :param wait_time_seconds: The duration (in seconds) for which the call 194 will wait for a message to arrive in the queue before returning. 195 If a message is available, the call will return sooner than 196 wait_time_seconds. 197 198 :type message_attributes: list 199 :param message_attributes: The name(s) of additional message 200 attributes to return. The default is to return no additional 201 message attributes. Use ``['All']`` or ``['.*']`` to return all. 202 203 :rtype: :class:`boto.sqs.message.Message` 204 :return: A single message or None if queue is empty 205 """ 206 rs = self.get_messages(1, visibility_timeout, 207 wait_time_seconds=wait_time_seconds, 208 message_attributes=message_attributes) 209 if len(rs) == 1: 210 return rs[0] 211 else: 212 return None 213 214 def write(self, message, delay_seconds=None): 215 """ 216 Add a single message to the queue. 217 218 :type message: Message 219 :param message: The message to be written to the queue 220 221 :rtype: :class:`boto.sqs.message.Message` 222 :return: The :class:`boto.sqs.message.Message` object that was written. 223 """ 224 new_msg = self.connection.send_message(self, 225 message.get_body_encoded(), delay_seconds=delay_seconds, 226 message_attributes=message.message_attributes) 227 message.id = new_msg.id 228 message.md5 = new_msg.md5 229 return message 230 231 def write_batch(self, messages): 232 """ 233 Delivers up to 10 messages in a single request. 234 235 :type messages: List of lists. 236 :param messages: A list of lists or tuples. Each inner 237 tuple represents a single message to be written 238 and consists of and ID (string) that must be unique 239 within the list of messages, the message body itself 240 which can be a maximum of 64K in length, an 241 integer which represents the delay time (in seconds) 242 for the message (0-900) before the message will 243 be delivered to the queue, and an optional dict of 244 message attributes like those passed to ``send_message`` 245 in the connection class. 246 """ 247 return self.connection.send_message_batch(self, messages) 248 249 def new_message(self, body='', **kwargs): 250 """ 251 Create new message of appropriate class. 252 253 :type body: message body 254 :param body: The body of the newly created message (optional). 255 256 :rtype: :class:`boto.sqs.message.Message` 257 :return: A new Message object 258 """ 259 m = self.message_class(self, body, **kwargs) 260 m.queue = self 261 return m 262 263 # get a variable number of messages, returns a list of messages 264 def get_messages(self, num_messages=1, visibility_timeout=None, 265 attributes=None, wait_time_seconds=None, 266 message_attributes=None): 267 """ 268 Get a variable number of messages. 269 270 :type num_messages: int 271 :param num_messages: The maximum number of messages to read from 272 the queue. 273 274 :type visibility_timeout: int 275 :param visibility_timeout: The VisibilityTimeout for the messages read. 276 277 :type attributes: str 278 :param attributes: The name of additional attribute to return 279 with response or All if you want all attributes. The 280 default is to return no additional attributes. Valid 281 values: All SenderId SentTimestamp ApproximateReceiveCount 282 ApproximateFirstReceiveTimestamp 283 284 :type wait_time_seconds: int 285 :param wait_time_seconds: The duration (in seconds) for which the call 286 will wait for a message to arrive in the queue before returning. 287 If a message is available, the call will return sooner than 288 wait_time_seconds. 289 290 :type message_attributes: list 291 :param message_attributes: The name(s) of additional message 292 attributes to return. The default is to return no additional 293 message attributes. Use ``['All']`` or ``['.*']`` to return all. 294 295 :rtype: list 296 :return: A list of :class:`boto.sqs.message.Message` objects. 297 """ 298 return self.connection.receive_message( 299 self, number_messages=num_messages, 300 visibility_timeout=visibility_timeout, attributes=attributes, 301 wait_time_seconds=wait_time_seconds, 302 message_attributes=message_attributes) 303 304 def delete_message(self, message): 305 """ 306 Delete a message from the queue. 307 308 :type message: :class:`boto.sqs.message.Message` 309 :param message: The :class:`boto.sqs.message.Message` object to delete. 310 311 :rtype: bool 312 :return: True if successful, False otherwise 313 """ 314 return self.connection.delete_message(self, message) 315 316 def delete_message_batch(self, messages): 317 """ 318 Deletes a list of messages in a single request. 319 320 :type messages: List of :class:`boto.sqs.message.Message` objects. 321 :param messages: A list of message objects. 322 """ 323 return self.connection.delete_message_batch(self, messages) 324 325 def change_message_visibility_batch(self, messages): 326 """ 327 A batch version of change_message_visibility that can act 328 on up to 10 messages at a time. 329 330 :type messages: List of tuples. 331 :param messages: A list of tuples where each tuple consists 332 of a :class:`boto.sqs.message.Message` object and an integer 333 that represents the new visibility timeout for that message. 334 """ 335 return self.connection.change_message_visibility_batch(self, messages) 336 337 def delete(self): 338 """ 339 Delete the queue. 340 """ 341 return self.connection.delete_queue(self) 342 343 def purge(self): 344 """ 345 Purge all messages in the queue. 346 """ 347 return self.connection.purge_queue(self) 348 349 def clear(self, page_size=10, vtimeout=10): 350 """Deprecated utility function to remove all messages from a queue""" 351 return self.purge() 352 353 def count(self, page_size=10, vtimeout=10): 354 """ 355 Utility function to count the number of messages in a queue. 356 Note: This function now calls GetQueueAttributes to obtain 357 an 'approximate' count of the number of messages in a queue. 358 """ 359 a = self.get_attributes('ApproximateNumberOfMessages') 360 return int(a['ApproximateNumberOfMessages']) 361 362 def count_slow(self, page_size=10, vtimeout=10): 363 """ 364 Deprecated. This is the old 'count' method that actually counts 365 the messages by reading them all. This gives an accurate count but 366 is very slow for queues with non-trivial number of messasges. 367 Instead, use get_attributes('ApproximateNumberOfMessages') to take 368 advantage of the new SQS capability. This is retained only for 369 the unit tests. 370 """ 371 n = 0 372 l = self.get_messages(page_size, vtimeout) 373 while l: 374 for m in l: 375 n += 1 376 l = self.get_messages(page_size, vtimeout) 377 return n 378 379 def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'): 380 """Utility function to dump the messages in a queue to a file 381 NOTE: Page size must be < 10 else SQS errors""" 382 fp = open(file_name, 'wb') 383 n = 0 384 l = self.get_messages(page_size, vtimeout) 385 while l: 386 for m in l: 387 fp.write(m.get_body()) 388 if sep: 389 fp.write(sep) 390 n += 1 391 l = self.get_messages(page_size, vtimeout) 392 fp.close() 393 return n 394 395 def save_to_file(self, fp, sep='\n'): 396 """ 397 Read all messages from the queue and persist them to file-like object. 398 Messages are written to the file and the 'sep' string is written 399 in between messages. Messages are deleted from the queue after 400 being written to the file. 401 Returns the number of messages saved. 402 """ 403 n = 0 404 m = self.read() 405 while m: 406 n += 1 407 fp.write(m.get_body()) 408 if sep: 409 fp.write(sep) 410 self.delete_message(m) 411 m = self.read() 412 return n 413 414 def save_to_filename(self, file_name, sep='\n'): 415 """ 416 Read all messages from the queue and persist them to local file. 417 Messages are written to the file and the 'sep' string is written 418 in between messages. Messages are deleted from the queue after 419 being written to the file. 420 Returns the number of messages saved. 421 """ 422 fp = open(file_name, 'wb') 423 n = self.save_to_file(fp, sep) 424 fp.close() 425 return n 426 427 # for backwards compatibility 428 save = save_to_filename 429 430 def save_to_s3(self, bucket): 431 """ 432 Read all messages from the queue and persist them to S3. 433 Messages are stored in the S3 bucket using a naming scheme of:: 434 435 <queue_id>/<message_id> 436 437 Messages are deleted from the queue after being saved to S3. 438 Returns the number of messages saved. 439 """ 440 n = 0 441 m = self.read() 442 while m: 443 n += 1 444 key = bucket.new_key('%s/%s' % (self.id, m.id)) 445 key.set_contents_from_string(m.get_body()) 446 self.delete_message(m) 447 m = self.read() 448 return n 449 450 def load_from_s3(self, bucket, prefix=None): 451 """ 452 Load messages previously saved to S3. 453 """ 454 n = 0 455 if prefix: 456 prefix = '%s/' % prefix 457 else: 458 prefix = '%s/' % self.id[1:] 459 rs = bucket.list(prefix=prefix) 460 for key in rs: 461 n += 1 462 m = self.new_message(key.get_contents_as_string()) 463 self.write(m) 464 return n 465 466 def load_from_file(self, fp, sep='\n'): 467 """Utility function to load messages from a file-like object to a queue""" 468 n = 0 469 body = '' 470 l = fp.readline() 471 while l: 472 if l == sep: 473 m = Message(self, body) 474 self.write(m) 475 n += 1 476 print('writing message %d' % n) 477 body = '' 478 else: 479 body = body + l 480 l = fp.readline() 481 return n 482 483 def load_from_filename(self, file_name, sep='\n'): 484 """Utility function to load messages from a local filename to a queue""" 485 fp = open(file_name, 'rb') 486 n = self.load_from_file(fp, sep) 487 fp.close() 488 return n 489 490 # for backward compatibility 491 load = load_from_filename 492 493