1 # 2 # Copyright (C) 2016 The Android Open Source Project 3 # 4 # Licensed under the Apache License, Version 2.0 (the "License"); 5 # you may not use this file except in compliance with the License. 6 # You may obtain a copy of the License at 7 # 8 # http://www.apache.org/licenses/LICENSE-2.0 9 # 10 # Unless required by applicable law or agreed to in writing, software 11 # distributed under the License is distributed on an "AS IS" BASIS, 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 # See the License for the specific language governing permissions and 14 # limitations under the License. 15 # 16 17 import logging 18 from google.cloud import pubsub 19 from google.cloud.exceptions import NotFound 20 from oauth2client.service_account import ServiceAccountCredentials 21 from time import sleep 22 23 from vts.harnesses.cloud_client import cloud_client_controller 24 25 26 class CloudClient(object): 27 """Communicates with App Engine to receive and run VTS tests. 28 29 Attributes: 30 clientName: string, the name of the runner machine. This must be pre- 31 enrolled with the PubSub service. 32 POLL_INTERVAL: int, the fequency at which pubsub service is polled (seconds) 33 MAX_MESSAGES: int, the maximum number of commands to receive at once 34 """ 35 36 POLL_INTERVAL = 5 37 MAX_MESSAGES = 100 38 39 def __init__(self, clientName, oauth2_service_json, path_cmdfile=None): 40 """Inits the object with the client name and a PubSub subscription 41 42 Args: 43 clientName: the name of the client. Must be pre-enrolled with the 44 PubSub service. 45 oauth2_service_json: path (string) to the service account JSON 46 keyfile. 47 """ 48 self.clientName = clientName 49 credentials = ServiceAccountCredentials.from_json_keyfile_name( 50 oauth2_service_json) 51 self._client = pubsub.Client(credentials=credentials) 52 self._topic = self._client.topic(clientName) 53 self._sub = self._topic.subscription(clientName) 54 self._controller = cloud_client_controller.CloudClientController( 55 path_cmdfile) 56 57 def Pull(self): 58 """Fetches new messages from the PubSub subscription. 59 60 Receives and acknowledges the commands published to the client's 61 subscription. 62 63 Returns: 64 list of commands (strings) from PubSub subscription. 65 """ 66 logging.info("Waiting for commands: %s", self.clientName) 67 results = self._sub.pull( 68 return_immediately=True, max_messages=self.MAX_MESSAGES) 69 70 if results: 71 logging.info("Commands received: %s", results) 72 self._sub.acknowledge([ack_id for ack_id, message in results]) 73 return [message.data for ack_id, message in results] 74 75 return None 76 77 def Run(self): 78 """Indefinitely pulls and invokes new commands from the PubSub service. 79 """ 80 try: 81 while True: 82 commands = self.Pull() 83 print(commands) 84 if not commands: 85 sleep(self.POLL_INTERVAL) 86 else: 87 self._controller.ExecuteTradeFedCommands(commands) 88 except NotFound as e: 89 logging.error("No subscription created for client %s", 90 self.clientName) 91