Home | History | Annotate | Download | only in notification
      1 // Copyright 2015 The Weave Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "src/notification/xmpp_channel.h"
      6 
      7 #include <string>
      8 
      9 #include <base/bind.h>
     10 #include <base/strings/string_number_conversions.h>
     11 #include <weave/provider/network.h>
     12 #include <weave/provider/task_runner.h>
     13 
     14 #include "src/backoff_entry.h"
     15 #include "src/data_encoding.h"
     16 #include "src/notification/notification_delegate.h"
     17 #include "src/notification/notification_parser.h"
     18 #include "src/notification/xml_node.h"
     19 #include "src/privet/openssl_utils.h"
     20 #include "src/string_utils.h"
     21 #include "src/utils.h"
     22 
     23 namespace weave {
     24 
     25 namespace {
     26 
     27 std::string BuildXmppStartStreamCommand() {
     28   return "<stream:stream to='clouddevices.gserviceaccount.com' "
     29          "xmlns:stream='http://etherx.jabber.org/streams' "
     30          "xml:lang='*' version='1.0' xmlns='jabber:client'>";
     31 }
     32 
     33 std::string BuildXmppAuthenticateCommand(const std::string& account,
     34                                          const std::string& token) {
     35   std::vector<uint8_t> credentials;
     36   credentials.push_back(0);
     37   credentials.insert(credentials.end(), account.begin(), account.end());
     38   credentials.push_back(0);
     39   credentials.insert(credentials.end(), token.begin(), token.end());
     40   std::string msg =
     41       "<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' "
     42       "mechanism='X-OAUTH2' auth:service='oauth2' "
     43       "auth:allow-non-google-login='true' "
     44       "auth:client-uses-full-bind-result='true' "
     45       "xmlns:auth='http://www.google.com/talk/protocol/auth'>" +
     46       Base64Encode(credentials) + "</auth>";
     47   return msg;
     48 }
     49 
     50 // Backoff policy.
     51 // Note: In order to ensure a minimum of 20 seconds between server errors,
     52 // we have a 30s +- 10s (33%) jitter initial backoff.
     53 const BackoffEntry::Policy kDefaultBackoffPolicy = {
     54     // Number of initial errors (in sequence) to ignore before applying
     55     // exponential back-off rules.
     56     0,
     57 
     58     // Initial delay for exponential back-off in ms.
     59     30 * 1000,  // 30 seconds.
     60 
     61     // Factor by which the waiting time will be multiplied.
     62     2,
     63 
     64     // Fuzzing percentage. ex: 10% will spread requests randomly
     65     // between 90%-100% of the calculated time.
     66     0.33,  // 33%.
     67 
     68     // Maximum amount of time we are willing to delay our request in ms.
     69     10 * 60 * 1000,  // 10 minutes.
     70 
     71     // Time to keep an entry from being discarded even when it
     72     // has no significant state, -1 to never discard.
     73     -1,
     74 
     75     // Don't use initial delay unless the last request was an error.
     76     false,
     77 };
     78 
     79 // Used for keeping connection alive.
     80 const int kRegularPingIntervalSeconds = 60;
     81 const int kRegularPingTimeoutSeconds = 30;
     82 
     83 // Used for diagnostic when connectivity changed.
     84 const int kAgressivePingIntervalSeconds = 5;
     85 const int kAgressivePingTimeoutSeconds = 10;
     86 
     87 const int kConnectingTimeoutAfterNetChangeSeconds = 30;
     88 
     89 }  // namespace
     90 
     91 XmppChannel::XmppChannel(const std::string& account,
     92                          const std::string& access_token,
     93                          const std::string& xmpp_endpoint,
     94                          provider::TaskRunner* task_runner,
     95                          provider::Network* network)
     96     : account_{account},
     97       access_token_{access_token},
     98       xmpp_endpoint_{xmpp_endpoint},
     99       network_{network},
    100       backoff_entry_{&kDefaultBackoffPolicy},
    101       task_runner_{task_runner},
    102       iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} {
    103   read_socket_data_.resize(4096);
    104   if (network) {
    105     network->AddConnectionChangedCallback(base::Bind(
    106         &XmppChannel::OnConnectivityChanged, weak_ptr_factory_.GetWeakPtr()));
    107   }
    108 }
    109 
    110 void XmppChannel::OnMessageRead(size_t size, ErrorPtr error) {
    111   read_pending_ = false;
    112   if (error)
    113     return Restart();
    114   std::string msg(read_socket_data_.data(), size);
    115   VLOG(2) << "Received XMPP packet: '" << msg << "'";
    116 
    117   if (!size)
    118     return Restart();
    119 
    120   stream_parser_.ParseData(msg);
    121   WaitForMessage();
    122 }
    123 
    124 void XmppChannel::OnStreamStart(const std::string& node_name,
    125                                 std::map<std::string, std::string> attributes) {
    126   VLOG(2) << "XMPP stream start: " << node_name;
    127 }
    128 
    129 void XmppChannel::OnStreamEnd(const std::string& node_name) {
    130   VLOG(2) << "XMPP stream ended: " << node_name;
    131   Stop();
    132   if (IsConnected()) {
    133     // If we had a fully-established connection, restart it now.
    134     // However, if the connection has never been established yet (e.g.
    135     // authorization failed), do not restart right now. Wait till we get
    136     // new credentials.
    137     task_runner_->PostDelayedTask(
    138         FROM_HERE,
    139         base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()), {});
    140   } else if (delegate_) {
    141     delegate_->OnPermanentFailure();
    142   }
    143 }
    144 
    145 void XmppChannel::OnStanza(std::unique_ptr<XmlNode> stanza) {
    146   // Handle stanza asynchronously, since XmppChannel::OnStanza() is a callback
    147   // from expat XML parser and some stanza could cause the XMPP stream to be
    148   // reset and the parser to be re-initialized. We don't want to destroy the
    149   // parser while it is performing a callback invocation.
    150   task_runner_->PostDelayedTask(
    151       FROM_HERE,
    152       base::Bind(&XmppChannel::HandleStanza, task_ptr_factory_.GetWeakPtr(),
    153                  base::Passed(std::move(stanza))),
    154       {});
    155 }
    156 
    157 void XmppChannel::HandleStanza(std::unique_ptr<XmlNode> stanza) {
    158   VLOG(2) << "XMPP stanza received: " << stanza->ToString();
    159 
    160   switch (state_) {
    161     case XmppState::kConnected:
    162       if (stanza->name() == "stream:features") {
    163         auto children = stanza->FindChildren("mechanisms/mechanism", false);
    164         for (const auto& child : children) {
    165           if (child->text() == "X-OAUTH2") {
    166             state_ = XmppState::kAuthenticationStarted;
    167             SendMessage(BuildXmppAuthenticateCommand(account_, access_token_));
    168             return;
    169           }
    170         }
    171       }
    172       break;
    173     case XmppState::kAuthenticationStarted:
    174       if (stanza->name() == "success") {
    175         state_ = XmppState::kStreamRestartedPostAuthentication;
    176         RestartXmppStream();
    177         return;
    178       } else if (stanza->name() == "failure") {
    179         if (stanza->FindFirstChild("not-authorized", false)) {
    180           state_ = XmppState::kAuthenticationFailed;
    181           return;
    182         }
    183       }
    184       break;
    185     case XmppState::kStreamRestartedPostAuthentication:
    186       if (stanza->name() == "stream:features" &&
    187           stanza->FindFirstChild("bind", false)) {
    188         state_ = XmppState::kBindSent;
    189         iq_stanza_handler_->SendRequest(
    190             "set", "", "", "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>",
    191             base::Bind(&XmppChannel::OnBindCompleted,
    192                        task_ptr_factory_.GetWeakPtr()),
    193             base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
    194         return;
    195       }
    196       break;
    197     default:
    198       if (stanza->name() == "message") {
    199         HandleMessageStanza(std::move(stanza));
    200         return;
    201       } else if (stanza->name() == "iq") {
    202         if (!iq_stanza_handler_->HandleIqStanza(std::move(stanza))) {
    203           LOG(ERROR) << "Failed to handle IQ stanza";
    204           CloseStream();
    205         }
    206         return;
    207       }
    208       LOG(INFO) << "Unexpected XMPP stanza ignored: " << stanza->ToString();
    209       return;
    210   }
    211   // Something bad happened. Close the stream and start over.
    212   LOG(ERROR) << "Error condition occurred handling stanza: "
    213              << stanza->ToString() << " in state: " << static_cast<int>(state_);
    214   CloseStream();
    215 }
    216 
    217 void XmppChannel::CloseStream() {
    218   SendMessage("</stream:stream>");
    219 }
    220 
    221 void XmppChannel::OnBindCompleted(std::unique_ptr<XmlNode> reply) {
    222   if (reply->GetAttributeOrEmpty("type") != "result") {
    223     CloseStream();
    224     return;
    225   }
    226   const XmlNode* jid_node = reply->FindFirstChild("bind/jid", false);
    227   if (!jid_node) {
    228     LOG(ERROR) << "XMPP Bind response is missing JID";
    229     CloseStream();
    230     return;
    231   }
    232 
    233   jid_ = jid_node->text();
    234   state_ = XmppState::kSessionStarted;
    235   iq_stanza_handler_->SendRequest(
    236       "set", "", "", "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/>",
    237       base::Bind(&XmppChannel::OnSessionEstablished,
    238                  task_ptr_factory_.GetWeakPtr()),
    239       base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
    240 }
    241 
    242 void XmppChannel::OnSessionEstablished(std::unique_ptr<XmlNode> reply) {
    243   if (reply->GetAttributeOrEmpty("type") != "result") {
    244     CloseStream();
    245     return;
    246   }
    247   state_ = XmppState::kSubscribeStarted;
    248   std::string body =
    249       "<subscribe xmlns='google:push'>"
    250       "<item channel='cloud_devices' from=''/></subscribe>";
    251   iq_stanza_handler_->SendRequest(
    252       "set", "", account_, body,
    253       base::Bind(&XmppChannel::OnSubscribed, task_ptr_factory_.GetWeakPtr()),
    254       base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
    255 }
    256 
    257 void XmppChannel::OnSubscribed(std::unique_ptr<XmlNode> reply) {
    258   if (reply->GetAttributeOrEmpty("type") != "result") {
    259     CloseStream();
    260     return;
    261   }
    262   state_ = XmppState::kSubscribed;
    263   if (delegate_)
    264     delegate_->OnConnected(GetName());
    265 }
    266 
    267 void XmppChannel::HandleMessageStanza(std::unique_ptr<XmlNode> stanza) {
    268   const XmlNode* node = stanza->FindFirstChild("push:push/push:data", true);
    269   if (!node) {
    270     LOG(WARNING) << "XMPP message stanza is missing <push:data> element";
    271     return;
    272   }
    273   std::string data = node->text();
    274   std::string json_data;
    275   if (!Base64Decode(data, &json_data)) {
    276     LOG(WARNING) << "Failed to decode base64-encoded message payload: " << data;
    277     return;
    278   }
    279 
    280   VLOG(2) << "XMPP push notification data: " << json_data;
    281   auto json_dict = LoadJsonDict(json_data, nullptr);
    282   if (json_dict && delegate_)
    283     ParseNotificationJson(*json_dict, delegate_, GetName());
    284 }
    285 
    286 void XmppChannel::CreateSslSocket() {
    287   CHECK(!stream_);
    288   state_ = XmppState::kConnecting;
    289   LOG(INFO) << "Starting XMPP connection to: " << xmpp_endpoint_;
    290 
    291   std::pair<std::string, std::string> host_port =
    292       SplitAtFirst(xmpp_endpoint_, ":", true);
    293   CHECK(!host_port.first.empty());
    294   CHECK(!host_port.second.empty());
    295   uint32_t port = 0;
    296   CHECK(base::StringToUint(host_port.second, &port)) << xmpp_endpoint_;
    297 
    298   network_->OpenSslSocket(host_port.first, port,
    299                           base::Bind(&XmppChannel::OnSslSocketReady,
    300                                      task_ptr_factory_.GetWeakPtr()));
    301 }
    302 
    303 void XmppChannel::OnSslSocketReady(std::unique_ptr<Stream> stream,
    304                                    ErrorPtr error) {
    305   if (error) {
    306     LOG(ERROR) << "TLS handshake failed. Restarting XMPP connection";
    307     backoff_entry_.InformOfRequest(false);
    308 
    309     LOG(INFO) << "Delaying connection to XMPP server for "
    310               << backoff_entry_.GetTimeUntilRelease();
    311     return task_runner_->PostDelayedTask(
    312         FROM_HERE, base::Bind(&XmppChannel::CreateSslSocket,
    313                               task_ptr_factory_.GetWeakPtr()),
    314         backoff_entry_.GetTimeUntilRelease());
    315   }
    316   CHECK(XmppState::kConnecting == state_);
    317   backoff_entry_.InformOfRequest(true);
    318   stream_ = std::move(stream);
    319   state_ = XmppState::kConnected;
    320   RestartXmppStream();
    321   ScheduleRegularPing();
    322 }
    323 
    324 void XmppChannel::SendMessage(const std::string& message) {
    325   CHECK(stream_) << "No XMPP socket stream available";
    326   if (write_pending_) {
    327     queued_write_data_ += message;
    328     return;
    329   }
    330   write_socket_data_ = queued_write_data_ + message;
    331   queued_write_data_.clear();
    332   VLOG(2) << "Sending XMPP message: " << message;
    333 
    334   write_pending_ = true;
    335   stream_->Write(
    336       write_socket_data_.data(), write_socket_data_.size(),
    337       base::Bind(&XmppChannel::OnMessageSent, task_ptr_factory_.GetWeakPtr()));
    338 }
    339 
    340 void XmppChannel::OnMessageSent(ErrorPtr error) {
    341   write_pending_ = false;
    342   if (error)
    343     return Restart();
    344   if (queued_write_data_.empty()) {
    345     WaitForMessage();
    346   } else {
    347     SendMessage(std::string{});
    348   }
    349 }
    350 
    351 void XmppChannel::WaitForMessage() {
    352   if (read_pending_ || !stream_)
    353     return;
    354 
    355   read_pending_ = true;
    356   stream_->Read(
    357       read_socket_data_.data(), read_socket_data_.size(),
    358       base::Bind(&XmppChannel::OnMessageRead, task_ptr_factory_.GetWeakPtr()));
    359 }
    360 
    361 std::string XmppChannel::GetName() const {
    362   return "xmpp";
    363 }
    364 
    365 bool XmppChannel::IsConnected() const {
    366   return state_ == XmppState::kSubscribed;
    367 }
    368 
    369 void XmppChannel::AddChannelParameters(base::DictionaryValue* channel_json) {
    370   // No extra parameters needed for XMPP.
    371 }
    372 
    373 void XmppChannel::Restart() {
    374   LOG(INFO) << "Restarting XMPP";
    375   Stop();
    376   Start(delegate_);
    377 }
    378 
    379 void XmppChannel::Start(NotificationDelegate* delegate) {
    380   CHECK(state_ == XmppState::kNotStarted);
    381   delegate_ = delegate;
    382 
    383   CreateSslSocket();
    384 }
    385 
    386 void XmppChannel::Stop() {
    387   if (IsConnected() && delegate_)
    388     delegate_->OnDisconnected();
    389 
    390   task_ptr_factory_.InvalidateWeakPtrs();
    391   ping_ptr_factory_.InvalidateWeakPtrs();
    392 
    393   stream_.reset();
    394   state_ = XmppState::kNotStarted;
    395 }
    396 
    397 void XmppChannel::RestartXmppStream() {
    398   stream_parser_.Reset();
    399   stream_->CancelPendingOperations();
    400   read_pending_ = false;
    401   write_pending_ = false;
    402   SendMessage(BuildXmppStartStreamCommand());
    403 }
    404 
    405 void XmppChannel::SchedulePing(base::TimeDelta interval,
    406                                base::TimeDelta timeout) {
    407   VLOG(1) << "Next XMPP ping in " << interval << " with timeout " << timeout;
    408   ping_ptr_factory_.InvalidateWeakPtrs();
    409   task_runner_->PostDelayedTask(
    410       FROM_HERE, base::Bind(&XmppChannel::PingServer,
    411                             ping_ptr_factory_.GetWeakPtr(), timeout),
    412       interval);
    413 }
    414 
    415 void XmppChannel::ScheduleRegularPing() {
    416   SchedulePing(base::TimeDelta::FromSeconds(kRegularPingIntervalSeconds),
    417                base::TimeDelta::FromSeconds(kRegularPingTimeoutSeconds));
    418 }
    419 
    420 void XmppChannel::ScheduleFastPing() {
    421   SchedulePing(base::TimeDelta::FromSeconds(kAgressivePingIntervalSeconds),
    422                base::TimeDelta::FromSeconds(kAgressivePingTimeoutSeconds));
    423 }
    424 
    425 void XmppChannel::PingServer(base::TimeDelta timeout) {
    426   VLOG(1) << "Sending XMPP ping";
    427   if (!IsConnected()) {
    428     LOG(WARNING) << "XMPP channel is not connected";
    429     Restart();
    430     return;
    431   }
    432 
    433   // Send an XMPP Ping request as defined in XEP-0199 extension:
    434   // http://xmpp.org/extensions/xep-0199.html
    435   iq_stanza_handler_->SendRequestWithCustomTimeout(
    436       "get", jid_, account_, "<ping xmlns='urn:xmpp:ping'/>", timeout,
    437       base::Bind(&XmppChannel::OnPingResponse, task_ptr_factory_.GetWeakPtr(),
    438                  base::Time::Now()),
    439       base::Bind(&XmppChannel::OnPingTimeout, task_ptr_factory_.GetWeakPtr(),
    440                  base::Time::Now()));
    441 }
    442 
    443 void XmppChannel::OnPingResponse(base::Time sent_time,
    444                                  std::unique_ptr<XmlNode> reply) {
    445   VLOG(1) << "XMPP response received after " << (base::Time::Now() - sent_time);
    446   // Ping response received from server. Everything seems to be in order.
    447   // Reschedule with default intervals.
    448   ScheduleRegularPing();
    449 }
    450 
    451 void XmppChannel::OnPingTimeout(base::Time sent_time) {
    452   LOG(WARNING) << "XMPP channel seems to be disconnected. Ping timed out after "
    453                << (base::Time::Now() - sent_time);
    454   Restart();
    455 }
    456 
    457 void XmppChannel::OnConnectivityChanged() {
    458   if (state_ == XmppState::kNotStarted)
    459     return;
    460 
    461   if (state_ == XmppState::kConnecting &&
    462       backoff_entry_.GetTimeUntilRelease() <
    463           base::TimeDelta::FromSeconds(
    464               kConnectingTimeoutAfterNetChangeSeconds)) {
    465     VLOG(1) << "Next reconnect in " << backoff_entry_.GetTimeUntilRelease();
    466     return;
    467   }
    468 
    469   ScheduleFastPing();
    470 }
    471 
    472 }  // namespace weave
    473