"""Tests for the JSON encoding helpers in apitools.base.py.encoding."""

import base64
import datetime
import json
import sys

from protorpc import message_types
from protorpc import messages
from protorpc import util
import unittest2

from apitools.base.py import encoding
from apitools.base.py import exceptions
from apitools.base.py import extra_types


class SimpleMessage(messages.Message):
    """Fixture with one optional and one repeated string field."""
    field = messages.StringField(1)
    repfield = messages.StringField(2, repeated=True)


class BytesMessage(messages.Message):
    """Fixture with one optional and one repeated bytes field."""
    field = messages.BytesField(1)
    repfield = messages.BytesField(2, repeated=True)


class TimeMessage(messages.Message):
    """Fixture with a single datetime field."""
    timefield = message_types.DateTimeField(3)


@encoding.MapUnrecognizedFields('additional_properties')
class AdditionalPropertiesMessage(messages.Message):
    """Fixture whose unrecognized fields map to string-valued pairs."""

    class AdditionalProperty(messages.Message):
        """A single key/value pair with a string value."""
        key = messages.StringField(1)
        value = messages.StringField(2)

    additional_properties = messages.MessageField(
        'AdditionalProperty', 1, repeated=True)


@encoding.MapUnrecognizedFields('additional_properties')
class UnrecognizedEnumMessage(messages.Message):
    """Fixture whose unrecognized fields map to enum-valued pairs."""

    class ThisEnum(messages.Enum):
        VALUE_ONE = 1
        VALUE_TWO = 2

    class AdditionalProperty(messages.Message):
        """A single key/value pair with a ThisEnum value."""
        key = messages.StringField(1)
        value = messages.EnumField('UnrecognizedEnumMessage.ThisEnum', 2)

    additional_properties = messages.MessageField(
        AdditionalProperty, 1, repeated=True)


class CompoundPropertyType(messages.Message):
    """Fixture message used as the value type of an additional property."""
    index = messages.IntegerField(1)
    name = messages.StringField(2)


class MessageWithEnum(messages.Message):
    """Fixture with enum fields, one of which declares a default."""

    class ThisEnum(messages.Enum):
        VALUE_ONE = 1
        VALUE_TWO = 2

    field_one = messages.EnumField(ThisEnum, 1)
    field_two = messages.EnumField(ThisEnum, 2, default=ThisEnum.VALUE_TWO)
    ignored_field = messages.EnumField(ThisEnum, 3)


@encoding.MapUnrecognizedFields('additional_properties')
class AdditionalMessagePropertiesMessage(messages.Message):
    """Fixture whose unrecognized fields map to message-valued pairs."""

    class AdditionalProperty(messages.Message):
        """A single key/value pair with a CompoundPropertyType value."""
        key = messages.StringField(1)
        value = messages.MessageField(CompoundPropertyType, 2)

    additional_properties = messages.MessageField(
        'AdditionalProperty', 1, repeated=True)


class HasNestedMessage(messages.Message):
    """Fixture that nests an AdditionalPropertiesMessage."""
    nested = messages.MessageField(AdditionalPropertiesMessage, 1)
    nested_list = messages.StringField(2, repeated=True)


class ExtraNestedMessage(messages.Message):
    """Fixture that nests a HasNestedMessage (two levels of nesting)."""
    nested = messages.MessageField(HasNestedMessage, 1)


class MessageWithRemappings(messages.Message):
    """Fixture whose fields and enum values get custom JSON names below."""

    class SomeEnum(messages.Enum):
        enum_value = 1
        second_value = 2

    enum_field = messages.EnumField(SomeEnum, 1)
    double_encoding = messages.EnumField(SomeEnum, 2)
    another_field = messages.StringField(3)
    repeated_enum = messages.EnumField(SomeEnum, 4, repeated=True)
    repeated_field = messages.StringField(5, repeated=True)


