Home | History | Annotate | Download | only in notifier
      1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
      6 
      7 #include <string>
      8 
      9 #include "base/base64.h"
     10 #include "base/callback.h"
     11 #include "base/compiler_specific.h"
     12 #include "base/logging.h"
     13 #include "base/rand_util.h"
     14 #include "base/string_number_conversions.h"
     15 #include "google/cacheinvalidation/invalidation-client.h"
     16 #include "jingle/notifier/listener/xml_element_util.h"
     17 #include "talk/xmpp/constants.h"
     18 #include "talk/xmpp/jid.h"
     19 #include "talk/xmpp/xmppclient.h"
     20 #include "talk/xmpp/xmpptask.h"
     21 
     22 namespace sync_notifier {
     23 
     24 namespace {
     25 
     26 const char kBotJid[] = "tango (at) bot.talk.google.com";
     27 const char kServiceUrl[] = "http://www.google.com/chrome/sync";
     28 
     29 const buzz::QName kQnData("google:notifier", "data");
     30 const buzz::QName kQnSeq("", "seq");
     31 const buzz::QName kQnSid("", "sid");
     32 const buzz::QName kQnServiceUrl("", "serviceUrl");
     33 
     34 // TODO(akalin): Move these task classes out so that they can be
     35 // unit-tested.  This'll probably be done easier once we consolidate
     36 // all the packet sending/receiving classes.
     37 
     38 // A task that listens for ClientInvalidation messages and calls the
     39 // given callback on them.
     40 class CacheInvalidationListenTask : public buzz::XmppTask {
     41  public:
     42   // Takes ownership of callback.
     43   CacheInvalidationListenTask(Task* parent,
     44                               Callback1<const std::string&>::Type* callback)
     45       : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {}
     46   virtual ~CacheInvalidationListenTask() {}
     47 
     48   virtual int ProcessStart() {
     49     VLOG(2) << "CacheInvalidationListenTask started";
     50     return STATE_RESPONSE;
     51   }
     52 
     53   virtual int ProcessResponse() {
     54     const buzz::XmlElement* stanza = NextStanza();
     55     if (stanza == NULL) {
     56       VLOG(2) << "CacheInvalidationListenTask blocked";
     57       return STATE_BLOCKED;
     58     }
     59     VLOG(2) << "CacheInvalidationListenTask response received";
     60     std::string data;
     61     if (GetCacheInvalidationIqPacketData(stanza, &data)) {
     62       callback_->Run(data);
     63     } else {
     64       LOG(ERROR) << "Could not get packet data";
     65     }
     66     // Acknowledge receipt of the iq to the buzz server.
     67     // TODO(akalin): Send an error response for malformed packets.
     68     scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
     69     SendStanza(response_stanza.get());
     70     return STATE_RESPONSE;
     71   }
     72 
     73   virtual bool HandleStanza(const buzz::XmlElement* stanza) {
     74     VLOG(1) << "Stanza received: "
     75               << notifier::XmlElementToString(*stanza);
     76     if (IsValidCacheInvalidationIqPacket(stanza)) {
     77       VLOG(2) << "Queueing stanza";
     78       QueueStanza(stanza);
     79       return true;
     80     }
     81     VLOG(2) << "Stanza skipped";
     82     return false;
     83   }
     84 
     85  private:
     86   bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) {
     87     // We deliberately minimize the verification we do here: see
     88     // http://crbug.com/71285 .
     89     return MatchRequestIq(stanza, buzz::STR_SET, kQnData);
     90   }
     91 
     92   bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza,
     93                             std::string* data) {
     94     DCHECK(IsValidCacheInvalidationIqPacket(stanza));
     95     const buzz::XmlElement* cache_invalidation_iq_packet =
     96         stanza->FirstNamed(kQnData);
     97     if (!cache_invalidation_iq_packet) {
     98       LOG(ERROR) << "Could not find cache invalidation IQ packet element";
     99       return false;
    100     }
    101     *data = cache_invalidation_iq_packet->BodyText();
    102     return true;
    103   }
    104 
    105   scoped_ptr<Callback1<const std::string&>::Type> callback_;
    106   DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
    107 };
    108 
    109 // A task that sends a single outbound ClientInvalidation message.
    110 class CacheInvalidationSendMessageTask : public buzz::XmppTask {
    111  public:
    112   CacheInvalidationSendMessageTask(Task* parent,
    113                                    const buzz::Jid& to_jid,
    114                                    const std::string& msg,
    115                                    int seq,
    116                                    const std::string& sid)
    117       : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
    118         to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid) {}
    119   virtual ~CacheInvalidationSendMessageTask() {}
    120 
    121   virtual int ProcessStart() {
    122     scoped_ptr<buzz::XmlElement> stanza(
    123         MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_,
    124                                       seq_, sid_));
    125     VLOG(1) << "Sending message: "
    126               << notifier::XmlElementToString(*stanza.get());
    127     if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
    128       VLOG(2) << "Error when sending message";
    129       return STATE_ERROR;
    130     }
    131     return STATE_RESPONSE;
    132   }
    133 
    134   virtual int ProcessResponse() {
    135     const buzz::XmlElement* stanza = NextStanza();
    136     if (stanza == NULL) {
    137       VLOG(2) << "CacheInvalidationSendMessageTask blocked...";
    138       return STATE_BLOCKED;
    139     }
    140     VLOG(2) << "CacheInvalidationSendMessageTask response received: "
    141               << notifier::XmlElementToString(*stanza);
    142     // TODO(akalin): Handle errors here.
    143     return STATE_DONE;
    144   }
    145 
    146   virtual bool HandleStanza(const buzz::XmlElement* stanza) {
    147     VLOG(1) << "Stanza received: "
    148               << notifier::XmlElementToString(*stanza);
    149     if (!MatchResponseIq(stanza, to_jid_, task_id())) {
    150       VLOG(2) << "Stanza skipped";
    151       return false;
    152     }
    153     VLOG(2) << "Queueing stanza";
    154     QueueStanza(stanza);
    155     return true;
    156   }
    157 
    158  private:
    159   static buzz::XmlElement* MakeCacheInvalidationIqPacket(
    160       const buzz::Jid& to_jid,
    161       const std::string& task_id,
    162       const std::string& msg,
    163       int seq, const std::string& sid) {
    164     buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
    165     buzz::XmlElement* cache_invalidation_iq_packet =
    166         new buzz::XmlElement(kQnData, true);
    167     iq->AddElement(cache_invalidation_iq_packet);
    168     cache_invalidation_iq_packet->SetAttr(kQnSeq, base::IntToString(seq));
    169     cache_invalidation_iq_packet->SetAttr(kQnSid, sid);
    170     cache_invalidation_iq_packet->SetAttr(kQnServiceUrl, kServiceUrl);
    171     cache_invalidation_iq_packet->SetBodyText(msg);
    172     return iq;
    173   }
    174 
    175   const buzz::Jid to_jid_;
    176   std::string msg_;
    177   int seq_;
    178   std::string sid_;
    179 
    180   DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
    181 };
    182 
    183 std::string MakeSid() {
    184   uint64 sid = base::RandUint64();
    185   return std::string("chrome-sync-") + base::Uint64ToString(sid);
    186 }
    187 
    188 }  // namespace
    189 
    190 CacheInvalidationPacketHandler::CacheInvalidationPacketHandler(
    191     base::WeakPtr<talk_base::Task> base_task,
    192     invalidation::InvalidationClient* invalidation_client)
    193     : scoped_callback_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
    194       base_task_(base_task),
    195       invalidation_client_(invalidation_client),
    196       seq_(0),
    197       sid_(MakeSid()) {
    198   CHECK(base_task_.get());
    199   // Owned by base_task.  Takes ownership of the callback.
    200   CacheInvalidationListenTask* listen_task =
    201       new CacheInvalidationListenTask(
    202           base_task_, scoped_callback_factory_.NewCallback(
    203               &CacheInvalidationPacketHandler::HandleInboundPacket));
    204   listen_task->Start();
    205 }
    206 
    207 CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() {
    208   DCHECK(non_thread_safe_.CalledOnValidThread());
    209 }
    210 
    211 void CacheInvalidationPacketHandler::HandleOutboundPacket(
    212     invalidation::NetworkEndpoint* network_endpoint) {
    213   DCHECK(non_thread_safe_.CalledOnValidThread());
    214   if (!base_task_.get()) {
    215     return;
    216   }
    217   CHECK_EQ(network_endpoint, invalidation_client_->network_endpoint());
    218   invalidation::string message;
    219   network_endpoint->TakeOutboundMessage(&message);
    220   std::string encoded_message;
    221   if (!base::Base64Encode(message, &encoded_message)) {
    222     LOG(ERROR) << "Could not base64-encode message to send: "
    223                << message;
    224     return;
    225   }
    226   // Owned by base_task_.
    227   CacheInvalidationSendMessageTask* send_message_task =
    228       new CacheInvalidationSendMessageTask(base_task_,
    229                                            buzz::Jid(kBotJid),
    230                                            encoded_message,
    231                                            seq_, sid_);
    232   send_message_task->Start();
    233   ++seq_;
    234 }
    235 
    236 void CacheInvalidationPacketHandler::HandleInboundPacket(
    237     const std::string& packet) {
    238   DCHECK(non_thread_safe_.CalledOnValidThread());
    239   invalidation::NetworkEndpoint* network_endpoint =
    240       invalidation_client_->network_endpoint();
    241   std::string decoded_message;
    242   if (!base::Base64Decode(packet, &decoded_message)) {
    243     LOG(ERROR) << "Could not base64-decode received message: "
    244                << packet;
    245     return;
    246   }
    247   network_endpoint->HandleInboundMessage(decoded_message);
    248 }
    249 
    250 }  // namespace sync_notifier
    251