1 """Channel notifications support. 2 3 Classes and functions to support channel subscriptions and notifications 4 on those channels. 5 6 Notes: 7 - This code is based on experimental APIs and is subject to change. 8 - Notification does not do deduplication of notification ids, that's up to 9 the receiver. 10 - Storing the Channel between calls is up to the caller. 11 12 13 Example setting up a channel: 14 15 # Create a new channel that gets notifications via webhook. 16 channel = new_webhook_channel("https://example.com/my_web_hook") 17 18 # Store the channel, keyed by 'channel.id'. Store it before calling the 19 # watch method because notifications may start arriving before the watch 20 # method returns. 21 ... 22 23 resp = service.objects().watchAll( 24 bucket="some_bucket_id", body=channel.body()).execute() 25 channel.update(resp) 26 27 # Store the channel, keyed by 'channel.id'. Store it after being updated 28 # since the resource_id value will now be correct, and that's needed to 29 # stop a subscription. 30 ... 31 32 33 An example Webhook implementation using webapp2. Note that webapp2 puts 34 headers in a case insensitive dictionary, as headers aren't guaranteed to 35 always be upper case. 36 37 id = self.request.headers[X_GOOG_CHANNEL_ID] 38 39 # Retrieve the channel by id. 40 channel = ... 41 42 # Parse notification from the headers, including validating the id. 43 n = notification_from_headers(channel, self.request.headers) 44 45 # Do app specific stuff with the notification here. 46 if n.resource_state == 'sync': 47 # Code to handle sync state. 48 elif n.resource_state == 'exists': 49 # Code to handle the exists state. 50 elif n.resource_state == 'not_exists': 51 # Code to handle the not exists state. 52 53 54 Example of unsubscribing. 55 56 service.channels().stop(channel.body()) 57 """ 58 from __future__ import absolute_import 59 60 import datetime 61 import uuid 62 63 from googleapiclient import errors 64 from oauth2client import util 65 import six 66 67 68 # The unix time epoch starts at midnight 1970. 69 EPOCH = datetime.datetime.utcfromtimestamp(0) 70 71 # Map the names of the parameters in the JSON channel description to 72 # the parameter names we use in the Channel class. 73 CHANNEL_PARAMS = { 74 'address': 'address', 75 'id': 'id', 76 'expiration': 'expiration', 77 'params': 'params', 78 'resourceId': 'resource_id', 79 'resourceUri': 'resource_uri', 80 'type': 'type', 81 'token': 'token', 82 } 83 84 X_GOOG_CHANNEL_ID = 'X-GOOG-CHANNEL-ID' 85 X_GOOG_MESSAGE_NUMBER = 'X-GOOG-MESSAGE-NUMBER' 86 X_GOOG_RESOURCE_STATE = 'X-GOOG-RESOURCE-STATE' 87 X_GOOG_RESOURCE_URI = 'X-GOOG-RESOURCE-URI' 88 X_GOOG_RESOURCE_ID = 'X-GOOG-RESOURCE-ID' 89 90 91 def _upper_header_keys(headers): 92 new_headers = {} 93 for k, v in six.iteritems(headers): 94 new_headers[k.upper()] = v 95 return new_headers 96 97 98 class Notification(object): 99 """A Notification from a Channel. 100 101 Notifications are not usually constructed directly, but are returned 102 from functions like notification_from_headers(). 103 104 Attributes: 105 message_number: int, The unique id number of this notification. 106 state: str, The state of the resource being monitored. 107 uri: str, The address of the resource being monitored. 108 resource_id: str, The unique identifier of the version of the resource at 109 this event. 110 """ 111 @util.positional(5) 112 def __init__(self, message_number, state, resource_uri, resource_id): 113 """Notification constructor. 114 115 Args: 116 message_number: int, The unique id number of this notification. 117 state: str, The state of the resource being monitored. Can be one 118 of "exists", "not_exists", or "sync". 119 resource_uri: str, The address of the resource being monitored. 120 resource_id: str, The identifier of the watched resource. 121 """ 122 self.message_number = message_number 123 self.state = state 124 self.resource_uri = resource_uri 125 self.resource_id = resource_id 126 127 128 class Channel(object): 129 """A Channel for notifications. 130 131 Usually not constructed directly, instead it is returned from helper 132 functions like new_webhook_channel(). 133 134 Attributes: 135 type: str, The type of delivery mechanism used by this channel. For 136 example, 'web_hook'. 137 id: str, A UUID for the channel. 138 token: str, An arbitrary string associated with the channel that 139 is delivered to the target address with each event delivered 140 over this channel. 141 address: str, The address of the receiving entity where events are 142 delivered. Specific to the channel type. 143 expiration: int, The time, in milliseconds from the epoch, when this 144 channel will expire. 145 params: dict, A dictionary of string to string, with additional parameters 146 controlling delivery channel behavior. 147 resource_id: str, An opaque id that identifies the resource that is 148 being watched. Stable across different API versions. 149 resource_uri: str, The canonicalized ID of the watched resource. 150 """ 151 152 @util.positional(5) 153 def __init__(self, type, id, token, address, expiration=None, 154 params=None, resource_id="", resource_uri=""): 155 """Create a new Channel. 156 157 In user code, this Channel constructor will not typically be called 158 manually since there are functions for creating channels for each specific 159 type with a more customized set of arguments to pass. 160 161 Args: 162 type: str, The type of delivery mechanism used by this channel. For 163 example, 'web_hook'. 164 id: str, A UUID for the channel. 165 token: str, An arbitrary string associated with the channel that 166 is delivered to the target address with each event delivered 167 over this channel. 168 address: str, The address of the receiving entity where events are 169 delivered. Specific to the channel type. 170 expiration: int, The time, in milliseconds from the epoch, when this 171 channel will expire. 172 params: dict, A dictionary of string to string, with additional parameters 173 controlling delivery channel behavior. 174 resource_id: str, An opaque id that identifies the resource that is 175 being watched. Stable across different API versions. 176 resource_uri: str, The canonicalized ID of the watched resource. 177 """ 178 self.type = type 179 self.id = id 180 self.token = token 181 self.address = address 182 self.expiration = expiration 183 self.params = params 184 self.resource_id = resource_id 185 self.resource_uri = resource_uri 186 187 def body(self): 188 """Build a body from the Channel. 189 190 Constructs a dictionary that's appropriate for passing into watch() 191 methods as the value of body argument. 192 193 Returns: 194 A dictionary representation of the channel. 195 """ 196 result = { 197 'id': self.id, 198 'token': self.token, 199 'type': self.type, 200 'address': self.address 201 } 202 if self.params: 203 result['params'] = self.params 204 if self.resource_id: 205 result['resourceId'] = self.resource_id 206 if self.resource_uri: 207 result['resourceUri'] = self.resource_uri 208 if self.expiration: 209 result['expiration'] = self.expiration 210 211 return result 212 213 def update(self, resp): 214 """Update a channel with information from the response of watch(). 215 216 When a request is sent to watch() a resource, the response returned 217 from the watch() request is a dictionary with updated channel information, 218 such as the resource_id, which is needed when stopping a subscription. 219 220 Args: 221 resp: dict, The response from a watch() method. 222 """ 223 for json_name, param_name in six.iteritems(CHANNEL_PARAMS): 224 value = resp.get(json_name) 225 if value is not None: 226 setattr(self, param_name, value) 227 228 229 def notification_from_headers(channel, headers): 230 """Parse a notification from the webhook request headers, validate 231 the notification, and return a Notification object. 232 233 Args: 234 channel: Channel, The channel that the notification is associated with. 235 headers: dict, A dictionary like object that contains the request headers 236 from the webhook HTTP request. 237 238 Returns: 239 A Notification object. 240 241 Raises: 242 errors.InvalidNotificationError if the notification is invalid. 243 ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int. 244 """ 245 headers = _upper_header_keys(headers) 246 channel_id = headers[X_GOOG_CHANNEL_ID] 247 if channel.id != channel_id: 248 raise errors.InvalidNotificationError( 249 'Channel id mismatch: %s != %s' % (channel.id, channel_id)) 250 else: 251 message_number = int(headers[X_GOOG_MESSAGE_NUMBER]) 252 state = headers[X_GOOG_RESOURCE_STATE] 253 resource_uri = headers[X_GOOG_RESOURCE_URI] 254 resource_id = headers[X_GOOG_RESOURCE_ID] 255 return Notification(message_number, state, resource_uri, resource_id) 256 257 258 @util.positional(2) 259 def new_webhook_channel(url, token=None, expiration=None, params=None): 260 """Create a new webhook Channel. 261 262 Args: 263 url: str, URL to post notifications to. 264 token: str, An arbitrary string associated with the channel that 265 is delivered to the target address with each notification delivered 266 over this channel. 267 expiration: datetime.datetime, A time in the future when the channel 268 should expire. Can also be None if the subscription should use the 269 default expiration. Note that different services may have different 270 limits on how long a subscription lasts. Check the response from the 271 watch() method to see the value the service has set for an expiration 272 time. 273 params: dict, Extra parameters to pass on channel creation. Currently 274 not used for webhook channels. 275 """ 276 expiration_ms = 0 277 if expiration: 278 delta = expiration - EPOCH 279 expiration_ms = delta.microseconds/1000 + ( 280 delta.seconds + delta.days*24*3600)*1000 281 if expiration_ms < 0: 282 expiration_ms = 0 283 284 return Channel('web_hook', str(uuid.uuid4()), 285 token, url, expiration=expiration_ms, 286 params=params) 287 288