Home | History | Annotate | Download | only in rpc
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "components/copresence/rpc/rpc_handler.h"
      6 
      7 #include <map>
      8 
      9 #include "base/bind.h"
     10 #include "base/command_line.h"
     11 #include "base/guid.h"
     12 #include "base/logging.h"
     13 #include "base/strings/string_util.h"
     14 
     15 // TODO(ckehoe): time.h includes windows.h, which #defines DeviceCapabilities
     16 // to DeviceCapabilitiesW. This breaks the pb.h headers below. For now,
     17 // we fix this with an #undef.
     18 #include "base/time/time.h"
     19 #if defined(OS_WIN)
     20 #undef DeviceCapabilities
     21 #endif
     22 
     23 #include "components/copresence/copresence_switches.h"
     24 #include "components/copresence/handlers/directive_handler.h"
     25 #include "components/copresence/proto/codes.pb.h"
     26 #include "components/copresence/proto/data.pb.h"
     27 #include "components/copresence/proto/rpcs.pb.h"
     28 #include "components/copresence/public/copresence_delegate.h"
     29 #include "net/http/http_status_code.h"
     30 
     31 // TODO(ckehoe): Return error messages for bad requests.
     32 
     33 namespace copresence {
     34 
     35 using google::protobuf::MessageLite;
     36 using google::protobuf::RepeatedPtrField;
     37 
// Name of the report RPC; passed as the |rpc_name| argument when
// SendReportRequest() forwards a request to the server.
const char RpcHandler::kReportRequestRpcName[] = "report";
     39 
     40 namespace {
     41 
     42 // UrlSafe is defined as:
     43 // '/' represented by a '_' and '+' represented by a '-'
     44 // TODO(rkc): Move this to the wrapper.
     45 std::string ToUrlSafe(std::string token) {
     46   base::ReplaceChars(token, "+", "-", &token);
     47   base::ReplaceChars(token, "/", "_", &token);
     48   return token;
     49 }
     50 
     51 const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000;  // 10 minutes.
     52 const int kMaxInvalidTokens = 10000;
     53 const char kRegisterDeviceRpcName[] = "registerdevice";
     54 const char kDefaultCopresenceServer[] =
     55     "https://www.googleapis.com/copresence/v2/copresence";
     56 
     57 // Logging
     58 
     59 // Checks for a copresence error. If there is one, logs it and returns true.
     60 bool CopresenceErrorLogged(const Status& status) {
     61   if (status.code() != OK) {
     62     LOG(ERROR) << "Copresence error code " << status.code()
     63                << (status.message().empty() ? std::string() :
     64                   ": " + status.message());
     65   }
     66   return status.code() != OK;
     67 }
     68 
     69 void LogIfErrorStatus(const util::error::Code& code,
     70                       const std::string& context) {
     71   LOG_IF(ERROR, code != util::error::OK)
     72       << context << " error " << code << ". See "
     73       << "cs/google3/util/task/codes.proto for more info.";
     74 }
     75 
     76 // If any errors occurred, logs them and returns true.
     77 bool ReportErrorLogged(const ReportResponse& response) {
     78   bool result = CopresenceErrorLogged(response.header().status());
     79 
     80   // The Report fails or succeeds as a unit. If any responses had errors,
     81   // the header will too. Thus we don't need to propagate individual errors.
     82   if (response.has_update_signals_response())
     83     LogIfErrorStatus(response.update_signals_response().status(), "Update");
     84   if (response.has_manage_messages_response())
     85     LogIfErrorStatus(response.manage_messages_response().status(), "Publish");
     86   if (response.has_manage_subscriptions_response()) {
     87     LogIfErrorStatus(response.manage_subscriptions_response().status(),
     88                      "Subscribe");
     89   }
     90 
     91   return result;
     92 }
     93 
     94 // Request construction
     95 // TODO(ckehoe): Move these into a separate file?
     96 
     97 template <typename T>
     98 BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) {
     99   if (msg.has_token_exchange_strategy() &&
    100       msg.token_exchange_strategy().has_broadcast_scan_configuration()) {
    101     return msg.token_exchange_strategy().broadcast_scan_configuration();
    102   }
    103   return BROADCAST_SCAN_CONFIGURATION_UNKNOWN;
    104 }
    105 
    106 scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) {
    107   scoped_ptr<DeviceState> state(new DeviceState);
    108 
    109   TokenTechnology* ultrasound =
    110       state->mutable_capabilities()->add_token_technology();
    111   ultrasound->set_medium(AUDIO_ULTRASOUND_PASSBAND);
    112   ultrasound->add_instruction_type(TRANSMIT);
    113   ultrasound->add_instruction_type(RECEIVE);
    114 
    115   TokenTechnology* audible =
    116       state->mutable_capabilities()->add_token_technology();
    117   audible->set_medium(AUDIO_AUDIBLE_DTMF);
    118   audible->add_instruction_type(TRANSMIT);
    119   audible->add_instruction_type(RECEIVE);
    120 
    121   return state.Pass();
    122 }
    123 
    124 // TODO(ckehoe): We're keeping this code in a separate function for now
    125 // because we get a version string from Chrome, but the proto expects
    126 // an int64 version. We should probably change the version proto
    127 // to handle a more detailed version.
    128 ClientVersion* CreateVersion(const std::string& client,
    129                              const std::string& version_name) {
    130   ClientVersion* version = new ClientVersion;
    131 
    132   version->set_client(client);
    133   version->set_version_name(version_name);
    134 
    135   return version;
    136 }
    137 
    138 void AddTokenToRequest(ReportRequest* request, const AudioToken& token) {
    139   TokenObservation* token_observation =
    140       request->mutable_update_signals_request()->add_token_observation();
    141   token_observation->set_token_id(ToUrlSafe(token.token));
    142 
    143   TokenSignals* signals = token_observation->add_signals();
    144   signals->set_medium(token.audible ? AUDIO_AUDIBLE_DTMF
    145                                     : AUDIO_ULTRASOUND_PASSBAND);
    146   signals->set_observed_time_millis(base::Time::Now().ToJsTime());
    147 }
    148 
    149 }  // namespace
    150 
    151 // Public methods
    152 
