1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h" 6 7 #include <string> 8 9 #include "base/base64.h" 10 #include "base/callback.h" 11 #include "base/compiler_specific.h" 12 #include "base/logging.h" 13 #include "base/rand_util.h" 14 #include "base/string_number_conversions.h" 15 #include "google/cacheinvalidation/invalidation-client.h" 16 #include "jingle/notifier/listener/xml_element_util.h" 17 #include "talk/xmpp/constants.h" 18 #include "talk/xmpp/jid.h" 19 #include "talk/xmpp/xmppclient.h" 20 #include "talk/xmpp/xmpptask.h" 21 22 namespace sync_notifier { 23 24 namespace { 25 26 const char kBotJid[] = "tango (at) bot.talk.google.com"; 27 const char kServiceUrl[] = "http://www.google.com/chrome/sync"; 28 29 const buzz::QName kQnData("google:notifier", "data"); 30 const buzz::QName kQnSeq("", "seq"); 31 const buzz::QName kQnSid("", "sid"); 32 const buzz::QName kQnServiceUrl("", "serviceUrl"); 33 34 // TODO(akalin): Move these task classes out so that they can be 35 // unit-tested. This'll probably be done easier once we consolidate 36 // all the packet sending/receiving classes. 37 38 // A task that listens for ClientInvalidation messages and calls the 39 // given callback on them. 40 class CacheInvalidationListenTask : public buzz::XmppTask { 41 public: 42 // Takes ownership of callback. 43 CacheInvalidationListenTask(Task* parent, 44 Callback1<const std::string&>::Type* callback) 45 : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {} 46 virtual ~CacheInvalidationListenTask() {} 47 48 virtual int ProcessStart() { 49 VLOG(2) << "CacheInvalidationListenTask started"; 50 return STATE_RESPONSE; 51 } 52 53 virtual int ProcessResponse() { 54 const buzz::XmlElement* stanza = NextStanza(); 55 if (stanza == NULL) { 56 VLOG(2) << "CacheInvalidationListenTask blocked"; 57 return STATE_BLOCKED; 58 } 59 VLOG(2) << "CacheInvalidationListenTask response received"; 60 std::string data; 61 if (GetCacheInvalidationIqPacketData(stanza, &data)) { 62 callback_->Run(data); 63 } else { 64 LOG(ERROR) << "Could not get packet data"; 65 } 66 // Acknowledge receipt of the iq to the buzz server. 67 // TODO(akalin): Send an error response for malformed packets. 68 scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza)); 69 SendStanza(response_stanza.get()); 70 return STATE_RESPONSE; 71 } 72 73 virtual bool HandleStanza(const buzz::XmlElement* stanza) { 74 VLOG(1) << "Stanza received: " 75 << notifier::XmlElementToString(*stanza); 76 if (IsValidCacheInvalidationIqPacket(stanza)) { 77 VLOG(2) << "Queueing stanza"; 78 QueueStanza(stanza); 79 return true; 80 } 81 VLOG(2) << "Stanza skipped"; 82 return false; 83 } 84 85 private: 86 bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) { 87 // We deliberately minimize the verification we do here: see 88 // http://crbug.com/71285 . 89 return MatchRequestIq(stanza, buzz::STR_SET, kQnData); 90 } 91 92 bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza, 93 std::string* data) { 94 DCHECK(IsValidCacheInvalidationIqPacket(stanza)); 95 const buzz::XmlElement* cache_invalidation_iq_packet = 96 stanza->FirstNamed(kQnData); 97 if (!cache_invalidation_iq_packet) { 98 LOG(ERROR) << "Could not find cache invalidation IQ packet element"; 99 return false; 100 } 101 *data = cache_invalidation_iq_packet->BodyText(); 102 return true; 103 } 104 105 scoped_ptr<Callback1<const std::string&>::Type> callback_; 106 DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask); 107 }; 108 109 // A task that sends a single outbound ClientInvalidation message. 110 class CacheInvalidationSendMessageTask : public buzz::XmppTask { 111 public: 112 CacheInvalidationSendMessageTask(Task* parent, 113 const buzz::Jid& to_jid, 114 const std::string& msg, 115 int seq, 116 const std::string& sid) 117 : XmppTask(parent, buzz::XmppEngine::HL_SINGLE), 118 to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid) {} 119 virtual ~CacheInvalidationSendMessageTask() {} 120 121 virtual int ProcessStart() { 122 scoped_ptr<buzz::XmlElement> stanza( 123 MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_, 124 seq_, sid_)); 125 VLOG(1) << "Sending message: " 126 << notifier::XmlElementToString(*stanza.get()); 127 if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) { 128 VLOG(2) << "Error when sending message"; 129 return STATE_ERROR; 130 } 131 return STATE_RESPONSE; 132 } 133 134 virtual int ProcessResponse() { 135 const buzz::XmlElement* stanza = NextStanza(); 136 if (stanza == NULL) { 137 VLOG(2) << "CacheInvalidationSendMessageTask blocked..."; 138 return STATE_BLOCKED; 139 } 140 VLOG(2) << "CacheInvalidationSendMessageTask response received: " 141 << notifier::XmlElementToString(*stanza); 142 // TODO(akalin): Handle errors here. 143 return STATE_DONE; 144 } 145 146 virtual bool HandleStanza(const buzz::XmlElement* stanza) { 147 VLOG(1) << "Stanza received: " 148 << notifier::XmlElementToString(*stanza); 149 if (!MatchResponseIq(stanza, to_jid_, task_id())) { 150 VLOG(2) << "Stanza skipped"; 151 return false; 152 } 153 VLOG(2) << "Queueing stanza"; 154 QueueStanza(stanza); 155 return true; 156 } 157 158 private: 159 static buzz::XmlElement* MakeCacheInvalidationIqPacket( 160 const buzz::Jid& to_jid, 161 const std::string& task_id, 162 const std::string& msg, 163 int seq, const std::string& sid) { 164 buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id); 165 buzz::XmlElement* cache_invalidation_iq_packet = 166 new buzz::XmlElement(kQnData, true); 167 iq->AddElement(cache_invalidation_iq_packet); 168 cache_invalidation_iq_packet->SetAttr(kQnSeq, base::IntToString(seq)); 169 cache_invalidation_iq_packet->SetAttr(kQnSid, sid); 170 cache_invalidation_iq_packet->SetAttr(kQnServiceUrl, kServiceUrl); 171 cache_invalidation_iq_packet->SetBodyText(msg); 172 return iq; 173 } 174 175 const buzz::Jid to_jid_; 176 std::string msg_; 177 int seq_; 178 std::string sid_; 179 180 DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask); 181 }; 182 183 std::string MakeSid() { 184 uint64 sid = base::RandUint64(); 185 return std::string("chrome-sync-") + base::Uint64ToString(sid); 186 } 187 188 } // namespace 189 190 CacheInvalidationPacketHandler::CacheInvalidationPacketHandler( 191 base::WeakPtr<talk_base::Task> base_task, 192 invalidation::InvalidationClient* invalidation_client) 193 : scoped_callback_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), 194 base_task_(base_task), 195 invalidation_client_(invalidation_client), 196 seq_(0), 197 sid_(MakeSid()) { 198 CHECK(base_task_.get()); 199 // Owned by base_task. Takes ownership of the callback. 200 CacheInvalidationListenTask* listen_task = 201 new CacheInvalidationListenTask( 202 base_task_, scoped_callback_factory_.NewCallback( 203 &CacheInvalidationPacketHandler::HandleInboundPacket)); 204 listen_task->Start(); 205 } 206 207 CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() { 208 DCHECK(non_thread_safe_.CalledOnValidThread()); 209 } 210 211 void CacheInvalidationPacketHandler::HandleOutboundPacket( 212 invalidation::NetworkEndpoint* network_endpoint) { 213 DCHECK(non_thread_safe_.CalledOnValidThread()); 214 if (!base_task_.get()) { 215 return; 216 } 217 CHECK_EQ(network_endpoint, invalidation_client_->network_endpoint()); 218 invalidation::string message; 219 network_endpoint->TakeOutboundMessage(&message); 220 std::string encoded_message; 221 if (!base::Base64Encode(message, &encoded_message)) { 222 LOG(ERROR) << "Could not base64-encode message to send: " 223 << message; 224 return; 225 } 226 // Owned by base_task_. 227 CacheInvalidationSendMessageTask* send_message_task = 228 new CacheInvalidationSendMessageTask(base_task_, 229 buzz::Jid(kBotJid), 230 encoded_message, 231 seq_, sid_); 232 send_message_task->Start(); 233 ++seq_; 234 } 235 236 void CacheInvalidationPacketHandler::HandleInboundPacket( 237 const std::string& packet) { 238 DCHECK(non_thread_safe_.CalledOnValidThread()); 239 invalidation::NetworkEndpoint* network_endpoint = 240 invalidation_client_->network_endpoint(); 241 std::string decoded_message; 242 if (!base::Base64Decode(packet, &decoded_message)) { 243 LOG(ERROR) << "Could not base64-decode received message: " 244 << packet; 245 return; 246 } 247 network_endpoint->HandleInboundMessage(decoded_message); 248 } 249 250 } // namespace sync_notifier 251