1 #!/usr/bin/env python2 2 # Copyright 2016 The Chromium OS Authors. All rights reserved. 3 # Use of this source code is governed by a BSD-style license that can be 4 # found in the LICENSE file. 5 6 """Unit test for pubsub_utils.py""" 7 8 from __future__ import print_function 9 import os 10 import unittest 11 12 import mox 13 14 from apiclient import discovery 15 from oauth2client.client import ApplicationDefaultCredentialsError 16 from oauth2client.client import GoogleCredentials 17 from googleapiclient.errors import UnknownApiNameOrVersion 18 19 import pubsub_utils 20 21 _TEST_CLOUD_SERVICE_ACCOUNT_FILE = '/tmp/test-credential' 22 23 24 class MockedPubSub(object): 25 """A mocked PubSub handle.""" 26 def __init__(self, test, topic, msg, retry, ret_val=None, 27 raise_except=False): 28 self.test = test 29 self.topic = topic 30 self.msg = msg 31 self.retry = retry 32 self.ret_val = ret_val 33 self.raise_except = raise_except 34 35 def projects(self): 36 """Mocked PubSub projects.""" 37 return self 38 39 def topics(self): 40 """Mocked PubSub topics.""" 41 return self 42 43 def publish(self, topic, body): 44 """Mocked PubSub publish method. 45 46 @param topic: PubSub topic string. 47 @param body: PubSub notification body. 48 """ 49 self.test.assertEquals(self.topic, topic) 50 self.test.assertEquals(self.msg, body['messages'][0]) 51 return self 52 53 def execute(self, num_retries): 54 """Mocked PubSub execute method. 55 56 @param num_retries: Number of retries. 57 """ 58 self.test.assertEquals(self.retry, num_retries) 59 if self.raise_except: 60 raise Exception() 61 return self.ret_val 62 63 64 def _create_sample_message(): 65 """Creates a sample pubsub message.""" 66 msg_payload = {'data': 'sample data'} 67 msg_attributes = {} 68 msg_attributes['var'] = 'value' 69 msg_payload['attributes'] = msg_attributes 70 71 return msg_payload 72 73 74 class PubSubTests(mox.MoxTestBase): 75 """Tests for pubsub related functios.""" 76 77 def test_pubsub_with_no_service_account(self): 78 """Test getting the pubsub service""" 79 self.mox.StubOutWithMock(os.path, 'isfile') 80 self.mox.ReplayAll() 81 with self.assertRaises(pubsub_utils.PubSubException): 82 pubsub_utils.PubSubClient() 83 self.mox.VerifyAll() 84 85 def test_pubsub_with_non_existing_service_account(self): 86 """Test getting the pubsub service""" 87 self.mox.StubOutWithMock(os.path, 'isfile') 88 os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(False) 89 self.mox.ReplayAll() 90 with self.assertRaises(pubsub_utils.PubSubException): 91 pubsub_utils.PubSubClient(_TEST_CLOUD_SERVICE_ACCOUNT_FILE) 92 self.mox.VerifyAll() 93 94 def test_pubsub_with_corrupted_service_account(self): 95 """Test pubsub with corrupted service account.""" 96 self.mox.StubOutWithMock(os.path, 'isfile') 97 self.mox.StubOutWithMock(GoogleCredentials, 'from_stream') 98 os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True) 99 GoogleCredentials.from_stream( 100 _TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndRaise( 101 ApplicationDefaultCredentialsError()) 102 self.mox.ReplayAll() 103 with self.assertRaises(pubsub_utils.PubSubException): 104 pubsub_utils.PubSubClient(_TEST_CLOUD_SERVICE_ACCOUNT_FILE) 105 self.mox.VerifyAll() 106 107 def test_pubsub_with_invalid_service_account(self): 108 """Test pubsubwith invalid service account.""" 109 self.mox.StubOutWithMock(os.path, 'isfile') 110 self.mox.StubOutWithMock(GoogleCredentials, 'from_stream') 111 os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True) 112 credentials = self.mox.CreateMock(GoogleCredentials) 113 GoogleCredentials.from_stream( 114 _TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials) 115 credentials.create_scoped_required().AndReturn(True) 116 credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn( 117 credentials) 118 self.mox.StubOutWithMock(discovery, 'build') 119 discovery.build( 120 pubsub_utils.PUBSUB_SERVICE_NAME, 121 pubsub_utils.PUBSUB_VERSION, 122 credentials=credentials).AndRaise(UnknownApiNameOrVersion()) 123 self.mox.ReplayAll() 124 with self.assertRaises(pubsub_utils.PubSubException): 125 msg = _create_sample_message() 126 pubsub_client = pubsub_utils.PubSubClient( 127 _TEST_CLOUD_SERVICE_ACCOUNT_FILE) 128 pubsub_client.publish_notifications('test_topic', [msg]) 129 self.mox.VerifyAll() 130 131 def test_publish_notifications(self): 132 """Test getting the pubsub service""" 133 self.mox.StubOutWithMock(os.path, 'isfile') 134 self.mox.StubOutWithMock(GoogleCredentials, 'from_stream') 135 os.path.isfile(_TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True) 136 credentials = self.mox.CreateMock(GoogleCredentials) 137 GoogleCredentials.from_stream( 138 _TEST_CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials) 139 credentials.create_scoped_required().AndReturn(True) 140 credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn( 141 credentials) 142 self.mox.StubOutWithMock(discovery, 'build') 143 msg = _create_sample_message() 144 discovery.build( 145 pubsub_utils.PUBSUB_SERVICE_NAME, 146 pubsub_utils.PUBSUB_VERSION, 147 credentials=credentials).AndReturn(MockedPubSub( 148 self, 149 'test_topic', 150 msg, 151 pubsub_utils.DEFAULT_PUBSUB_NUM_RETRIES, 152 # use tuple ('123') instead of list just for easy to 153 # write the test. 154 ret_val={'messageIds': ('123')})) 155 156 self.mox.ReplayAll() 157 pubsub_client = pubsub_utils.PubSubClient( 158 _TEST_CLOUD_SERVICE_ACCOUNT_FILE) 159 msg_ids = pubsub_client.publish_notifications('test_topic', [msg]) 160 self.assertEquals(('123'), msg_ids) 161 162 self.mox.VerifyAll() 163 164 165 if __name__ == '__main__': 166 unittest.main() 167