Home | History | Annotate | Download | only in googleapiclient
      1 """Channel notifications support.
      2 
      3 Classes and functions to support channel subscriptions and notifications
      4 on those channels.
      5 
      6 Notes:
      7   - This code is based on experimental APIs and is subject to change.
      8   - Notification does not do deduplication of notification ids, that's up to
      9     the receiver.
     10   - Storing the Channel between calls is up to the caller.
     11 
     12 
     13 Example setting up a channel:
     14 
     15   # Create a new channel that gets notifications via webhook.
     16   channel = new_webhook_channel("https://example.com/my_web_hook")
     17 
     18   # Store the channel, keyed by 'channel.id'. Store it before calling the
     19   # watch method because notifications may start arriving before the watch
     20   # method returns.
     21   ...
     22 
     23   resp = service.objects().watchAll(
     24     bucket="some_bucket_id", body=channel.body()).execute()
     25   channel.update(resp)
     26 
     27   # Store the channel, keyed by 'channel.id'. Store it after being updated
     28   # since the resource_id value will now be correct, and that's needed to
     29   # stop a subscription.
     30   ...
     31 
     32 
     33 An example Webhook implementation using webapp2. Note that webapp2 puts
     34 headers in a case insensitive dictionary, as headers aren't guaranteed to
     35 always be upper case.
     36 
     37   id = self.request.headers[X_GOOG_CHANNEL_ID]
     38 
     39   # Retrieve the channel by id.
     40   channel = ...
     41 
     42   # Parse notification from the headers, including validating the id.
     43   n = notification_from_headers(channel, self.request.headers)
     44 
     45   # Do app specific stuff with the notification here.
     46   if n.resource_state == 'sync':
     47     # Code to handle sync state.
     48   elif n.resource_state == 'exists':
     49     # Code to handle the exists state.
     50   elif n.resource_state == 'not_exists':
     51     # Code to handle the not exists state.
     52 
     53 
     54 Example of unsubscribing.
     55 
     56   service.channels().stop(channel.body())
     57 """
     58 from __future__ import absolute_import
     59 
     60 import datetime
     61 import uuid
     62 
     63 from googleapiclient import errors
     64 from oauth2client import util
     65 import six
     66 
     67 
     68 # The unix time epoch starts at midnight 1970.
     69 EPOCH = datetime.datetime.utcfromtimestamp(0)
     70 
     71 # Map the names of the parameters in the JSON channel description to
     72 # the parameter names we use in the Channel class.
     73 CHANNEL_PARAMS = {
     74     'address': 'address',
     75     'id': 'id',
     76     'expiration': 'expiration',
     77     'params': 'params',
     78     'resourceId': 'resource_id',
     79     'resourceUri': 'resource_uri',
     80     'type': 'type',
     81     'token': 'token',
     82     }
     83 
     84 X_GOOG_CHANNEL_ID     = 'X-GOOG-CHANNEL-ID'
     85 X_GOOG_MESSAGE_NUMBER = 'X-GOOG-MESSAGE-NUMBER'
     86 X_GOOG_RESOURCE_STATE = 'X-GOOG-RESOURCE-STATE'
     87 X_GOOG_RESOURCE_URI   = 'X-GOOG-RESOURCE-URI'
     88 X_GOOG_RESOURCE_ID    = 'X-GOOG-RESOURCE-ID'
     89 
     90 
     91 def _upper_header_keys(headers):
     92   new_headers = {}
     93   for k, v in six.iteritems(headers):
     94     new_headers[k.upper()] = v
     95   return new_headers
     96 
     97 
     98 class Notification(object):
     99   """A Notification from a Channel.
    100 
    101   Notifications are not usually constructed directly, but are returned
    102   from functions like notification_from_headers().
    103 
    104   Attributes:
    105     message_number: int, The unique id number of this notification.
    106     state: str, The state of the resource being monitored.
    107     uri: str, The address of the resource being monitored.
    108     resource_id: str, The unique identifier of the version of the resource at
    109       this event.
    110   """
    111   @util.positional(5)
    112   def __init__(self, message_number, state, resource_uri, resource_id):
    113     """Notification constructor.
    114 
    115     Args:
    116       message_number: int, The unique id number of this notification.
    117       state: str, The state of the resource being monitored. Can be one
    118         of "exists", "not_exists", or "sync".
    119       resource_uri: str, The address of the resource being monitored.
    120       resource_id: str, The identifier of the watched resource.
    121     """
    122     self.message_number = message_number
    123     self.state = state
    124     self.resource_uri = resource_uri
    125     self.resource_id = resource_id
    126 
    127 
    128 class Channel(object):
    129   """A Channel for notifications.
    130 
    131   Usually not constructed directly, instead it is returned from helper
    132   functions like new_webhook_channel().
    133 
    134   Attributes:
    135     type: str, The type of delivery mechanism used by this channel. For
    136       example, 'web_hook'.
    137     id: str, A UUID for the channel.
    138     token: str, An arbitrary string associated with the channel that
    139       is delivered to the target address with each event delivered
    140       over this channel.
    141     address: str, The address of the receiving entity where events are
    142       delivered. Specific to the channel type.
    143     expiration: int, The time, in milliseconds from the epoch, when this
    144       channel will expire.
    145     params: dict, A dictionary of string to string, with additional parameters
    146       controlling delivery channel behavior.
    147     resource_id: str, An opaque id that identifies the resource that is
    148       being watched. Stable across different API versions.
    149     resource_uri: str, The canonicalized ID of the watched resource.
    150   """
    151 
    152   @util.positional(5)
    153   def __init__(self, type, id, token, address, expiration=None,
    154                params=None, resource_id="", resource_uri=""):
    155     """Create a new Channel.
    156 
    157     In user code, this Channel constructor will not typically be called
    158     manually since there are functions for creating channels for each specific
    159     type with a more customized set of arguments to pass.
    160 
    161     Args:
    162       type: str, The type of delivery mechanism used by this channel. For
    163         example, 'web_hook'.
    164       id: str, A UUID for the channel.
    165       token: str, An arbitrary string associated with the channel that
    166         is delivered to the target address with each event delivered
    167         over this channel.
    168       address: str,  The address of the receiving entity where events are
    169         delivered. Specific to the channel type.
    170       expiration: int, The time, in milliseconds from the epoch, when this
    171         channel will expire.
    172       params: dict, A dictionary of string to string, with additional parameters
    173         controlling delivery channel behavior.
    174       resource_id: str, An opaque id that identifies the resource that is
    175         being watched. Stable across different API versions.
    176       resource_uri: str, The canonicalized ID of the watched resource.
    177     """
    178     self.type = type
    179     self.id = id
    180     self.token = token
    181     self.address = address
    182     self.expiration = expiration
    183     self.params = params
    184     self.resource_id = resource_id
    185     self.resource_uri = resource_uri
    186 
    187   def body(self):
    188     """Build a body from the Channel.
    189 
    190     Constructs a dictionary that's appropriate for passing into watch()
    191     methods as the value of body argument.
    192 
    193     Returns:
    194       A dictionary representation of the channel.
    195     """
    196     result = {
    197         'id': self.id,
    198         'token': self.token,
    199         'type': self.type,
    200         'address': self.address
    201         }
    202     if self.params:
    203       result['params'] = self.params
    204     if self.resource_id:
    205       result['resourceId'] = self.resource_id
    206     if self.resource_uri:
    207       result['resourceUri'] = self.resource_uri
    208     if self.expiration:
    209       result['expiration'] = self.expiration
    210 
    211     return result
    212 
    213   def update(self, resp):
    214     """Update a channel with information from the response of watch().
    215 
    216     When a request is sent to watch() a resource, the response returned
    217     from the watch() request is a dictionary with updated channel information,
    218     such as the resource_id, which is needed when stopping a subscription.
    219 
    220     Args:
    221       resp: dict, The response from a watch() method.
    222     """
    223     for json_name, param_name in six.iteritems(CHANNEL_PARAMS):
    224       value = resp.get(json_name)
    225       if value is not None:
    226         setattr(self, param_name, value)
    227 
    228 
    229 def notification_from_headers(channel, headers):
    230   """Parse a notification from the webhook request headers, validate
    231     the notification, and return a Notification object.
    232 
    233   Args:
    234     channel: Channel, The channel that the notification is associated with.
    235     headers: dict, A dictionary like object that contains the request headers
    236       from the webhook HTTP request.
    237 
    238   Returns:
    239     A Notification object.
    240 
    241   Raises:
    242     errors.InvalidNotificationError if the notification is invalid.
    243     ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int.
    244   """
    245   headers = _upper_header_keys(headers)
    246   channel_id = headers[X_GOOG_CHANNEL_ID]
    247   if channel.id != channel_id:
    248     raise errors.InvalidNotificationError(
    249         'Channel id mismatch: %s != %s' % (channel.id, channel_id))
    250   else:
    251     message_number = int(headers[X_GOOG_MESSAGE_NUMBER])
    252     state = headers[X_GOOG_RESOURCE_STATE]
    253     resource_uri = headers[X_GOOG_RESOURCE_URI]
    254     resource_id = headers[X_GOOG_RESOURCE_ID]
    255     return Notification(message_number, state, resource_uri, resource_id)
    256 
    257 
    258 @util.positional(2)
    259 def new_webhook_channel(url, token=None, expiration=None, params=None):
    260     """Create a new webhook Channel.
    261 
    262     Args:
    263       url: str, URL to post notifications to.
    264       token: str, An arbitrary string associated with the channel that
    265         is delivered to the target address with each notification delivered
    266         over this channel.
    267       expiration: datetime.datetime, A time in the future when the channel
    268         should expire. Can also be None if the subscription should use the
    269         default expiration. Note that different services may have different
    270         limits on how long a subscription lasts. Check the response from the
    271         watch() method to see the value the service has set for an expiration
    272         time.
    273       params: dict, Extra parameters to pass on channel creation. Currently
    274         not used for webhook channels.
    275     """
    276     expiration_ms = 0
    277     if expiration:
    278       delta = expiration - EPOCH
    279       expiration_ms = delta.microseconds/1000 + (
    280           delta.seconds + delta.days*24*3600)*1000
    281       if expiration_ms < 0:
    282         expiration_ms = 0
    283 
    284     return Channel('web_hook', str(uuid.uuid4()),
    285                    token, url, expiration=expiration_ms,
    286                    params=params)
    287 
    288