1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "components/copresence/rpc/rpc_handler.h" 6 7 #include <map> 8 9 #include "base/bind.h" 10 #include "base/command_line.h" 11 #include "base/guid.h" 12 #include "base/logging.h" 13 #include "base/strings/string_util.h" 14 15 // TODO(ckehoe): time.h includes windows.h, which #defines DeviceCapabilities 16 // to DeviceCapabilitiesW. This breaks the pb.h headers below. For now, 17 // we fix this with an #undef. 18 #include "base/time/time.h" 19 #if defined(OS_WIN) 20 #undef DeviceCapabilities 21 #endif 22 23 #include "components/copresence/copresence_switches.h" 24 #include "components/copresence/handlers/directive_handler.h" 25 #include "components/copresence/proto/codes.pb.h" 26 #include "components/copresence/proto/data.pb.h" 27 #include "components/copresence/proto/rpcs.pb.h" 28 #include "components/copresence/public/copresence_delegate.h" 29 #include "net/http/http_status_code.h" 30 31 // TODO(ckehoe): Return error messages for bad requests. 32 33 namespace copresence { 34 35 using google::protobuf::MessageLite; 36 using google::protobuf::RepeatedPtrField; 37 38 const char RpcHandler::kReportRequestRpcName[] = "report"; 39 40 namespace { 41 42 // UrlSafe is defined as: 43 // '/' represented by a '_' and '+' represented by a '-' 44 // TODO(rkc): Move this to the wrapper. 45 std::string ToUrlSafe(std::string token) { 46 base::ReplaceChars(token, "+", "-", &token); 47 base::ReplaceChars(token, "/", "_", &token); 48 return token; 49 } 50 51 const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000; // 10 minutes. 52 const int kMaxInvalidTokens = 10000; 53 const char kRegisterDeviceRpcName[] = "registerdevice"; 54 const char kDefaultCopresenceServer[] = 55 "https://www.googleapis.com/copresence/v2/copresence"; 56 57 // Logging 58 59 // Checks for a copresence error. If there is one, logs it and returns true. 60 bool CopresenceErrorLogged(const Status& status) { 61 if (status.code() != OK) { 62 LOG(ERROR) << "Copresence error code " << status.code() 63 << (status.message().empty() ? std::string() : 64 ": " + status.message()); 65 } 66 return status.code() != OK; 67 } 68 69 void LogIfErrorStatus(const util::error::Code& code, 70 const std::string& context) { 71 LOG_IF(ERROR, code != util::error::OK) 72 << context << " error " << code << ". See " 73 << "cs/google3/util/task/codes.proto for more info."; 74 } 75 76 // If any errors occurred, logs them and returns true. 77 bool ReportErrorLogged(const ReportResponse& response) { 78 bool result = CopresenceErrorLogged(response.header().status()); 79 80 // The Report fails or succeeds as a unit. If any responses had errors, 81 // the header will too. Thus we don't need to propagate individual errors. 82 if (response.has_update_signals_response()) 83 LogIfErrorStatus(response.update_signals_response().status(), "Update"); 84 if (response.has_manage_messages_response()) 85 LogIfErrorStatus(response.manage_messages_response().status(), "Publish"); 86 if (response.has_manage_subscriptions_response()) { 87 LogIfErrorStatus(response.manage_subscriptions_response().status(), 88 "Subscribe"); 89 } 90 91 return result; 92 } 93 94 // Request construction 95 // TODO(ckehoe): Move these into a separate file? 96 97 template <typename T> 98 BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) { 99 if (msg.has_token_exchange_strategy() && 100 msg.token_exchange_strategy().has_broadcast_scan_configuration()) { 101 return msg.token_exchange_strategy().broadcast_scan_configuration(); 102 } 103 return BROADCAST_SCAN_CONFIGURATION_UNKNOWN; 104 } 105 106 scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) { 107 scoped_ptr<DeviceState> state(new DeviceState); 108 109 TokenTechnology* ultrasound = 110 state->mutable_capabilities()->add_token_technology(); 111 ultrasound->set_medium(AUDIO_ULTRASOUND_PASSBAND); 112 ultrasound->add_instruction_type(TRANSMIT); 113 ultrasound->add_instruction_type(RECEIVE); 114 115 TokenTechnology* audible = 116 state->mutable_capabilities()->add_token_technology(); 117 audible->set_medium(AUDIO_AUDIBLE_DTMF); 118 audible->add_instruction_type(TRANSMIT); 119 audible->add_instruction_type(RECEIVE); 120 121 return state.Pass(); 122 } 123 124 // TODO(ckehoe): We're keeping this code in a separate function for now 125 // because we get a version string from Chrome, but the proto expects 126 // an int64 version. We should probably change the version proto 127 // to handle a more detailed version. 128 ClientVersion* CreateVersion(const std::string& client, 129 const std::string& version_name) { 130 ClientVersion* version = new ClientVersion; 131 132 version->set_client(client); 133 version->set_version_name(version_name); 134 135 return version; 136 } 137 138 void AddTokenToRequest(ReportRequest* request, const AudioToken& token) { 139 TokenObservation* token_observation = 140 request->mutable_update_signals_request()->add_token_observation(); 141 token_observation->set_token_id(ToUrlSafe(token.token)); 142 143 TokenSignals* signals = token_observation->add_signals(); 144 signals->set_medium(token.audible ? AUDIO_AUDIBLE_DTMF 145 : AUDIO_ULTRASOUND_PASSBAND); 146 signals->set_observed_time_millis(base::Time::Now().ToJsTime()); 147 } 148 149 } // namespace 150 151 // Public methods 152 153 RpcHandler::RpcHandler(CopresenceDelegate* delegate) 154 : delegate_(delegate), 155 invalid_audio_token_cache_( 156 base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs), 157 kMaxInvalidTokens), 158 server_post_callback_(base::Bind(&RpcHandler::SendHttpPost, 159 base::Unretained(this))) {} 160 161 RpcHandler::~RpcHandler() { 162 for (std::set<HttpPost*>::iterator post = pending_posts_.begin(); 163 post != pending_posts_.end(); ++post) { 164 delete *post; 165 } 166 167 if (delegate_ && delegate_->GetWhispernetClient()) { 168 delegate_->GetWhispernetClient()->RegisterTokensCallback( 169 WhispernetClient::TokensCallback()); 170 } 171 } 172 173 void RpcHandler::Initialize(const SuccessCallback& init_done_callback) { 174 scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest); 175 DCHECK(device_id_.empty()); 176 177 request->mutable_push_service()->set_service(PUSH_SERVICE_NONE); 178 Identity* identity = 179 request->mutable_device_identifiers()->mutable_registrant(); 180 identity->set_type(CHROME); 181 identity->set_chrome_id(base::GenerateGUID()); 182 SendServerRequest( 183 kRegisterDeviceRpcName, 184 std::string(), 185 request.Pass(), 186 base::Bind(&RpcHandler::RegisterResponseHandler, 187 // On destruction, this request will be cancelled. 188 base::Unretained(this), 189 init_done_callback)); 190 } 191 192 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) { 193 SendReportRequest(request.Pass(), std::string(), StatusCallback()); 194 } 195 196 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request, 197 const std::string& app_id, 198 const StatusCallback& status_callback) { 199 DCHECK(request.get()); 200 DCHECK(!device_id_.empty()) 201 << "RpcHandler::Initialize() must complete successfully " 202 << "before other RpcHandler methods are called."; 203 204 DVLOG(3) << "Sending report request to server."; 205 206 // If we are unpublishing or unsubscribing, we need to stop those publish or 207 // subscribes right away, we don't need to wait for the server to tell us. 208 ProcessRemovedOperations(*request); 209 210 request->mutable_update_signals_request()->set_allocated_state( 211 GetDeviceCapabilities(*request).release()); 212 213 AddPlayingTokens(request.get()); 214 215 SendServerRequest(kReportRequestRpcName, 216 app_id, 217 request.Pass(), 218 // On destruction, this request will be cancelled. 219 base::Bind(&RpcHandler::ReportResponseHandler, 220 base::Unretained(this), 221 status_callback)); 222 } 223 224 void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) { 225 DCHECK(!tokens.empty()); 226 227 scoped_ptr<ReportRequest> request(new ReportRequest); 228 for (size_t i = 0; i < tokens.size(); ++i) { 229 if (invalid_audio_token_cache_.HasKey(ToUrlSafe(tokens[i].token))) 230 continue; 231 DVLOG(3) << "Sending token " << tokens[i].token << " to server."; 232 AddTokenToRequest(request.get(), tokens[i]); 233 } 234 SendReportRequest(request.Pass()); 235 } 236 237 void RpcHandler::ConnectToWhispernet() { 238 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient(); 239 240 // |directive_handler_| will be destructed with us, so unretained is safe. 241 directive_handler_.reset(new DirectiveHandler); 242 directive_handler_->Initialize( 243 base::Bind(&WhispernetClient::DecodeSamples, 244 base::Unretained(whispernet_client)), 245 base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector, 246 base::Unretained(this))); 247 248 whispernet_client->RegisterTokensCallback( 249 base::Bind(&RpcHandler::ReportTokens, 250 // On destruction, this callback will be disconnected. 251 base::Unretained(this))); 252 } 253 254 // Private methods 255 256 void RpcHandler::RegisterResponseHandler( 257 const SuccessCallback& init_done_callback, 258 HttpPost* completed_post, 259 int http_status_code, 260 const std::string& response_data) { 261 if (completed_post) { 262 int elements_erased = pending_posts_.erase(completed_post); 263 DCHECK(elements_erased); 264 delete completed_post; 265 } 266 267 if (http_status_code != net::HTTP_OK) { 268 init_done_callback.Run(false); 269 return; 270 } 271 272 RegisterDeviceResponse response; 273 if (!response.ParseFromString(response_data)) { 274 LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data; 275 init_done_callback.Run(false); 276 return; 277 } 278 279 if (CopresenceErrorLogged(response.header().status())) 280 return; 281 device_id_ = response.registered_device_id(); 282 DCHECK(!device_id_.empty()); 283 DVLOG(2) << "Device registration successful: id " << device_id_; 284 init_done_callback.Run(true); 285 } 286 287 void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback, 288 HttpPost* completed_post, 289 int http_status_code, 290 const std::string& response_data) { 291 if (completed_post) { 292 int elements_erased = pending_posts_.erase(completed_post); 293 DCHECK(elements_erased); 294 delete completed_post; 295 } 296 297 if (http_status_code != net::HTTP_OK) { 298 if (!status_callback.is_null()) 299 status_callback.Run(FAIL); 300 return; 301 } 302 303 DVLOG(3) << "Received ReportResponse."; 304 ReportResponse response; 305 if (!response.ParseFromString(response_data)) { 306 LOG(ERROR) << "Invalid ReportResponse"; 307 if (!status_callback.is_null()) 308 status_callback.Run(FAIL); 309 return; 310 } 311 312 if (ReportErrorLogged(response)) { 313 if (!status_callback.is_null()) 314 status_callback.Run(FAIL); 315 return; 316 } 317 318 const RepeatedPtrField<MessageResult>& message_results = 319 response.manage_messages_response().published_message_result(); 320 for (int i = 0; i < message_results.size(); ++i) { 321 DVLOG(2) << "Published message with id " 322 << message_results.Get(i).published_message_id(); 323 } 324 325 const RepeatedPtrField<SubscriptionResult>& subscription_results = 326 response.manage_subscriptions_response().subscription_result(); 327 for (int i = 0; i < subscription_results.size(); ++i) { 328 DVLOG(2) << "Created subscription with id " 329 << subscription_results.Get(i).subscription_id(); 330 } 331 332 if (response.has_update_signals_response()) { 333 const UpdateSignalsResponse& update_response = 334 response.update_signals_response(); 335 DispatchMessages(update_response.message()); 336 337 if (directive_handler_.get()) { 338 for (int i = 0; i < update_response.directive_size(); ++i) 339 directive_handler_->AddDirective(update_response.directive(i)); 340 } else { 341 DVLOG(1) << "No directive handler."; 342 } 343 344 const RepeatedPtrField<Token>& tokens = update_response.token(); 345 for (int i = 0; i < tokens.size(); ++i) { 346 switch (tokens.Get(i).status()) { 347 case VALID: 348 // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a 349 // short TTL (like 10s) and send it up with every report request. 350 // Then we'll still get messages while we're waiting to hear it again. 351 VLOG(1) << "Got valid token " << tokens.Get(i).id(); 352 break; 353 case INVALID: 354 DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id(); 355 invalid_audio_token_cache_.Add(tokens.Get(i).id(), true); 356 break; 357 default: 358 DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code " 359 << tokens.Get(i).status(); 360 } 361 } 362 } 363 364 // TODO(ckehoe): Return a more detailed status response. 365 if (!status_callback.is_null()) 366 status_callback.Run(SUCCESS); 367 } 368 369 void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) { 370 // Remove unpublishes. 371 if (request.has_manage_messages_request()) { 372 const RepeatedPtrField<std::string>& unpublishes = 373 request.manage_messages_request().id_to_unpublish(); 374 for (int i = 0; i < unpublishes.size(); ++i) 375 directive_handler_->RemoveDirectives(unpublishes.Get(i)); 376 } 377 378 // Remove unsubscribes. 379 if (request.has_manage_subscriptions_request()) { 380 const RepeatedPtrField<std::string>& unsubscribes = 381 request.manage_subscriptions_request().id_to_unsubscribe(); 382 for (int i = 0; i < unsubscribes.size(); ++i) 383 directive_handler_->RemoveDirectives(unsubscribes.Get(i)); 384 } 385 } 386 387 void RpcHandler::AddPlayingTokens(ReportRequest* request) { 388 if (!directive_handler_) 389 return; 390 391 const std::string& audible_token = directive_handler_->CurrentAudibleToken(); 392 const std::string& inaudible_token = 393 directive_handler_->CurrentInaudibleToken(); 394 395 if (!audible_token.empty()) 396 AddTokenToRequest(request, AudioToken(audible_token, true)); 397 if (!inaudible_token.empty()) 398 AddTokenToRequest(request, AudioToken(inaudible_token, false)); 399 } 400 401 void RpcHandler::DispatchMessages( 402 const RepeatedPtrField<SubscribedMessage>& messages) { 403 if (messages.size() == 0) 404 return; 405 406 // Index the messages by subscription id. 407 std::map<std::string, std::vector<Message>> messages_by_subscription; 408 DVLOG(3) << "Dispatching " << messages.size() << " messages"; 409 for (int m = 0; m < messages.size(); ++m) { 410 const RepeatedPtrField<std::string>& subscription_ids = 411 messages.Get(m).subscription_id(); 412 for (int s = 0; s < subscription_ids.size(); ++s) { 413 messages_by_subscription[subscription_ids.Get(s)].push_back( 414 messages.Get(m).published_message()); 415 } 416 } 417 418 // Send the messages for each subscription. 419 for (std::map<std::string, std::vector<Message>>::const_iterator 420 subscription = messages_by_subscription.begin(); 421 subscription != messages_by_subscription.end(); 422 ++subscription) { 423 // TODO(ckehoe): Once we have the app ID from the server, we need to pass 424 // it in here and get rid of the app id registry from the main API class. 425 delegate_->HandleMessages("", subscription->first, subscription->second); 426 } 427 } 428 429 RequestHeader* RpcHandler::CreateRequestHeader( 430 const std::string& client_name) const { 431 RequestHeader* header = new RequestHeader; 432 433 header->set_allocated_framework_version(CreateVersion( 434 "Chrome", delegate_->GetPlatformVersionString())); 435 if (!client_name.empty()) { 436 header->set_allocated_client_version( 437 CreateVersion(client_name, std::string())); 438 } 439 header->set_current_time_millis(base::Time::Now().ToJsTime()); 440 header->set_registered_device_id(device_id_); 441 442 DeviceFingerprint* fingerprint = new DeviceFingerprint; 443 fingerprint->set_platform_version(delegate_->GetPlatformVersionString()); 444 fingerprint->set_type(CHROME_PLATFORM_TYPE); 445 header->set_allocated_device_fingerprint(fingerprint); 446 447 return header; 448 } 449 450 template <class T> 451 void RpcHandler::SendServerRequest( 452 const std::string& rpc_name, 453 const std::string& app_id, 454 scoped_ptr<T> request, 455 const PostCleanupCallback& response_handler) { 456 request->set_allocated_header(CreateRequestHeader(app_id)); 457 server_post_callback_.Run(delegate_->GetRequestContext(), 458 rpc_name, 459 make_scoped_ptr<MessageLite>(request.release()), 460 response_handler); 461 } 462 463 void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter, 464 const std::string& rpc_name, 465 scoped_ptr<MessageLite> request_proto, 466 const PostCleanupCallback& callback) { 467 // Create the base URL to call. 468 CommandLine* command_line = CommandLine::ForCurrentProcess(); 469 const std::string copresence_server_host = 470 command_line->HasSwitch(switches::kCopresenceServer) ? 471 command_line->GetSwitchValueASCII(switches::kCopresenceServer) : 472 kDefaultCopresenceServer; 473 474 // Create the request and keep a pointer until it completes. 475 HttpPost* http_post = new HttpPost( 476 url_context_getter, 477 copresence_server_host, 478 rpc_name, 479 command_line->GetSwitchValueASCII(switches::kCopresenceTracingToken), 480 delegate_->GetAPIKey(), 481 *request_proto); 482 483 http_post->Start(base::Bind(callback, http_post)); 484 pending_posts_.insert(http_post); 485 } 486 487 void RpcHandler::AudioDirectiveListToWhispernetConnector( 488 const std::string& token, 489 bool audible, 490 const WhispernetClient::SamplesCallback& samples_callback) { 491 WhispernetClient* whispernet_client = delegate_->GetWhispernetClient(); 492 if (whispernet_client) { 493 whispernet_client->RegisterSamplesCallback(samples_callback); 494 whispernet_client->EncodeToken(token, audible); 495 } 496 } 497 498 } // namespace copresence 499