#!/usr/bin/python2.4
# Copyright (c) 2010 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""An implementation of the server side of the Chromium sync protocol.

The details of the protocol are described mostly by comments in the protocol
buffer definition at chrome/browser/sync/protocol/sync.proto.
"""

import cgi
import copy
import operator
import random
import sys
import threading

import app_specifics_pb2
import autofill_specifics_pb2
import bookmark_specifics_pb2
import extension_specifics_pb2
import nigori_specifics_pb2
import password_specifics_pb2
import preference_specifics_pb2
import session_specifics_pb2
import sync_pb2
import theme_specifics_pb2
import typed_url_specifics_pb2

# An enumeration of the various kinds of data that can be synced.
# Over the wire, this enumeration is not used: a sync object's type is
# inferred by which EntitySpecifics extension it has. But in the context
# of a program, it is useful to have an enumeration.
#
# ALL_TYPES holds every value 0..11; each value is additionally bound to a
# module-level name below, in this exact order.
ALL_TYPES = range(12)
(TOP_LEVEL,  # The type of the 'Google Chrome' folder.
 APPS,
 AUTOFILL,
 AUTOFILL_PROFILE,
 BOOKMARK,
 EXTENSIONS,
 NIGORI,
 PASSWORD,
 PREFERENCE,
 SESSION,
 THEME,
 TYPED_URL) = ALL_TYPES

# Well-known server tag of the top level "Google Chrome" folder.
TOP_LEVEL_FOLDER_TAG = 'google_chrome'

# Given a sync type from ALL_TYPES, find the extension token corresponding
# to that datatype. Note that TOP_LEVEL has no such token.
SYNC_TYPE_TO_EXTENSION = {
    APPS: app_specifics_pb2.app,
    AUTOFILL: autofill_specifics_pb2.autofill,
    AUTOFILL_PROFILE: autofill_specifics_pb2.autofill_profile,
    BOOKMARK: bookmark_specifics_pb2.bookmark,
    EXTENSIONS: extension_specifics_pb2.extension,
    NIGORI: nigori_specifics_pb2.nigori,
    PASSWORD: password_specifics_pb2.password,
    PREFERENCE: preference_specifics_pb2.preference,
    SESSION: session_specifics_pb2.session,
    THEME: theme_specifics_pb2.theme,
    TYPED_URL: typed_url_specifics_pb2.typed_url,
    }

# The parent ID used to indicate a top-level node.
ROOT_ID = '0'


class Error(Exception):
  """Error class for this module."""


class ProtobufExtensionNotUnique(Error):
  """An entry should not have more than one protobuf extension present."""


class DataTypeIdNotRecognized(Error):
  """The requested data type is not recognized."""


def GetEntryType(entry):
  """Extract the sync type from a SyncEntry.

  Args:
    entry: A SyncEntity protobuf object whose type to determine.
  Returns:
    A value from ALL_TYPES if the entry's type can be determined, or None
    if the type cannot be determined.
  Raises:
    ProtobufExtensionNotUnique: More than one type was indicated by the entry.
  """
  if entry.server_defined_unique_tag == TOP_LEVEL_FOLDER_TAG:
    return TOP_LEVEL
  entry_types = GetEntryTypesFromSpecifics(entry.specifics)
  if not entry_types:
    return None

  # If there is more than one, either there's a bug, or else the caller
  # should use GetEntryTypesFromSpecifics.
  if len(entry_types) > 1:
    raise ProtobufExtensionNotUnique
  return entry_types[0]


def GetEntryTypesFromSpecifics(specifics):
  """Determine the sync types indicated by an EntitySpecifics's extension(s).

  If the specifics have more than one recognized extension (as commonly
  happens with the requested_types field of GetUpdatesMessage), all types
  will be returned. Callers must handle the possibility of the returned
  value having more than one item.

  Args:
    specifics: An EntitySpecifics protobuf message whose extensions to
      enumerate.
  Returns:
    A list of the sync types (values from ALL_TYPES) associated with each
    recognized extension of the specifics message.
  """
  return [data_type for data_type, extension
          in SYNC_TYPE_TO_EXTENSION.iteritems()
          if specifics.HasExtension(extension)]


def SyncTypeToProtocolDataTypeId(data_type):
  """Convert from a sync type (python enum) to the protocol's data type id.

  Raises:
    KeyError: data_type has no extension (e.g. TOP_LEVEL).
  """
  return SYNC_TYPE_TO_EXTENSION[data_type].number


def ProtocolDataTypeIdToSyncType(protocol_data_type_id):
  """Convert from the protocol's data type id to a sync type (python enum).

  Raises:
    DataTypeIdNotRecognized: No known extension has that field number.
  """
  for data_type, protocol_extension in SYNC_TYPE_TO_EXTENSION.iteritems():
    if protocol_extension.number == protocol_data_type_id:
      return data_type
  raise DataTypeIdNotRecognized(protocol_data_type_id)


def GetDefaultEntitySpecifics(data_type):
  """Get an EntitySpecifics having a sync type's default extension value.

  For a type with no extension (TOP_LEVEL), the result is an empty
  EntitySpecifics.
  """
  specifics = sync_pb2.EntitySpecifics()
  if data_type in SYNC_TYPE_TO_EXTENSION:
    extension_handle = SYNC_TYPE_TO_EXTENSION[data_type]
    specifics.Extensions[extension_handle].SetInParent()
  return specifics


def DeepCopyOfProto(proto):
  """Return a deep copy of a protocol buffer."""
  new_proto = type(proto)()
  new_proto.MergeFrom(proto)
  return new_proto


class PermanentItem(object):
  """A specification of one server-created permanent item.

  Attributes:
    tag: A known-to-the-client value that uniquely identifies a server-created
      permanent item.
    name: The human-readable display name for this item.
    parent_tag: The tag of the permanent item's parent. If ROOT_ID, indicates
      a top-level item. Otherwise, this must be the tag value of some other
      server-created permanent item.
    sync_type: A value from ALL_TYPES, giving the datatype of this permanent
      item. This controls which types of client GetUpdates requests will
      cause the permanent item to be created and returned.
  """

  def __init__(self, tag, name, parent_tag, sync_type):
    self.tag = tag
    self.name = name
    self.parent_tag = parent_tag
    self.sync_type = sync_type


class UpdateSieve(object):
  """A filter to remove items the client has already seen.

  The sieve maps each requested sync type to the timestamp from which the
  client wants updates. TOP_LEVEL is assigned the minimum of all requested
  timestamps.
  """

  def __init__(self, request):
    """Build the per-type timestamp table from a GetUpdatesMessage.

    Args:
      request: A GetUpdatesMessage. Its from_progress_marker entries are used
        when present; otherwise from_timestamp is applied to every type
        listed in requested_types.
    Raises:
      ValueError: A progress marker carried no timestamp information.
      DataTypeIdNotRecognized: A progress marker named an unknown type.
    """
    self._original_request = request
    self._state = {}
    if request.from_progress_marker:
      for marker in request.from_progress_marker:
        if marker.HasField("timestamp_token_for_migration"):
          timestamp = marker.timestamp_token_for_migration
        elif marker.token:
          timestamp = int(marker.token)
        elif marker.HasField("token"):
          # A present-but-empty token is treated as timestamp zero.
          timestamp = 0
        else:
          raise ValueError("No timestamp information in progress marker.")
        data_type = ProtocolDataTypeIdToSyncType(marker.data_type_id)
        self._state[data_type] = timestamp
    elif request.HasField("from_timestamp"):
      for data_type in GetEntryTypesFromSpecifics(request.requested_types):
        self._state[data_type] = request.from_timestamp
    if self._state:
      self._state[TOP_LEVEL] = min(self._state.itervalues())

  def ClientWantsItem(self, item):
    """Return true if the client hasn't already seen an item.

    Items whose type was not requested fall back to sys.maxint and so
    compare as already seen.
    """
    return self._state.get(GetEntryType(item), sys.maxint) < item.version

  def HasAnyTimestamp(self):
    """Return true if at least one datatype was requested."""
    return bool(self._state)

  def GetMinTimestamp(self):
    """Return the smallest timestamp requested across all datatypes."""
    return min(self._state.itervalues())

  def GetFirstTimeTypes(self):
    """Return a list of datatypes requesting updates from timestamp zero."""
    return [datatype for datatype, timestamp in self._state.iteritems()
            if timestamp == 0]

  def SaveProgress(self, new_timestamp, get_updates_response):
    """Write the new_timestamp or new_progress_marker fields to a response.

    Progress-marker requests get one new marker per requested type (never
    TOP_LEVEL), omitted when identical to a marker the client sent.
    Timestamp requests get new_timestamp only if it moved forward.
    """
    if self._original_request.from_progress_marker:
      for data_type, old_timestamp in self._state.iteritems():
        if data_type == TOP_LEVEL:
          continue
        new_marker = sync_pb2.DataTypeProgressMarker()
        new_marker.data_type_id = SyncTypeToProtocolDataTypeId(data_type)
        new_marker.token = str(max(old_timestamp, new_timestamp))
        if new_marker not in self._original_request.from_progress_marker:
          get_updates_response.new_progress_marker.add().MergeFrom(new_marker)
    elif self._original_request.HasField("from_timestamp"):
      if self._original_request.from_timestamp < new_timestamp:
        get_updates_response.new_timestamp = new_timestamp


class SyncDataModel(object):
  """Models the account state of one sync user."""
  # Maximum number of changes considered per GetUpdates batch.
  _BATCH_SIZE = 100

  # Specify all the permanent items that a model might need.
  _PERMANENT_ITEM_SPECS = [
      PermanentItem('google_chrome', name='Google Chrome',
                    parent_tag=ROOT_ID, sync_type=TOP_LEVEL),
      PermanentItem('google_chrome_bookmarks', name='Bookmarks',
                    parent_tag='google_chrome', sync_type=BOOKMARK),
      PermanentItem('bookmark_bar', name='Bookmark Bar',
                    parent_tag='google_chrome_bookmarks', sync_type=BOOKMARK),
      PermanentItem('other_bookmarks', name='Other Bookmarks',
                    parent_tag='google_chrome_bookmarks', sync_type=BOOKMARK),
      PermanentItem('google_chrome_preferences', name='Preferences',
                    parent_tag='google_chrome', sync_type=PREFERENCE),
      PermanentItem('google_chrome_autofill', name='Autofill',
                    parent_tag='google_chrome', sync_type=AUTOFILL),
      PermanentItem('google_chrome_autofill_profiles', name='Autofill Profiles',
                    parent_tag='google_chrome', sync_type=AUTOFILL_PROFILE),
      PermanentItem('google_chrome_extensions', name='Extensions',
                    parent_tag='google_chrome', sync_type=EXTENSIONS),
      PermanentItem('google_chrome_passwords', name='Passwords',
                    parent_tag='google_chrome', sync_type=PASSWORD),
      PermanentItem('google_chrome_sessions', name='Sessions',
                    parent_tag='google_chrome', sync_type=SESSION),
      PermanentItem('google_chrome_themes', name='Themes',
                    parent_tag='google_chrome', sync_type=THEME),
      PermanentItem('google_chrome_typed_urls', name='Typed URLs',
                    parent_tag='google_chrome', sync_type=TYPED_URL),
      PermanentItem('google_chrome_nigori', name='Nigori',
                    parent_tag='google_chrome', sync_type=NIGORI),
      PermanentItem('google_chrome_apps', name='Apps',
                    parent_tag='google_chrome', sync_type=APPS),
      ]

  def __init__(self):
    # Monotonically increasing version number. The next object change will
    # take on this value + 1.
    self._version = 0

    # The definitive copy of this client's items: a map from ID string to a
    # SyncEntity protocol buffer.
    self._entries = {}

    # TODO(nick): uuid.uuid1() is better, but python 2.5 only.
    self.store_birthday = '%0.30f' % random.random()

  def _SaveEntry(self, entry):
    """Insert or update an entry in the change log, and give it a new version.

    The ID fields of this entry are assumed to be valid server IDs. This
    entry will be updated with a new version number and sync_timestamp.
    If an entry with the same ID already exists, its originator fields are
    copied onto the new entry (they cannot be changed by the client).

    Args:
      entry: The entry to be added or updated.
    """
    self._version += 1
    # Maintain a global (rather than per-item) sequence number and use it
    # both as the per-entry version as well as the update-progress timestamp.
    # This simulates the behavior of the original server implementation.
    entry.version = self._version
    entry.sync_timestamp = self._version

    # Preserve the originator info, which the client is not required to send
    # when updating.
    base_entry = self._entries.get(entry.id_string)
    if base_entry is not None:
      entry.originator_cache_guid = base_entry.originator_cache_guid
      entry.originator_client_item_id = base_entry.originator_client_item_id

    self._entries[entry.id_string] = DeepCopyOfProto(entry)

  def _ServerTagToId(self, tag):
    """Determine the server ID from a server-unique tag.

    The resulting value is guaranteed not to collide with the other ID
    generation methods.

    Args:
      tag: The unique, known-to-the-client tag of a server-generated item.
    Returns:
      The string value of the computed server ID. An empty tag and ROOT_ID
      are returned unchanged.
    """
    if tag and tag != ROOT_ID:
      return '<server tag>%s' % tag
    else:
      return tag

  def _ClientTagToId(self, tag):
    """Determine the server ID from a client-unique tag.

    The resulting value is guaranteed not to collide with the other ID
    generation methods.

    Args:
      tag: The unique, opaque-to-the-server tag of a client-tagged item.
    Returns:
      The string value of the computed server ID.
    """
    return '<client tag>%s' % tag

  def _ClientIdToId(self, client_guid, client_item_id):
    """Compute a unique server ID from a client-local ID tag.

    The resulting value is guaranteed not to collide with the other ID
    generation methods.

    Args:
      client_guid: A globally unique ID that identifies the client which
        created this item.
      client_item_id: An ID that uniquely identifies this item on the client
        which created it.
    Returns:
      The string value of the computed server ID.
    """
    # Using the client ID info is not required here (we could instead generate
    # a random ID), but it's useful for debugging.
    return '<server ID originally>%s/%s' % (client_guid, client_item_id)

  def _WritePosition(self, entry, parent_id, prev_id=None):
    """Convert from a relative position into an absolute, numeric position.

    Clients specify positions using the predecessor-based references; the
    server stores and reports item positions using sparse integer values.
    This method converts from the former to the latter.

    Args:
      entry: The entry for which to compute a position. Its ID fields are
        assumed to be server IDs. This entry will have its parent_id_string
        and position_in_parent fields updated; its insert_after_item_id field
        will be cleared.
      parent_id: The ID of the entry intended as the new parent.
      prev_id: The ID of the entry intended as the new predecessor. If this
        is None, or an ID of an object which is not a child of the new parent,
        the entry will be positioned at the end (right) of the ordering. If
        the empty ID (''), this will be positioned at the front (left) of the
        ordering. Otherwise, the entry will be given a position_in_parent
        value placing it just after (to the right of) the new predecessor.
    """
    preferred_gap = 2 ** 20

    def ExtendRange(current_limit_entry, sign_multiplier):
      """Compute values at the beginning or end."""
      if current_limit_entry.id_string == entry.id_string:
        step = 0
      else:
        step = sign_multiplier * preferred_gap
      return current_limit_entry.position_in_parent + step

    siblings = [x for x in self._entries.values()
                if x.parent_id_string == parent_id and not x.deleted]
    siblings = sorted(siblings, key=operator.attrgetter('position_in_parent'))
    if prev_id == entry.id_string:
      prev_id = ''
    if not siblings:
      # First item in this container; start in the middle.
      entry.position_in_parent = 0
    elif prev_id == '':
      # A special value in the protocol. Insert at first position.
      # Compared explicitly so that None (no predecessor given) is NOT
      # treated as a front insertion; None falls through to the end below.
      entry.position_in_parent = ExtendRange(siblings[0], -1)
    else:
      # Handle mid-insertion; consider items along with their successors.
      for item, successor in zip(siblings, siblings[1:]):
        if item.id_string != prev_id:
          continue
        elif successor.id_string == entry.id_string:
          # We're already in place; don't change anything.
          entry.position_in_parent = successor.position_in_parent
        else:
          # Interpolate new position between the previous item and its
          # existing successor (integer floor division).
          entry.position_in_parent = (item.position_in_parent * 7 +
                                      successor.position_in_parent) // 8
        break
      else:
        # Insert at end. Includes the case where prev_id is None.
        entry.position_in_parent = ExtendRange(siblings[-1], +1)

    entry.parent_id_string = parent_id
    entry.ClearField('insert_after_item_id')

  def _ItemExists(self, id_string):
    """Determine whether an item exists in the changelog."""
    return id_string in self._entries

  def _CreatePermanentItem(self, spec):
    """Create one permanent item from its spec, if it doesn't exist.

    The resulting item is added to the changelog.

    Args:
      spec: A PermanentItem object holding the properties of the item to create.
    """
    id_string = self._ServerTagToId(spec.tag)
    if self._ItemExists(id_string):
      return
    print('Creating permanent item: %s' % spec.name)
    entry = sync_pb2.SyncEntity()
    entry.id_string = id_string
    entry.non_unique_name = spec.name
    entry.name = spec.name
    entry.server_defined_unique_tag = spec.tag
    entry.folder = True
    entry.deleted = False
    entry.specifics.CopyFrom(GetDefaultEntitySpecifics(spec.sync_type))
    self._WritePosition(entry, self._ServerTagToId(spec.parent_tag))
    self._SaveEntry(entry)

  def _CreatePermanentItems(self, requested_types):
    """Ensure creation of all permanent items for a given set of sync types.

    Args:
      requested_types: A list of sync data types from ALL_TYPES.
        Permanent items of only these types will be created.
    """
    for spec in self._PERMANENT_ITEM_SPECS:
      if spec.sync_type in requested_types:
        self._CreatePermanentItem(spec)

  def GetChanges(self, sieve):
    """Get entries which have changed, oldest first.

    The returned entries are limited to being _BATCH_SIZE many. The entries
    are returned in strict version order.

    Args:
      sieve: An update sieve to use to filter out updates the client
        has already seen.
    Returns:
      A tuple of (version, entries, changes_remaining). Version is a new
      timestamp value, which should be used as the starting point for the
      next query. Entries is the batch of entries meeting the current
      timestamp query. Changes_remaining indicates the number of changes
      left on the server after this batch.
    """
    if not sieve.HasAnyTimestamp():
      return (0, [], 0)
    min_timestamp = sieve.GetMinTimestamp()
    self._CreatePermanentItems(sieve.GetFirstTimeTypes())
    change_log = sorted(self._entries.values(),
                        key=operator.attrgetter('version'))
    new_changes = [x for x in change_log if x.version > min_timestamp]
    # Pick batch_size new changes, and then filter them. This matches
    # the RPC behavior of the production sync server.
    batch = new_changes[:self._BATCH_SIZE]
    if not batch:
      # Client is up to date.
      return (min_timestamp, [], 0)

    # Restrict batch to requested types. Tombstones are untyped
    # and will always get included.
    filtered = [DeepCopyOfProto(item) for item in batch
                if item.deleted or sieve.ClientWantsItem(item)]

    # The new client timestamp is the timestamp of the last item in the
    # batch, even if that item was filtered out.
    return (batch[-1].version, filtered, len(new_changes) - len(batch))

  def _CopyOverImmutableFields(self, entry):
    """Preserve immutable fields by copying pre-commit state.

    Currently only server_defined_unique_tag is preserved, and only when the
    stored entry has it set.

    Args:
      entry: A sync entity from the client.
    """
    base_entry = self._entries.get(entry.id_string)
    if (base_entry is not None and
        base_entry.HasField('server_defined_unique_tag')):
      entry.server_defined_unique_tag = base_entry.server_defined_unique_tag

  def _CheckVersionForCommit(self, entry):
    """Perform an optimistic concurrency check on the version number.

    Clients are only allowed to commit if they report having seen the most
    recent version of an object.

    Args:
      entry: A sync entity from the client. It is assumed that ID fields
        have been converted to server IDs.
    Returns:
      A boolean value indicating whether the client's version matches the
      newest server version for the given entry.
    """
    if entry.id_string in self._entries:
      # Allow edits/deletes if the version matches, and any undeletion.
      return (self._entries[entry.id_string].version == entry.version or
              self._entries[entry.id_string].deleted)
    else:
      # Allow unknown ID only if the client thinks it's new too.
      return entry.version == 0

  def _CheckParentIdForCommit(self, entry):
    """Check that the parent ID referenced in a SyncEntity actually exists.

    Args:
      entry: A sync entity from the client. It is assumed that ID fields
        have been converted to server IDs.
    Returns:
      A boolean value indicating whether the entity's parent ID is an object
      that actually exists (and is not deleted) in the current account state.
    """
    if entry.parent_id_string == ROOT_ID:
      # This is generally allowed.
      return True
    if entry.parent_id_string not in self._entries:
      print('Warning: Client sent unknown ID. Should never happen.')
      return False
    if entry.parent_id_string == entry.id_string:
      print('Warning: Client sent circular reference. Should never happen.')
      return False
    if self._entries[entry.parent_id_string].deleted:
      # This can happen in a race condition between two clients.
      return False
    if not self._entries[entry.parent_id_string].folder:
      print('Warning: Client sent non-folder parent. Should never happen.')
      return False
    return True

  def _RewriteIdsAsServerIds(self, entry, cache_guid, commit_session):
    """Convert ID fields in a client sync entry to server IDs.

    A commit batch sent by a client may contain new items for which the
    server has not generated IDs yet. And within a commit batch, later
    items are allowed to refer to earlier items. This method will
    generate server IDs for new items, as well as rewrite references
    to items whose server IDs were generated earlier in the batch.

    Args:
      entry: The client sync entry to modify.
      cache_guid: The globally unique ID of the client that sent this
        commit request.
      commit_session: A dictionary mapping the original IDs to the new server
        IDs, for any items committed earlier in the batch.
    """
    if entry.version == 0:
      if entry.HasField('client_defined_unique_tag'):
        # When present, this should determine the item's ID.
        new_id = self._ClientTagToId(entry.client_defined_unique_tag)
      else:
        new_id = self._ClientIdToId(cache_guid, entry.id_string)
        entry.originator_cache_guid = cache_guid
        entry.originator_client_item_id = entry.id_string
      commit_session[entry.id_string] = new_id  # Remember the remapping.
      entry.id_string = new_id
    if entry.parent_id_string in commit_session:
      entry.parent_id_string = commit_session[entry.parent_id_string]
    if entry.insert_after_item_id in commit_session:
      entry.insert_after_item_id = commit_session[entry.insert_after_item_id]

  def CommitEntry(self, entry, cache_guid, commit_session):
    """Attempt to commit one entry to the user's account.

    Args:
      entry: A SyncEntity protobuf representing desired object changes.
        The caller's message is not modified; a copy is committed.
      cache_guid: A string value uniquely identifying the client; this
        is used for ID generation and will determine the originator_cache_guid
        if the entry is new.
      commit_session: A dictionary mapping client IDs to server IDs for any
        objects committed earlier this session. If the entry gets a new ID
        during commit, the change will be recorded here.
    Returns:
      A SyncEntity reflecting the post-commit value of the entry, or None
      if the entry was not committed due to an error.
    """
    entry = DeepCopyOfProto(entry)

    # Generate server IDs for this entry, and write generated server IDs
    # from earlier entries into the message's fields, as appropriate. The
    # ID generation state is stored in 'commit_session'.
    self._RewriteIdsAsServerIds(entry, cache_guid, commit_session)

    # Perform the optimistic concurrency check on the entry's version number.
    # Clients are not allowed to commit unless they indicate that they've seen
    # the most recent version of an object.
    if not self._CheckVersionForCommit(entry):
      return None

    # Check the validity of the parent ID; it must exist at this point.
    # TODO(nick): Implement cycle detection and resolution.
    if not self._CheckParentIdForCommit(entry):
      return None

    self._CopyOverImmutableFields(entry)

    # At this point, the commit is definitely going to happen.

    # Deletion works by storing a limited record for an entry, called a
    # tombstone. A sync server must track deleted IDs forever, since it does
    # not keep track of client knowledge (there's no deletion ACK event).
    if entry.deleted:
      def MakeTombstone(id_string):
        """Make a tombstone entry that will replace the entry being deleted.

        Args:
          id_string: Index of the SyncEntity to be deleted.
        Returns:
          A new SyncEntity reflecting the fact that the entry is deleted.
        """
        # Only the ID, version and deletion state are preserved on a tombstone.
        # TODO(nick): Does the production server not preserve the type? Not
        # doing so means that tombstones cannot be filtered based on
        # requested_types at GetUpdates time.
        tombstone = sync_pb2.SyncEntity()
        tombstone.id_string = id_string
        tombstone.deleted = True
        tombstone.name = ''
        return tombstone

      def IsChild(child_id):
        """Check if a SyncEntity is a child of entry, or any of its children.

        Walks up the parent chain iteratively. Because parent cycles are not
        prevented at commit time (see the TODO above), a visited set stops
        the walk instead of recursing forever.

        Args:
          child_id: Index of the SyncEntity that is a possible child of entry.
        Returns:
          True if it is a child; false otherwise.
        """
        visited = set()
        while child_id in self._entries and child_id not in visited:
          visited.add(child_id)
          parent_id = self._entries[child_id].parent_id_string
          if parent_id == entry.id_string:
            return True
          child_id = parent_id
        return False

      # Identify any children entry might have.
      child_ids = [child.id_string for child in self._entries.itervalues()
                   if IsChild(child.id_string)]

      # Mark all children that were identified as deleted.
      for child_id in child_ids:
        self._SaveEntry(MakeTombstone(child_id))

      # Delete entry itself.
      entry = MakeTombstone(entry.id_string)
    else:
      # Comments in sync.proto detail how the representation of positional
      # ordering works: the 'insert_after_item_id' field specifies a
      # predecessor during Commit operations, but the 'position_in_parent'
      # field provides an absolute ordering in GetUpdates contexts. Here
      # we convert from the former to the latter. Specifically, we'll
      # generate a numeric position placing the item just after the object
      # identified by 'insert_after_item_id', and then clear the
      # 'insert_after_item_id' field so that it's not sent back to the client
      # during later GetUpdates requests.
      if entry.HasField('insert_after_item_id'):
        self._WritePosition(entry, entry.parent_id_string,
                            entry.insert_after_item_id)
      else:
        self._WritePosition(entry, entry.parent_id_string)

    # Commit the change. This also updates the version number, and (in
    # _SaveEntry) copies the originator info from any existing entry with
    # the same ID, since the client is not required to resend it.
    self._SaveEntry(entry)
    return entry


class TestServer(object):
  """An object to handle requests for one (and only one) Chrome Sync account.

  TestServer consumes the sync command messages that are the outermost
  layers of the protocol, performs the corresponding actions on its
  SyncDataModel, and constructs an appropriate response message.
  """

  def __init__(self):
    # The implementation supports exactly one account; its state is here.
    self.account = SyncDataModel()
    self.account_lock = threading.Lock()
    # Clients that have talked to us: a map from the full client ID
    # to its nickname.
    self.clients = {}
    # Yields 'A'..'Z', then '+A'..'+Z', then '++A'..'++Z', and so on.
    self.client_name_generator = ('+' * times + chr(c)
        for times in xrange(0, sys.maxint)
        for c in xrange(ord('A'), ord('Z') + 1))

  def GetShortClientName(self, query):
    """Return a short, stable nickname for the client named in a query.

    Args:
      query: A request URL or query string; the text after the first '?'
        (or the whole string, if there is none) is parsed for 'client_id'.
    Returns:
      The client's nickname, assigning a new one on first sight, or '?' if
      no client_id parameter is present.
    """
    parsed = cgi.parse_qs(query[query.find('?')+1:])
    client_id = parsed.get('client_id')
    if not client_id:
      return '?'
    client_id = client_id[0]
    if client_id not in self.clients:
      self.clients[client_id] = self.client_name_generator.next()
    return self.clients[client_id]

  def HandleCommand(self, query, raw_request):
    """Decode and handle a sync command from a raw input of bytes.

    This is the main entry point for this class. It is safe to call this
    method from multiple threads.

    Args:
      query: The request URL or query string; its client_id parameter is
        used only to pick a nickname for log output.
      raw_request: An iterable byte sequence to be interpreted as a sync
        protocol command.
    Returns:
      A tuple (response_code, raw_response); the first value is an HTTP
      result code, while the second value is a string of bytes which is the
      serialized reply to the command.
    """
    self.account_lock.acquire()
    try:
      request = sync_pb2.ClientToServerMessage()
      request.MergeFromString(raw_request)
      contents = request.message_contents

      response = sync_pb2.ClientToServerResponse()
      response.error_code = sync_pb2.ClientToServerResponse.SUCCESS
      response.store_birthday = self.account.store_birthday
      log_context = "[Client %s -> %s.py]" % (self.GetShortClientName(query),
                                               __name__)

      if contents == sync_pb2.ClientToServerMessage.AUTHENTICATE:
        print('%s Authenticate' % log_context)
        # We accept any authentication token, and support only one account.
        # TODO(nick): Mock out the GAIA authentication as well; hook up here.
        response.authenticate.user.email = 'syncjuser@chromium'
        response.authenticate.user.display_name = 'Sync J User'
      elif contents == sync_pb2.ClientToServerMessage.COMMIT:
        print('%s Commit' % log_context)
        self.HandleCommit(request.commit, response.commit)
      elif contents == sync_pb2.ClientToServerMessage.GET_UPDATES:
        print('%s GetUpdates from timestamp %d' %
              (log_context, request.get_updates.from_timestamp))
        self.HandleGetUpdates(request.get_updates, response.get_updates)
      # NOTE(review): any other message_contents value still yields a
      # SUCCESS response with no payload.
      return (200, response.SerializeToString())
    finally:
      self.account_lock.release()

  def HandleCommit(self, commit_message, commit_response):
    """Respond to a Commit request by updating the user's account state.

    Commit attempts stop after the first error, returning a CONFLICT result
    for any unattempted entries.

    Args:
      commit_message: A sync_pb2.CommitMessage protobuf holding the content
        of the client's request.
      commit_response: A sync_pb2.CommitResponse protobuf into which a reply
        to the client request will be written.
    """
    commit_response.SetInParent()
    batch_failure = False
    session = {}  # Tracks ID renaming during the commit operation.
    guid = commit_message.cache_guid
    for entry in commit_message.entries:
      server_entry = None
      if not batch_failure:
        # Try to commit the change to the account.
        server_entry = self.account.CommitEntry(entry, guid, session)

      # An entryresponse is returned in both success and failure cases.
      reply = commit_response.entryresponse.add()
      if server_entry is None:
        reply.response_type = sync_pb2.CommitResponse.CONFLICT
        reply.error_message = 'Conflict.'
        batch_failure = True  # One failure halts the batch.
      else:
        reply.response_type = sync_pb2.CommitResponse.SUCCESS
        # These are the properties that the server is allowed to override
        # during commit; the client wants to know their values at the end
        # of the operation.
        reply.id_string = server_entry.id_string
        if not server_entry.deleted:
          # Note: the production server doesn't actually send the
          # parent_id_string on commit responses, so we don't either.
          reply.position_in_parent = server_entry.position_in_parent
          reply.version = server_entry.version
          reply.name = server_entry.name
          reply.non_unique_name = server_entry.non_unique_name
        else:
          # NOTE(review): deletions report the client's version + 1 rather
          # than the tombstone's server version; confirm clients expect this.
          reply.version = entry.version + 1

  def HandleGetUpdates(self, update_request, update_response):
    """Respond to a GetUpdates request by querying the user's account.

    Args:
      update_request: A sync_pb2.GetUpdatesMessage protobuf holding the content
        of the client's request.
      update_response: A sync_pb2.GetUpdatesResponse protobuf into which a reply
        to the client request will be written.
    """
    update_response.SetInParent()
    update_sieve = UpdateSieve(update_request)
    new_timestamp, entries, remaining = self.account.GetChanges(update_sieve)

    update_response.changes_remaining = remaining
    for entry in entries:
      reply = update_response.entries.add()
      reply.CopyFrom(entry)
    update_sieve.SaveProgress(new_timestamp, update_response)