Home | History | Annotate | Download | only in googleapiclient
      1 # Copyright 2014 Google Inc. All Rights Reserved.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #      http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 
     15 """Channel notifications support.
     16 
     17 Classes and functions to support channel subscriptions and notifications
     18 on those channels.
     19 
     20 Notes:
     21   - This code is based on experimental APIs and is subject to change.
     22   - Notification does not do deduplication of notification ids, that's up to
     23     the receiver.
     24   - Storing the Channel between calls is up to the caller.
     25 
     26 
     27 Example setting up a channel:
     28 
     29   # Create a new channel that gets notifications via webhook.
     30   channel = new_webhook_channel("https://example.com/my_web_hook")
     31 
     32   # Store the channel, keyed by 'channel.id'. Store it before calling the
     33   # watch method because notifications may start arriving before the watch
     34   # method returns.
     35   ...
     36 
     37   resp = service.objects().watchAll(
     38     bucket="some_bucket_id", body=channel.body()).execute()
     39   channel.update(resp)
     40 
     41   # Store the channel, keyed by 'channel.id'. Store it after being updated
     42   # since the resource_id value will now be correct, and that's needed to
     43   # stop a subscription.
     44   ...
     45 
     46 
     47 An example Webhook implementation using webapp2. Note that webapp2 puts
     48 headers in a case insensitive dictionary, as headers aren't guaranteed to
     49 always be upper case.
     50 
     51   id = self.request.headers[X_GOOG_CHANNEL_ID]
     52 
     53   # Retrieve the channel by id.
     54   channel = ...
     55 
     56   # Parse notification from the headers, including validating the id.
     57   n = notification_from_headers(channel, self.request.headers)
     58 
     59   # Do app specific stuff with the notification here.
     60   if n.resource_state == 'sync':
     61     # Code to handle sync state.
     62   elif n.resource_state == 'exists':
     63     # Code to handle the exists state.
     64   elif n.resource_state == 'not_exists':
     65     # Code to handle the not exists state.
     66 
     67 
     68 Example of unsubscribing.
     69 
     70   service.channels().stop(channel.body()).execute()
     71 """
     72 from __future__ import absolute_import
     73 
     74 import datetime
     75 import uuid
     76 
     77 from googleapiclient import errors
     78 from googleapiclient import _helpers as util
     79 import six
     80 
     81 
     82 # The unix time epoch starts at midnight 1970.
     83 EPOCH = datetime.datetime.utcfromtimestamp(0)
     84 
     85 # Map the names of the parameters in the JSON channel description to
     86 # the parameter names we use in the Channel class.
     87 CHANNEL_PARAMS = {
     88     'address': 'address',
     89     'id': 'id',
     90     'expiration': 'expiration',
     91     'params': 'params',
     92     'resourceId': 'resource_id',
     93     'resourceUri': 'resource_uri',
     94     'type': 'type',
     95     'token': 'token',
     96     }
     97 
     98 X_GOOG_CHANNEL_ID     = 'X-GOOG-CHANNEL-ID'
     99 X_GOOG_MESSAGE_NUMBER = 'X-GOOG-MESSAGE-NUMBER'
    100 X_GOOG_RESOURCE_STATE = 'X-GOOG-RESOURCE-STATE'
    101 X_GOOG_RESOURCE_URI   = 'X-GOOG-RESOURCE-URI'
    102 X_GOOG_RESOURCE_ID    = 'X-GOOG-RESOURCE-ID'
    103 
    104 
    105 def _upper_header_keys(headers):
    106   new_headers = {}
    107   for k, v in six.iteritems(headers):
    108     new_headers[k.upper()] = v
    109   return new_headers
    110 
    111 
    112 class Notification(object):
    113   """A Notification from a Channel.
    114 
    115   Notifications are not usually constructed directly, but are returned
    116   from functions like notification_from_headers().
    117 
    118   Attributes:
    119     message_number: int, The unique id number of this notification.
    120     state: str, The state of the resource being monitored.
    121     uri: str, The address of the resource being monitored.
    122     resource_id: str, The unique identifier of the version of the resource at
    123       this event.
    124   """
    125   @util.positional(5)
    126   def __init__(self, message_number, state, resource_uri, resource_id):
    127     """Notification constructor.
    128 
    129     Args:
    130       message_number: int, The unique id number of this notification.
    131       state: str, The state of the resource being monitored. Can be one
    132         of "exists", "not_exists", or "sync".
    133       resource_uri: str, The address of the resource being monitored.
    134       resource_id: str, The identifier of the watched resource.
    135     """
    136     self.message_number = message_number
    137     self.state = state
    138     self.resource_uri = resource_uri
    139     self.resource_id = resource_id
    140 
    141 
    142 class Channel(object):
    143   """A Channel for notifications.
    144 
    145   Usually not constructed directly, instead it is returned from helper
    146   functions like new_webhook_channel().
    147 
    148   Attributes:
    149     type: str, The type of delivery mechanism used by this channel. For
    150       example, 'web_hook'.
    151     id: str, A UUID for the channel.
    152     token: str, An arbitrary string associated with the channel that
    153       is delivered to the target address with each event delivered
    154       over this channel.
    155     address: str, The address of the receiving entity where events are
    156       delivered. Specific to the channel type.
    157     expiration: int, The time, in milliseconds from the epoch, when this
    158       channel will expire.
    159     params: dict, A dictionary of string to string, with additional parameters
    160       controlling delivery channel behavior.
    161     resource_id: str, An opaque id that identifies the resource that is
    162       being watched. Stable across different API versions.
    163     resource_uri: str, The canonicalized ID of the watched resource.
    164   """
    165 
    166   @util.positional(5)
    167   def __init__(self, type, id, token, address, expiration=None,
    168                params=None, resource_id="", resource_uri=""):
    169     """Create a new Channel.
    170 
    171     In user code, this Channel constructor will not typically be called
    172     manually since there are functions for creating channels for each specific
    173     type with a more customized set of arguments to pass.
    174 
    175     Args:
    176       type: str, The type of delivery mechanism used by this channel. For
    177         example, 'web_hook'.
    178       id: str, A UUID for the channel.
    179       token: str, An arbitrary string associated with the channel that
    180         is delivered to the target address with each event delivered
    181         over this channel.
    182       address: str,  The address of the receiving entity where events are
    183         delivered. Specific to the channel type.
    184       expiration: int, The time, in milliseconds from the epoch, when this
    185         channel will expire.
    186       params: dict, A dictionary of string to string, with additional parameters
    187         controlling delivery channel behavior.
    188       resource_id: str, An opaque id that identifies the resource that is
    189         being watched. Stable across different API versions.
    190       resource_uri: str, The canonicalized ID of the watched resource.
    191     """
    192     self.type = type
    193     self.id = id
    194     self.token = token
    195     self.address = address
    196     self.expiration = expiration
    197     self.params = params
    198     self.resource_id = resource_id
    199     self.resource_uri = resource_uri
    200 
    201   def body(self):
    202     """Build a body from the Channel.
    203 
    204     Constructs a dictionary that's appropriate for passing into watch()
    205     methods as the value of body argument.
    206 
    207     Returns:
    208       A dictionary representation of the channel.
    209     """
    210     result = {
    211         'id': self.id,
    212         'token': self.token,
    213         'type': self.type,
    214         'address': self.address
    215         }
    216     if self.params:
    217       result['params'] = self.params
    218     if self.resource_id:
    219       result['resourceId'] = self.resource_id
    220     if self.resource_uri:
    221       result['resourceUri'] = self.resource_uri
    222     if self.expiration:
    223       result['expiration'] = self.expiration
    224 
    225     return result
    226 
    227   def update(self, resp):
    228     """Update a channel with information from the response of watch().
    229 
    230     When a request is sent to watch() a resource, the response returned
    231     from the watch() request is a dictionary with updated channel information,
    232     such as the resource_id, which is needed when stopping a subscription.
    233 
    234     Args:
    235       resp: dict, The response from a watch() method.
    236     """
    237     for json_name, param_name in six.iteritems(CHANNEL_PARAMS):
    238       value = resp.get(json_name)
    239       if value is not None:
    240         setattr(self, param_name, value)
    241 
    242 
    243 def notification_from_headers(channel, headers):
    244   """Parse a notification from the webhook request headers, validate
    245     the notification, and return a Notification object.
    246 
    247   Args:
    248     channel: Channel, The channel that the notification is associated with.
    249     headers: dict, A dictionary like object that contains the request headers
    250       from the webhook HTTP request.
    251 
    252   Returns:
    253     A Notification object.
    254 
    255   Raises:
    256     errors.InvalidNotificationError if the notification is invalid.
    257     ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int.
    258   """
    259   headers = _upper_header_keys(headers)
    260   channel_id = headers[X_GOOG_CHANNEL_ID]
    261   if channel.id != channel_id:
    262     raise errors.InvalidNotificationError(
    263         'Channel id mismatch: %s != %s' % (channel.id, channel_id))
    264   else:
    265     message_number = int(headers[X_GOOG_MESSAGE_NUMBER])
    266     state = headers[X_GOOG_RESOURCE_STATE]
    267     resource_uri = headers[X_GOOG_RESOURCE_URI]
    268     resource_id = headers[X_GOOG_RESOURCE_ID]
    269     return Notification(message_number, state, resource_uri, resource_id)
    270 
    271 
    272 @util.positional(2)
    273 def new_webhook_channel(url, token=None, expiration=None, params=None):
    274     """Create a new webhook Channel.
    275 
    276     Args:
    277       url: str, URL to post notifications to.
    278       token: str, An arbitrary string associated with the channel that
    279         is delivered to the target address with each notification delivered
    280         over this channel.
    281       expiration: datetime.datetime, A time in the future when the channel
    282         should expire. Can also be None if the subscription should use the
    283         default expiration. Note that different services may have different
    284         limits on how long a subscription lasts. Check the response from the
    285         watch() method to see the value the service has set for an expiration
    286         time.
    287       params: dict, Extra parameters to pass on channel creation. Currently
    288         not used for webhook channels.
    289     """
    290     expiration_ms = 0
    291     if expiration:
    292       delta = expiration - EPOCH
    293       expiration_ms = delta.microseconds/1000 + (
    294           delta.seconds + delta.days*24*3600)*1000
    295       if expiration_ms < 0:
    296         expiration_ms = 0
    297 
    298     return Channel('web_hook', str(uuid.uuid4()),
    299                    token, url, expiration=expiration_ms,
    300                    params=params)
    301 
    302