// Stores |delegate|, sets up the invalid-token cache with its expiry time and
// maximum size, and points |server_post_callback_| at SendHttpPost().
RpcHandler::RpcHandler(CopresenceDelegate* delegate)
    : delegate_(delegate),
      invalid_audio_token_cache_(
          base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
          kMaxInvalidTokens),
      // |server_post_callback_| is a member, so it cannot outlive |this|.
      server_post_callback_(base::Bind(&RpcHandler::SendHttpPost,
                                       base::Unretained(this))) {}
    160 
    161 RpcHandler::~RpcHandler() {
    162   for (std::set<HttpPost*>::iterator post = pending_posts_.begin();
    163        post != pending_posts_.end(); ++post) {
    164     delete *post;
    165   }
    166 
    167   if (delegate_ && delegate_->GetWhispernetClient()) {
    168     delegate_->GetWhispernetClient()->RegisterTokensCallback(
    169         WhispernetClient::TokensCallback());
    170   }
    171 }
    172 
    173 void RpcHandler::Initialize(const SuccessCallback& init_done_callback) {
    174   scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
    175   DCHECK(device_id_.empty());
    176 
    177   request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
    178   Identity* identity =
    179       request->mutable_device_identifiers()->mutable_registrant();
    180   identity->set_type(CHROME);
    181   identity->set_chrome_id(base::GenerateGUID());
    182   SendServerRequest(
    183       kRegisterDeviceRpcName,
    184       std::string(),
    185       request.Pass(),
    186       base::Bind(&RpcHandler::RegisterResponseHandler,
    187                  // On destruction, this request will be cancelled.
    188                  base::Unretained(this),
    189                  init_done_callback));
    190 }
    191 
    192 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) {
    193   SendReportRequest(request.Pass(), std::string(), StatusCallback());
    194 }
    195 
    196 void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
    197                                    const std::string& app_id,
    198                                    const StatusCallback& status_callback) {
    199   DCHECK(request.get());
    200   DCHECK(!device_id_.empty())
    201       << "RpcHandler::Initialize() must complete successfully "
    202       << "before other RpcHandler methods are called.";
    203 
    204   DVLOG(3) << "Sending report request to server.";
    205 
    206   // If we are unpublishing or unsubscribing, we need to stop those publish or
    207   // subscribes right away, we don't need to wait for the server to tell us.
    208   ProcessRemovedOperations(*request);
    209 
    210   request->mutable_update_signals_request()->set_allocated_state(
    211       GetDeviceCapabilities(*request).release());
    212 
    213   AddPlayingTokens(request.get());
    214 
    215   SendServerRequest(kReportRequestRpcName,
    216                     app_id,
    217                     request.Pass(),
    218                     // On destruction, this request will be cancelled.
    219                     base::Bind(&RpcHandler::ReportResponseHandler,
    220                                base::Unretained(this),
    221                                status_callback));
    222 }
    223 
    224 void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) {
    225   DCHECK(!tokens.empty());
    226 
    227   scoped_ptr<ReportRequest> request(new ReportRequest);
    228   for (size_t i = 0; i < tokens.size(); ++i) {
    229     if (invalid_audio_token_cache_.HasKey(ToUrlSafe(tokens[i].token)))
    230       continue;
    231     DVLOG(3) << "Sending token " << tokens[i].token << " to server.";
    232     AddTokenToRequest(request.get(), tokens[i]);
    233   }
    234   SendReportRequest(request.Pass());
    235 }
    236 
    237 void RpcHandler::ConnectToWhispernet() {
    238   WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
    239 
    240   // |directive_handler_| will be destructed with us, so unretained is safe.
    241   directive_handler_.reset(new DirectiveHandler);
    242   directive_handler_->Initialize(
    243       base::Bind(&WhispernetClient::DecodeSamples,
    244                  base::Unretained(whispernet_client)),
    245       base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector,
    246                  base::Unretained(this)));
    247 
    248   whispernet_client->RegisterTokensCallback(
    249       base::Bind(&RpcHandler::ReportTokens,
    250                  // On destruction, this callback will be disconnected.
    251                  base::Unretained(this)));
    252 }
    253 
    254 // Private methods
    255 
    256 void RpcHandler::RegisterResponseHandler(
    257     const SuccessCallback& init_done_callback,
    258     HttpPost* completed_post,
    259     int http_status_code,
    260     const std::string& response_data) {
    261   if (completed_post) {
    262     int elements_erased = pending_posts_.erase(completed_post);
    263     DCHECK(elements_erased);
    264     delete completed_post;
    265   }
    266 
    267   if (http_status_code != net::HTTP_OK) {
    268     init_done_callback.Run(false);
    269     return;
    270   }
    271 
    272   RegisterDeviceResponse response;
    273   if (!response.ParseFromString(response_data)) {
    274     LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
    275     init_done_callback.Run(false);
    276     return;
    277   }
    278 
    279   if (CopresenceErrorLogged(response.header().status()))
    280     return;
    281   device_id_ = response.registered_device_id();
    282   DCHECK(!device_id_.empty());
    283   DVLOG(2) << "Device registration successful: id " << device_id_;
    284   init_done_callback.Run(true);
    285 }
    286 
    287 void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
    288                                        HttpPost* completed_post,
    289                                        int http_status_code,
    290                                        const std::string& response_data) {
    291   if (completed_post) {
    292     int elements_erased = pending_posts_.erase(completed_post);
    293     DCHECK(elements_erased);
    294     delete completed_post;
    295   }
    296 
    297   if (http_status_code != net::HTTP_OK) {
    298     if (!status_callback.is_null())
    299       status_callback.Run(FAIL);
    300     return;
    301   }
    302 
    303   DVLOG(3) << "Received ReportResponse.";
    304   ReportResponse response;
    305   if (!response.ParseFromString(response_data)) {
    306     LOG(ERROR) << "Invalid ReportResponse";
    307     if (!status_callback.is_null())
    308       status_callback.Run(FAIL);
    309     return;
    310   }
    311 
    312   if (ReportErrorLogged(response)) {
    313     if (!status_callback.is_null())
    314       status_callback.Run(FAIL);
    315     return;
    316   }
    317 
    318   const RepeatedPtrField<MessageResult>& message_results =
    319       response.manage_messages_response().published_message_result();
    320   for (int i = 0; i < message_results.size(); ++i) {
    321     DVLOG(2) << "Published message with id "
    322              << message_results.Get(i).published_message_id();
    323   }
    324 
    325   const RepeatedPtrField<SubscriptionResult>& subscription_results =
    326       response.manage_subscriptions_response().subscription_result();
    327   for (int i = 0; i < subscription_results.size(); ++i) {
    328     DVLOG(2) << "Created subscription with id "
    329              << subscription_results.Get(i).subscription_id();
    330   }
    331 
    332   if (response.has_update_signals_response()) {
    333     const UpdateSignalsResponse& update_response =
    334         response.update_signals_response();
    335     DispatchMessages(update_response.message());
    336 
    337     if (directive_handler_.get()) {
    338       for (int i = 0; i < update_response.directive_size(); ++i)
    339         directive_handler_->AddDirective(update_response.directive(i));
    340     } else {
    341       DVLOG(1) << "No directive handler.";
    342     }
    343 
    344     const RepeatedPtrField<Token>& tokens = update_response.token();
    345     for (int i = 0; i < tokens.size(); ++i) {
    346       switch (tokens.Get(i).status()) {
    347         case VALID:
    348           // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a
    349           // short TTL (like 10s) and send it up with every report request.
    350           // Then we'll still get messages while we're waiting to hear it again.
    351           VLOG(1) << "Got valid token " << tokens.Get(i).id();
    352           break;
    353         case INVALID:
    354           DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id();
    355           invalid_audio_token_cache_.Add(tokens.Get(i).id(), true);
    356           break;
    357         default:
    358           DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code "
    359                    << tokens.Get(i).status();
    360       }
    361     }
    362   }
    363 
    364   // TODO(ckehoe): Return a more detailed status response.
    365   if (!status_callback.is_null())
    366     status_callback.Run(SUCCESS);
    367 }
    368 
    369 void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) {
    370   // Remove unpublishes.
    371   if (request.has_manage_messages_request()) {
    372     const RepeatedPtrField<std::string>& unpublishes =
    373         request.manage_messages_request().id_to_unpublish();
    374     for (int i = 0; i < unpublishes.size(); ++i)
    375       directive_handler_->RemoveDirectives(unpublishes.Get(i));
    376   }
    377 
    378   // Remove unsubscribes.
    379   if (request.has_manage_subscriptions_request()) {
    380     const RepeatedPtrField<std::string>& unsubscribes =
    381         request.manage_subscriptions_request().id_to_unsubscribe();
    382     for (int i = 0; i < unsubscribes.size(); ++i)
    383       directive_handler_->RemoveDirectives(unsubscribes.Get(i));
    384   }
    385 }
    386 
    387 void RpcHandler::AddPlayingTokens(ReportRequest* request) {
    388   if (!directive_handler_)
    389     return;
    390 
    391   const std::string& audible_token = directive_handler_->CurrentAudibleToken();
    392   const std::string& inaudible_token =
    393       directive_handler_->CurrentInaudibleToken();
    394 
    395   if (!audible_token.empty())
    396     AddTokenToRequest(request, AudioToken(audible_token, true));
    397   if (!inaudible_token.empty())
    398     AddTokenToRequest(request, AudioToken(inaudible_token, false));
    399 }
    400 
    401 void RpcHandler::DispatchMessages(
    402     const RepeatedPtrField<SubscribedMessage>& messages) {
    403   if (messages.size() == 0)
    404     return;
    405 
    406   // Index the messages by subscription id.
    407   std::map<std::string, std::vector<Message>> messages_by_subscription;
    408   DVLOG(3) << "Dispatching " << messages.size() << " messages";
    409   for (int m = 0; m < messages.size(); ++m) {
    410     const RepeatedPtrField<std::string>& subscription_ids =
    411         messages.Get(m).subscription_id();
    412     for (int s = 0; s < subscription_ids.size(); ++s) {
    413       messages_by_subscription[subscription_ids.Get(s)].push_back(
    414           messages.Get(m).published_message());
    415     }
    416   }
    417 
    418   // Send the messages for each subscription.
    419   for (std::map<std::string, std::vector<Message>>::const_iterator
    420            subscription = messages_by_subscription.begin();
    421        subscription != messages_by_subscription.end();
    422        ++subscription) {
    423     // TODO(ckehoe): Once we have the app ID from the server, we need to pass
    424     // it in here and get rid of the app id registry from the main API class.
    425     delegate_->HandleMessages("", subscription->first, subscription->second);
    426   }
    427 }
    428 
    429 RequestHeader* RpcHandler::CreateRequestHeader(
    430     const std::string& client_name) const {
    431   RequestHeader* header = new RequestHeader;
    432 
    433   header->set_allocated_framework_version(CreateVersion(
    434       "Chrome", delegate_->GetPlatformVersionString()));
    435   if (!client_name.empty()) {
    436     header->set_allocated_client_version(
    437         CreateVersion(client_name, std::string()));
    438   }
    439   header->set_current_time_millis(base::Time::Now().ToJsTime());
    440   header->set_registered_device_id(device_id_);
    441 
    442   DeviceFingerprint* fingerprint = new DeviceFingerprint;
    443   fingerprint->set_platform_version(delegate_->GetPlatformVersionString());
    444   fingerprint->set_type(CHROME_PLATFORM_TYPE);
    445   header->set_allocated_device_fingerprint(fingerprint);
    446 
    447   return header;
    448 }
    449 
    450 template <class T>
    451 void RpcHandler::SendServerRequest(
    452     const std::string& rpc_name,
    453     const std::string& app_id,
    454     scoped_ptr<T> request,
    455     const PostCleanupCallback& response_handler) {
    456   request->set_allocated_header(CreateRequestHeader(app_id));
    457   server_post_callback_.Run(delegate_->GetRequestContext(),
    458                             rpc_name,
    459                             make_scoped_ptr<MessageLite>(request.release()),
    460                             response_handler);
    461 }
    462 
    463 void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter,
    464                               const std::string& rpc_name,
    465                               scoped_ptr<MessageLite> request_proto,
    466                               const PostCleanupCallback& callback) {
    467   // Create the base URL to call.
    468   CommandLine* command_line = CommandLine::ForCurrentProcess();
    469   const std::string copresence_server_host =
    470       command_line->HasSwitch(switches::kCopresenceServer) ?
    471       command_line->GetSwitchValueASCII(switches::kCopresenceServer) :
    472       kDefaultCopresenceServer;
    473 
    474   // Create the request and keep a pointer until it completes.
    475   HttpPost* http_post = new HttpPost(
    476       url_context_getter,
    477       copresence_server_host,
    478       rpc_name,
    479       command_line->GetSwitchValueASCII(switches::kCopresenceTracingToken),
    480       delegate_->GetAPIKey(),
    481       *request_proto);
    482 
    483   http_post->Start(base::Bind(callback, http_post));
    484   pending_posts_.insert(http_post);
    485 }
    486 
    487 void RpcHandler::AudioDirectiveListToWhispernetConnector(
    488     const std::string& token,
    489     bool audible,
    490     const WhispernetClient::SamplesCallback& samples_callback) {
    491   WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
    492   if (whispernet_client) {
    493     whispernet_client->RegisterSamplesCallback(samples_callback);
    494     whispernet_client->EncodeToken(token, audible);
    495   }
    496 }
    497 
    498 }  // namespace copresence
    499