@encoding.MapUnrecognizedFields('additional_properties')
class RepeatedJsonValueMessage(messages.Message):
    """Fixture whose unrecognized fields map to lists of JsonValue."""

    class AdditionalProperty(messages.Message):
        """A single key paired with a repeated JsonValue."""
        key = messages.StringField(1)
        value = messages.MessageField(extra_types.JsonValue, 2, repeated=True)

    additional_properties = messages.MessageField('AdditionalProperty', 1,
                                                  repeated=True)


# Custom JSON names registered once at import time; the remapping tests
# below (including testNoRepeatedRemapping) rely on exactly these entries.
encoding.AddCustomJsonEnumMapping(MessageWithRemappings.SomeEnum,
                                  'enum_value', 'wire_name')
encoding.AddCustomJsonFieldMapping(MessageWithRemappings,
                                   'double_encoding', 'doubleEncoding')
encoding.AddCustomJsonFieldMapping(MessageWithRemappings,
                                   'another_field', 'anotherField')
encoding.AddCustomJsonFieldMapping(MessageWithRemappings,
                                   'repeated_field', 'repeatedField')


class EncodingTest(unittest2.TestCase):
    """Exercises message <-> JSON conversion and the repr helpers."""

    def testCopyProtoMessage(self):
        """A copied message does not track later changes to the original."""
        msg = SimpleMessage(field='abc')
        new_msg = encoding.CopyProtoMessage(msg)
        self.assertEqual(msg.field, new_msg.field)
        msg.field = 'def'
        self.assertNotEqual(msg.field, new_msg.field)

    def testBytesEncoding(self):
        """Bytes decode from either base64 alphabet, encode as URL-safe."""
        b64_str = 'AAc+'
        b64_msg = '{"field": "%s"}' % b64_str
        urlsafe_b64_str = 'AAc-'
        urlsafe_b64_msg = '{"field": "%s"}' % urlsafe_b64_str
        data = base64.b64decode(b64_str)
        msg = BytesMessage(field=data)
        self.assertEqual(
            msg, encoding.JsonToMessage(BytesMessage, urlsafe_b64_msg))
        self.assertEqual(msg, encoding.JsonToMessage(BytesMessage, b64_msg))
        self.assertEqual(urlsafe_b64_msg, encoding.MessageToJson(msg))

        enc_rep_msg = '{"repfield": ["%(b)s", "%(b)s"]}' % {
            'b': urlsafe_b64_str}
        rep_msg = BytesMessage(repfield=[data, data])
        self.assertEqual(
            rep_msg, encoding.JsonToMessage(BytesMessage, enc_rep_msg))
        self.assertEqual(enc_rep_msg, encoding.MessageToJson(rep_msg))

    def testIncludeFields(self):
        """include_fields emits unset fields as null or an empty list."""
        msg = SimpleMessage()
        self.assertEqual('{}', encoding.MessageToJson(msg))
        self.assertEqual(
            '{"field": null}',
            encoding.MessageToJson(msg, include_fields=['field']))
        self.assertEqual(
            '{"repfield": []}',
            encoding.MessageToJson(msg, include_fields=['repfield']))

    def testNestedIncludeFields(self):
        """include_fields accepts dotted paths into nested messages."""
        msg = HasNestedMessage(
            nested=AdditionalPropertiesMessage(
                additional_properties=[]))
        self.assertEqual(
            '{"nested": null}',
            encoding.MessageToJson(msg, include_fields=['nested']))
        self.assertEqual(
            '{"nested": {"additional_properties": []}}',
            encoding.MessageToJson(
                msg, include_fields=['nested.additional_properties']))
        msg = ExtraNestedMessage(nested=msg)
        self.assertEqual(
            '{"nested": {"nested": null}}',
            encoding.MessageToJson(msg, include_fields=['nested.nested']))
        self.assertEqual(
            '{"nested": {"nested_list": []}}',
            encoding.MessageToJson(msg, include_fields=['nested.nested_list']))
        self.assertEqual(
            '{"nested": {"nested": {"additional_properties": []}}}',
            encoding.MessageToJson(
                msg, include_fields=['nested.nested.additional_properties']))

    def testAdditionalPropertyMapping(self):
        """Additional properties round-trip through a flat JSON object."""
        msg = AdditionalPropertiesMessage()
        msg.additional_properties = [
            AdditionalPropertiesMessage.AdditionalProperty(
                key='key_one', value='value_one'),
            AdditionalPropertiesMessage.AdditionalProperty(
                key='key_two', value='value_two'),
        ]

        encoded_msg = encoding.MessageToJson(msg)
        self.assertEqual(
            {'key_one': 'value_one', 'key_two': 'value_two'},
            json.loads(encoded_msg))

        new_msg = encoding.JsonToMessage(type(msg), encoded_msg)
        self.assertEqual(
            set(('key_one', 'key_two')),
            set(x.key for x in new_msg.additional_properties))
        self.assertIsNot(msg, new_msg)

        # Mutating the decoded message must leave the source untouched.
        new_msg.additional_properties.pop()
        self.assertEqual(1, len(new_msg.additional_properties))
        self.assertEqual(2, len(msg.additional_properties))

    def testAdditionalMessageProperties(self):
        """An unknown key with an object value decodes to a message."""
        json_msg = '{"input": {"index": 0, "name": "output"}}'
        result = encoding.JsonToMessage(
            AdditionalMessagePropertiesMessage, json_msg)
        self.assertEqual(1, len(result.additional_properties))
        self.assertEqual(0, result.additional_properties[0].value.index)

    def testUnrecognizedEnum(self):
        """An unknown key with an enum name decodes to the enum value."""
        json_msg = '{"input": "VALUE_ONE"}'
        result = encoding.JsonToMessage(
            UnrecognizedEnumMessage, json_msg)
        self.assertEqual(1, len(result.additional_properties))
        self.assertEqual(UnrecognizedEnumMessage.ThisEnum.VALUE_ONE,
                         result.additional_properties[0].value)

    def testNestedFieldMapping(self):
        """Additional properties round-trip when nested in a message."""
        nested_msg = AdditionalPropertiesMessage()
        nested_msg.additional_properties = [
            AdditionalPropertiesMessage.AdditionalProperty(
                key='key_one', value='value_one'),
            AdditionalPropertiesMessage.AdditionalProperty(
                key='key_two', value='value_two'),
        ]
        msg = HasNestedMessage(nested=nested_msg)

        encoded_msg = encoding.MessageToJson(msg)
        self.assertEqual(
            {'nested': {'key_one': 'value_one', 'key_two': 'value_two'}},
            json.loads(encoded_msg))

        new_msg = encoding.JsonToMessage(type(msg), encoded_msg)
        self.assertEqual(
            set(('key_one', 'key_two')),
            set(x.key for x in new_msg.nested.additional_properties))

        # Mutating the decoded message must leave the source untouched.
        new_msg.nested.additional_properties.pop()
        self.assertEqual(1, len(new_msg.nested.additional_properties))
        self.assertEqual(2, len(msg.nested.additional_properties))

    def testValidEnums(self):
        """Known enum names decode; defaults are not written back out."""
        message_json = '{"field_one": "VALUE_ONE"}'
        message = encoding.JsonToMessage(MessageWithEnum, message_json)
        self.assertEqual(MessageWithEnum.ThisEnum.VALUE_ONE, message.field_one)
        self.assertEqual(MessageWithEnum.ThisEnum.VALUE_TWO, message.field_two)
        self.assertEqual(json.loads(message_json),
                         json.loads(encoding.MessageToJson(message)))

    def testIgnoredEnums(self):
        """Unknown enum names leave the field unset but still round-trip."""
        json_with_typo = '{"field_one": "VALUE_OEN"}'
        message = encoding.JsonToMessage(MessageWithEnum, json_with_typo)
        self.assertIsNone(message.field_one)
        self.assertEqual(('VALUE_OEN', messages.Variant.ENUM),
                         message.get_unrecognized_field_info('field_one'))
        self.assertEqual(json.loads(json_with_typo),
                         json.loads(encoding.MessageToJson(message)))

        # An empty JSON string is accepted and yields an unset field.
        empty_json = ''
        message = encoding.JsonToMessage(MessageWithEnum, empty_json)
        self.assertIsNone(message.field_one)

    def testIgnoredEnumsWithDefaults(self):
        """Unknown enum names fall back to the default and round-trip."""
        json_with_typo = '{"field_two": "VALUE_OEN"}'
        message = encoding.JsonToMessage(MessageWithEnum, json_with_typo)
        self.assertEqual(MessageWithEnum.ThisEnum.VALUE_TWO, message.field_two)
        self.assertEqual(json.loads(json_with_typo),
                         json.loads(encoding.MessageToJson(message)))

    def testUnknownNestedRoundtrip(self):
        """An unknown nested JSON object survives a decode/encode cycle."""
        json_message = '{"field": "abc", "submessage": {"a": 1, "b": "foo"}}'
        message = encoding.JsonToMessage(SimpleMessage, json_message)
        self.assertEqual(json.loads(json_message),
                         json.loads(encoding.MessageToJson(message)))

    def testJsonDatetime(self):
        """A timezone-aware datetime encodes with its UTC offset."""
        msg = TimeMessage(timefield=datetime.datetime(
            2014, 7, 2, 23, 33, 25, 541000,
            tzinfo=util.TimeZoneOffset(datetime.timedelta(0))))
        self.assertEqual(
            '{"timefield": "2014-07-02T23:33:25.541000+00:00"}',
            encoding.MessageToJson(msg))

    def testEnumRemapping(self):
        """A remapped enum value uses its custom JSON name both ways."""
        msg = MessageWithRemappings(
            enum_field=MessageWithRemappings.SomeEnum.enum_value)
        json_message = encoding.MessageToJson(msg)
        self.assertEqual('{"enum_field": "wire_name"}', json_message)
        self.assertEqual(
            msg, encoding.JsonToMessage(MessageWithRemappings, json_message))

    def testRepeatedEnumRemapping(self):
        """Enum remapping also applies inside repeated enum fields."""
        msg = MessageWithRemappings(
            repeated_enum=[
                MessageWithRemappings.SomeEnum.enum_value,
                MessageWithRemappings.SomeEnum.second_value,
            ])
        json_message = encoding.MessageToJson(msg)
        self.assertEqual('{"repeated_enum": ["wire_name", "second_value"]}',
                         json_message)
        self.assertEqual(
            msg, encoding.JsonToMessage(MessageWithRemappings, json_message))

    def testFieldRemapping(self):
        """A remapped field uses its custom JSON name both ways."""
        msg = MessageWithRemappings(another_field='abc')
        json_message = encoding.MessageToJson(msg)
        self.assertEqual('{"anotherField": "abc"}', json_message)
        self.assertEqual(
            msg, encoding.JsonToMessage(MessageWithRemappings, json_message))

    def testRepeatedFieldRemapping(self):
        """Field remapping also applies to repeated fields."""
        msg = MessageWithRemappings(repeated_field=['abc', 'def'])
        json_message = encoding.MessageToJson(msg)
        self.assertEqual('{"repeatedField": ["abc", "def"]}', json_message)
        self.assertEqual(
            msg, encoding.JsonToMessage(MessageWithRemappings, json_message))

    def testMultipleRemapping(self):
        """Field and enum remappings compose on the same field."""
        msg = MessageWithRemappings(
            double_encoding=MessageWithRemappings.SomeEnum.enum_value)
        json_message = encoding.MessageToJson(msg)
        self.assertEqual('{"doubleEncoding": "wire_name"}', json_message)
        self.assertEqual(
            msg, encoding.JsonToMessage(MessageWithRemappings, json_message))

    def testNoRepeatedRemapping(self):
        """Conflicting field or enum remappings raise InvalidDataError."""
        # 'double_encoding' already has a JSON name registered.
        self.assertRaises(
            exceptions.InvalidDataError,
            encoding.AddCustomJsonFieldMapping,
            MessageWithRemappings, 'double_encoding', 'something_else')
        # 'anotherField' is already the JSON name of another field.
        self.assertRaises(
            exceptions.InvalidDataError,
            encoding.AddCustomJsonFieldMapping,
            MessageWithRemappings, 'enum_field', 'anotherField')
        # 'enum_value' already has a JSON name registered.
        self.assertRaises(
            exceptions.InvalidDataError,
            encoding.AddCustomJsonEnumMapping,
            MessageWithRemappings.SomeEnum, 'enum_value', 'another_name')
        # 'wire_name' is already the JSON name of another enum value.
        self.assertRaises(
            exceptions.InvalidDataError,
            encoding.AddCustomJsonEnumMapping,
            MessageWithRemappings.SomeEnum, 'second_value', 'wire_name')

    def testMessageToRepr(self):
        """MessageToRepr output with and without module prefixes."""
        # Using the same string returned by MessageToRepr, with the
        # module names fixed.
        # pylint: disable=bad-whitespace
        msg = SimpleMessage(field='field', repfield=['field', 'field', ],)
        # pylint: enable=bad-whitespace
        self.assertEqual(
            encoding.MessageToRepr(msg),
            r"%s.SimpleMessage(field='field',repfield=['field','field',],)" % (
                __name__,))
        self.assertEqual(
            encoding.MessageToRepr(msg, no_modules=True),
            r"SimpleMessage(field='field',repfield=['field','field',],)")

    def testMessageToReprWithTime(self):
        """Multiline MessageToRepr output for a datetime field."""
        msg = TimeMessage(timefield=datetime.datetime(
            2014, 7, 2, 23, 33, 25, 541000,
            tzinfo=util.TimeZoneOffset(datetime.timedelta(0))))
        # NOTE(review): the whitespace following '\n' in the expected
        # literals must match the indent MessageToRepr emits in multiline
        # mode exactly -- confirm against the encoder before relying on it.
        self.assertEqual(
            encoding.MessageToRepr(msg, multiline=True),
            ('%s.TimeMessage(\n '
             'timefield=datetime.datetime(2014, 7, 2, 23, 33, 25, 541000, '
             'tzinfo=protorpc.util.TimeZoneOffset('
             'datetime.timedelta(0))),\n)') % __name__)
        self.assertEqual(
            encoding.MessageToRepr(msg, multiline=True, no_modules=True),
            'TimeMessage(\n '
            'timefield=datetime.datetime(2014, 7, 2, 23, 33, 25, 541000, '
            'tzinfo=TimeZoneOffset(datetime.timedelta(0))),\n)')

    def testPackageMappingsNoPackage(self):
        """With no package given, the type key uses the module's package."""
        this_module_name = util.get_package_for_module(__name__)
        full_type_name = 'MessageWithEnum.ThisEnum'
        full_key = '%s.%s' % (this_module_name, full_type_name)
        self.assertEqual(full_key,
                         encoding._GetTypeKey(MessageWithEnum.ThisEnum, ''))

    def testPackageMappingsWithPackage(self):
        """A module-level 'package' attribute changes the type key."""
        this_module_name = util.get_package_for_module(__name__)
        full_type_name = 'MessageWithEnum.ThisEnum'
        full_key = '%s.%s' % (this_module_name, full_type_name)
        this_module = sys.modules[__name__]
        new_package = 'new_package'
        # Set the override before entering the try block so the cleanup in
        # ``finally`` only runs once the attribute actually exists.
        setattr(this_module, 'package', new_package)
        try:
            new_key = '%s.%s' % (new_package, full_type_name)
            self.assertEqual(
                new_key,
                encoding._GetTypeKey(MessageWithEnum.ThisEnum, ''))
            self.assertEqual(
                full_key,
                encoding._GetTypeKey(MessageWithEnum.ThisEnum, new_package))
        finally:
            delattr(this_module, 'package')

    def testRepeatedJsonValuesAsRepeatedProperty(self):
        """A list-valued unknown key round-trips as repeated JsonValue."""
        encoded_msg = '{"a": [{"one": 1}]}'
        msg = encoding.JsonToMessage(RepeatedJsonValueMessage, encoded_msg)
        self.assertEqual(encoded_msg, encoding.MessageToJson(msg))