Home | History | Annotate | Download | only in plus
      1 /*
      2  * libjingle
      3  * Copyright 2006, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #include <iostream>
     29 #include "libjingleplus.h"
     30 #ifdef WIN32
     31 #include "talk/base/win32socketserver.h"
     32 #endif
     33 #include "talk/base/physicalsocketserver.h"
     34 #include "talk/base/logging.h"
     35 #include "talk/examples/login/xmppauth.h"
     36 #include "talk/examples/login/xmppsocket.h"
     37 #include "talk/examples/login/xmpppump.h"
     38 #include "presencepushtask.h"
     39 #include "talk/app/status.h"
     40 #include "talk/app/message.h"
     41 #include "rostertask.h"
     42 #include "talk/app/iqtask.h"
     43 #include "talk/app/presenceouttask.h"
     44 #include "talk/app/receivemessagetask.h"
     45 #include "talk/app/rostersettask.h"
     46 #include "talk/app/sendmessagetask.h"
     47 
     48 enum {
     49   MSG_START,
     50 
     51   // main thread to worker
     52   MSG_LOGIN,
     53   MSG_DISCONNECT,
     54   MSG_SEND_PRESENCE,
     55   MSG_SEND_DIRECTED_PRESENCE,
     56   MSG_SEND_DIRECTED_MUC_PRESENCE,
     57   MSG_SEND_XMPP_MESSAGE,
     58   MSG_SEND_XMPP_IQ,
     59   MSG_UPDATE_ROSTER_ITEM,
     60   MSG_REMOVE_ROSTER_ITEM,
     61 
     62   // worker thread to main thread
     63   MSG_STATE_CHANGE,
     64   MSG_STATUS_UPDATE,
     65   MSG_STATUS_ERROR,
     66   MSG_ROSTER_REFRESH_STARTED,
     67   MSG_ROSTER_REFRESH_FINISHED,
     68   MSG_ROSTER_ITEM_UPDATED,
     69   MSG_ROSTER_ITEM_REMOVED,
     70   MSG_ROSTER_SUBSCRIBE,
     71   MSG_ROSTER_UNSUBSCRIBE,
     72   MSG_ROSTER_SUBSCRIBED,
     73   MSG_ROSTER_UNSUBSCRIBED,
     74   MSG_INCOMING_MESSAGE,
     75   MSG_IQ_COMPLETE,
     76   MSG_XMPP_INPUT,
     77   MSG_XMPP_OUTPUT
     78 };
     79 
     80 class LibjinglePlusWorker : public talk_base::MessageHandler,
     81 			    public XmppPumpNotify,
     82                             public sigslot::has_slots<> {
     83  public:
     84   LibjinglePlusWorker(LibjinglePlus *ljp, LibjinglePlusNotify *notify) :
     85     worker_thread_(NULL), ljp_(ljp), notify_(notify),
     86     ppt_(NULL), rmt_(NULL), rt_(NULL), is_test_login_(false) {
     87 
     88     main_thread_.reset(new talk_base::AutoThread());
     89 #ifdef WIN32
     90     ss_.reset(new talk_base::Win32SocketServer(main_thread_.get()));
     91     main_thread_->set_socketserver(ss_.get());
     92 #endif
     93 
     94     pump_.reset(new XmppPump(this));
     95 
     96     pump_->client()->SignalLogInput.connect(this, &LibjinglePlusWorker::OnInputDebug);
     97     pump_->client()->SignalLogOutput.connect(this, &LibjinglePlusWorker::OnOutputDebug);
     98     //pump_->client()->SignalStateChange.connect(this, &LibjinglePlusWorker::OnStateChange);
     99     }
    100 
    101   ~LibjinglePlusWorker() {
    102     if (worker_thread_) {
    103       worker_thread_->Send(this, MSG_DISCONNECT);
    104       delete worker_thread_;
    105     }
    106   }
    107 
    108   virtual void OnMessage(talk_base::Message *msg) {
    109     switch (msg->message_id) {
    110     case MSG_START:
    111       LoginW();
    112       break;
    113     case MSG_DISCONNECT:
    114       DisconnectW();
    115       break;
    116     case MSG_SEND_XMPP_MESSAGE:
    117       SendXmppMessageW(static_cast<SendMessageData*>(msg->pdata)->m_);
    118       delete msg->pdata;
    119       break;
    120     case MSG_SEND_XMPP_IQ:
    121       SendXmppIqW(static_cast<SendIqData*>(msg->pdata)->to_jid_,
    122                   static_cast<SendIqData*>(msg->pdata)->is_get_,
    123                   static_cast<SendIqData*>(msg->pdata)->xml_element_);
    124       delete msg->pdata;
    125       break;
    126     case MSG_SEND_PRESENCE:
    127       SendPresenceW(static_cast<SendPresenceData*>(msg->pdata)->s_);
    128       delete msg->pdata;
    129       break;
    130     case MSG_SEND_DIRECTED_PRESENCE:
    131       SendDirectedPresenceW(static_cast<SendDirectedPresenceData*>(msg->pdata)->j_,
    132 			    static_cast<SendDirectedPresenceData*>(msg->pdata)->s_);
    133       delete msg->pdata;
    134       break;
    135     case MSG_SEND_DIRECTED_MUC_PRESENCE:
    136       SendDirectedMUCPresenceW(static_cast<SendDirectedMUCPresenceData*>(msg->pdata)->j_,
    137 			       static_cast<SendDirectedMUCPresenceData*>(msg->pdata)->s_,
    138 			       static_cast<SendDirectedMUCPresenceData*>(msg->pdata)->un_,
    139 			       static_cast<SendDirectedMUCPresenceData*>(msg->pdata)->ac_,
    140 			       static_cast<SendDirectedMUCPresenceData*>(msg->pdata)->am_,
    141 			       static_cast<SendDirectedMUCPresenceData*>(msg->pdata)->role_);
    142       delete msg->pdata;
    143       break;
    144     case MSG_UPDATE_ROSTER_ITEM:
    145       UpdateRosterItemW(static_cast<UpdateRosterItemData*>(msg->pdata)->jid_,
    146 			static_cast<UpdateRosterItemData*>(msg->pdata)->n_,
    147 			static_cast<UpdateRosterItemData*>(msg->pdata)->g_,
    148 			static_cast<UpdateRosterItemData*>(msg->pdata)->grt_);
    149       delete msg->pdata;
    150       break;
    151     case MSG_REMOVE_ROSTER_ITEM:
    152       RemoveRosterItemW(static_cast<JidData*>(msg->pdata)->jid_);
    153       delete msg->pdata;
    154       break;
    155 
    156 
    157 
    158 
    159     case MSG_STATUS_UPDATE:
    160       OnStatusUpdateW(static_cast<SendPresenceData*>(msg->pdata)->s_);
    161       delete msg->pdata;
    162       break;
    163     case MSG_STATUS_ERROR:
    164       OnStatusErrorW(static_cast<StatusErrorData*>(msg->pdata)->stanza_);
    165       delete msg->pdata;
    166       break;
    167     case MSG_STATE_CHANGE:
    168       OnStateChangeW(static_cast<StateChangeData*>(msg->pdata)->s_);
    169       delete msg->pdata;
    170       break;
    171     case MSG_ROSTER_REFRESH_STARTED:
    172       OnRosterRefreshStartedW();
    173       break;
    174     case MSG_ROSTER_REFRESH_FINISHED:
    175       OnRosterRefreshFinishedW();
    176       break;
    177     case MSG_ROSTER_ITEM_UPDATED:
    178       OnRosterItemUpdatedW(static_cast<RosterItemData*>(msg->pdata)->ri_);
    179       delete msg->pdata;
    180       break;
    181     case MSG_ROSTER_ITEM_REMOVED:
    182       OnRosterItemRemovedW(static_cast<RosterItemData*>(msg->pdata)->ri_);
    183       delete msg->pdata;
    184       break;
    185     case MSG_ROSTER_SUBSCRIBE:
    186       OnRosterSubscribeW(static_cast<JidData*>(msg->pdata)->jid_);
    187       delete msg->pdata;
    188       break;
    189     case MSG_ROSTER_UNSUBSCRIBE:
    190       OnRosterUnsubscribeW(static_cast<JidData*>(msg->pdata)->jid_);
    191       delete msg->pdata;
    192       break;
    193     case MSG_ROSTER_SUBSCRIBED:
    194       OnRosterSubscribedW(static_cast<JidData*>(msg->pdata)->jid_);
    195       delete msg->pdata;
    196       break;
    197     case MSG_ROSTER_UNSUBSCRIBED:
    198       OnRosterUnsubscribedW(static_cast<JidData*>(msg->pdata)->jid_);
    199       delete msg->pdata;
    200       break;
    201     case MSG_INCOMING_MESSAGE:
    202       OnIncomingMessageW(static_cast<XmppMessageData*>(msg->pdata)->m_);
    203       delete msg->pdata;
    204       break;
    205     case MSG_IQ_COMPLETE:
    206       OnIqCompleteW(static_cast<IqCompleteData*>(msg->pdata)->success_,
    207                     static_cast<IqCompleteData*>(msg->pdata)->stanza_);
    208       delete msg->pdata;
    209       break;
    210     case MSG_XMPP_OUTPUT:
    211       OnOutputDebugW(static_cast<StringData*>(msg->pdata)->s_);
    212       delete msg->pdata;
    213       break;
    214     case MSG_XMPP_INPUT:
    215       OnInputDebugW(static_cast<StringData*>(msg->pdata)->s_);
    216       delete msg->pdata;
    217       break;
    218     }
    219   }
    220 
    221   void Login(const std::string &jid, const std::string &password,
    222 	     const std::string &machine_address, bool is_test, bool cookie_auth) {
    223     is_test_login_ = is_test;
    224 
    225     xcs_.set_user(jid);
    226     if (cookie_auth) {
    227 	    xcs_.set_auth_cookie(password);
    228     } else {
    229 	    talk_base::InsecureCryptStringImpl pass;
    230 	    pass.password() = password;
    231 	    xcs_.set_pass(talk_base::CryptString(pass));
    232     }
    233     xcs_.set_host(is_test ? "google.com" : "gmail.com");
    234     xcs_.set_resource("libjingleplus");
    235     xcs_.set_server(talk_base::SocketAddress(machine_address, 5222));
    236     xcs_.set_use_tls(!is_test);
    237     if (is_test) {
    238       xcs_.set_allow_plain(true);
    239     }
    240 
    241     worker_thread_ = new talk_base::Thread(&pss_);
    242     worker_thread_->Start();
    243     worker_thread_->Send(this, MSG_START);
    244   }
    245 
    246   void SendXmppMessage(const buzz::XmppMessage &m) {
    247     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    248     worker_thread_->Post(this, MSG_SEND_XMPP_MESSAGE, new SendMessageData(m));
    249   }
    250 
    251   void SendXmppIq(const buzz::Jid &to_jid, bool is_get,
    252                   const buzz::XmlElement *xml_element) {
    253     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    254     worker_thread_->Post(this, MSG_SEND_XMPP_IQ,
    255                          new SendIqData(to_jid, is_get, xml_element));
    256   }
    257 
    258   void SendPresence(const buzz::Status & s) {
    259     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    260     worker_thread_->Post(this, MSG_SEND_PRESENCE, new SendPresenceData(s));
    261   }
    262 
    263   void SendDirectedPresence (const buzz::Jid &j, const buzz::Status &s) {
    264     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    265     worker_thread_->Post(this, MSG_SEND_DIRECTED_PRESENCE, new SendDirectedPresenceData(j,s));
    266   }
    267 
    268   void SendDirectedMUCPresence(const buzz::Jid &j, const buzz::Status &s,
    269 			       const std::string &un, const std::string &ac,
    270 			       const std::string &am, const std::string &role) {
    271     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    272     worker_thread_->Post(this, MSG_SEND_DIRECTED_MUC_PRESENCE, new SendDirectedMUCPresenceData(j,s,un,ac,am, role));
    273   }
    274 
    275   void UpdateRosterItem(const buzz::Jid & jid, const std::string & name,
    276 			const std::vector<std::string> & groups, buzz::GrType grt) {
    277     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    278     worker_thread_->Post(this, MSG_UPDATE_ROSTER_ITEM, new UpdateRosterItemData(jid,name,groups,grt));
    279   }
    280 
    281   void RemoveRosterItemW(const buzz::Jid &jid) {
    282     buzz::RosterSetTask *rst = new buzz::RosterSetTask(pump_.get()->client());
    283     rst->Remove(jid);
    284     rst->Start();
    285   }
    286 
    287   void RemoveRosterItem(const buzz::Jid &jid) {
    288     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    289     worker_thread_->Post(this, MSG_REMOVE_ROSTER_ITEM, new JidData(jid));
    290   }
    291 
    292   void DoCallbacks() {
    293     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    294     talk_base::Message m;
    295     while (main_thread_->Get(&m, 0)) {
    296       main_thread_->Dispatch(&m);
    297     }
    298   }
    299 
    300  private:
    301 
    302   struct UpdateRosterItemData : public talk_base::MessageData {
    303     UpdateRosterItemData(const buzz::Jid &jid, const std::string &name,
    304 			 const std::vector<std::string> &groups, buzz::GrType grt) :
    305       jid_(jid), n_(name), g_(groups), grt_(grt) {}
    306     buzz::Jid jid_;
    307     std::string n_;
    308     std::vector<std::string> g_;
    309     buzz::GrType grt_;
    310   };
    311 
    312   void UpdateRosterItemW(const buzz::Jid &jid, const std::string &name,
    313 			 const std::vector<std::string> &groups, buzz::GrType grt) {
    314     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    315     buzz::RosterSetTask *rst = new buzz::RosterSetTask(pump_.get()->client());
    316     rst->Update(jid, name, groups, grt);
    317     rst->Start();
    318   }
    319 
    320   struct StringData : public talk_base::MessageData {
    321     StringData(std::string s) : s_(s) {}
    322     std::string s_;
    323   };
    324 
    325   void OnInputDebugW(const std::string &data) {
    326     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    327     if (notify_)
    328       notify_->OnXmppInput(data);
    329   }
    330 
    331   void OnInputDebug(const char *data, int len) {
    332     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    333     main_thread_->Post(this, MSG_XMPP_INPUT, new StringData(std::string(data,len)));
    334     if (notify_)
    335       notify_->WakeupMainThread();
    336   }
    337 
    338   void OnOutputDebugW(const std::string &data) {
    339     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    340     if (notify_)
    341       notify_->OnXmppOutput(data);
    342   }
    343 
    344   void OnOutputDebug(const char *data, int len) {
    345     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    346     main_thread_->Post(this, MSG_XMPP_OUTPUT, new StringData(std::string(data,len)));
    347     if (notify_)
    348       notify_->WakeupMainThread();
    349   }
    350 
    351   struct StateChangeData : public talk_base::MessageData {
    352     StateChangeData(buzz::XmppEngine::State state) : s_(state) {}
    353     buzz::XmppEngine::State s_;
    354   };
    355 
    356   void OnStateChange(buzz::XmppEngine::State state) {
    357     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    358     switch (state) {
    359     case buzz::XmppEngine::STATE_OPEN:
    360       ppt_ = new buzz::PresencePushTask(pump_.get()->client());
    361       ppt_->SignalStatusUpdate.connect(this,
    362                                        &LibjinglePlusWorker::OnStatusUpdate);
    363       ppt_->SignalStatusError.connect(this,
    364                                       &LibjinglePlusWorker::OnStatusError);
    365       ppt_->Start();
    366 
    367       rmt_ = new buzz::ReceiveMessageTask(pump_.get()->client(), buzz::XmppEngine::HL_ALL);
    368       rmt_->SignalIncomingMessage.connect(this, &LibjinglePlusWorker::OnIncomingMessage);
    369       rmt_->Start();
    370 
    371       rt_ = new buzz::RosterTask(pump_.get()->client());
    372       rt_->SignalRosterItemUpdated.connect(this, &LibjinglePlusWorker::OnRosterItemUpdated);
    373       rt_->SignalRosterItemRemoved.connect(this, &LibjinglePlusWorker::OnRosterItemRemoved);
    374       rt_->SignalSubscribe.connect(this, &LibjinglePlusWorker::OnRosterSubscribe);
    375       rt_->SignalUnsubscribe.connect(this, &LibjinglePlusWorker::OnRosterUnsubscribe);
    376       rt_->SignalSubscribed.connect(this, &LibjinglePlusWorker::OnRosterSubscribed);
    377       rt_->SignalUnsubscribed.connect(this, &LibjinglePlusWorker::OnRosterUnsubscribed);
    378       rt_->SignalRosterRefreshStarted.connect(this, &LibjinglePlusWorker::OnRosterRefreshStarted);
    379       rt_->SignalRosterRefreshFinished.connect(this, &LibjinglePlusWorker::OnRosterRefreshFinished);
    380       rt_->Start();
    381       rt_->RefreshRosterNow();
    382 
    383       break;
    384     }
    385     main_thread_->Post(this, MSG_STATE_CHANGE, new StateChangeData(state));
    386     if (notify_)
    387       notify_->WakeupMainThread();
    388   }
    389 
    390   void OnStateChangeW(buzz::XmppEngine::State state) {
    391     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    392     if (notify_)
    393       notify_->OnStateChange(state);
    394   }
    395 
    396   struct RosterItemData : public talk_base::MessageData {
    397     RosterItemData(const buzz::RosterItem &ri) : ri_(ri) {}
    398     buzz::RosterItem ri_;
    399   };
    400 
    401   void OnRosterItemUpdatedW(const buzz::RosterItem &ri) {
    402     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    403     if (notify_)
    404       notify_->OnRosterItemUpdated(ri);
    405   }
    406 
    407   void OnRosterItemUpdated(const buzz::RosterItem &ri, bool huh) {
    408     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    409     main_thread_->Post(this, MSG_ROSTER_ITEM_UPDATED, new RosterItemData(ri));
    410     if (notify_)
    411       notify_->WakeupMainThread();
    412   }
    413 
    414   void OnRosterItemRemovedW(const buzz::RosterItem &ri) {
    415     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    416     if (notify_)
    417       notify_->OnRosterItemRemoved(ri);
    418   }
    419 
    420   void OnRosterItemRemoved(const buzz::RosterItem &ri) {
    421     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    422     main_thread_->Post(this, MSG_ROSTER_ITEM_REMOVED, new RosterItemData(ri));
    423     if (notify_)
    424       notify_->WakeupMainThread();
    425   }
    426 
    427   struct JidData : public talk_base::MessageData {
    428     JidData(const buzz::Jid& jid) : jid_(jid) {}
    429     const buzz::Jid jid_;
    430   };
    431 
    432   void OnRosterSubscribeW(const buzz::Jid& jid) {
    433     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    434     if (notify_)
    435       notify_->OnRosterSubscribe(jid);
    436   }
    437 
    438   void OnRosterSubscribe(const buzz::Jid& jid) {
    439     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    440     main_thread_->Post(this, MSG_ROSTER_SUBSCRIBE, new JidData(jid));
    441     if (notify_)
    442       notify_->WakeupMainThread();
    443   }
    444 
    445   void OnRosterUnsubscribeW(const buzz::Jid &jid) {
    446     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    447     if (notify_)
    448       notify_->OnRosterUnsubscribe(jid);
    449   }
    450 
    451   void OnRosterUnsubscribe(const buzz::Jid &jid) {
    452     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    453     main_thread_->Post(this, MSG_ROSTER_UNSUBSCRIBE, new JidData(jid));
    454     if (notify_)
    455       notify_->WakeupMainThread();
    456   }
    457 
    458   void OnRosterSubscribedW(const buzz::Jid &jid) {
    459     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    460     if (notify_)
    461       notify_->OnRosterSubscribed(jid);
    462   }
    463 
    464   void OnRosterSubscribed(const buzz::Jid &jid) {
    465     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    466     main_thread_->Post(this, MSG_ROSTER_SUBSCRIBED, new JidData(jid));
    467     if (notify_)
    468       notify_->WakeupMainThread();
    469   }
    470 
    471   void OnRosterUnsubscribedW(const buzz::Jid &jid) {
    472     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    473     if (notify_)
    474       notify_->OnRosterUnsubscribed(jid);
    475   }
    476 
    477   void OnRosterUnsubscribed(const buzz::Jid &jid) {
    478     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    479     main_thread_->Post(this, MSG_ROSTER_UNSUBSCRIBED, new JidData(jid));
    480     if (notify_)
    481       notify_->WakeupMainThread();
    482   }
    483 
    484   void OnRosterRefreshStartedW() {
    485     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    486     if (notify_)
    487       notify_->OnRosterRefreshStarted();
    488   }
    489 
    490   void OnRosterRefreshStarted() {
    491     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    492     main_thread_->Post(this, MSG_ROSTER_REFRESH_STARTED);
    493     if (notify_)
    494       notify_->WakeupMainThread();
    495   }
    496 
    497   void OnRosterRefreshFinishedW() {
    498     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    499     if (notify_)
    500       notify_->OnRosterRefreshFinished();
    501   }
    502 
    503   void OnRosterRefreshFinished() {
    504     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    505     main_thread_->Post(this, MSG_ROSTER_REFRESH_FINISHED);
    506     if (notify_)
    507       notify_->WakeupMainThread();
    508   }
    509 
    510   struct XmppMessageData : talk_base::MessageData {
    511     XmppMessageData(const buzz::XmppMessage &m) : m_(m) {}
    512     buzz::XmppMessage m_;
    513   };
    514 
    515   void OnIncomingMessageW(const buzz::XmppMessage &msg) {
    516     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    517     if (notify_)
    518       notify_->OnMessage(msg);
    519   }
    520 
    521   void OnIncomingMessage(const buzz::XmppMessage &msg) {
    522     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    523     main_thread_->Post(this, MSG_INCOMING_MESSAGE, new XmppMessageData(msg));
    524     if (notify_)
    525       notify_->WakeupMainThread();
    526   }
    527 
    528   void OnStatusUpdateW (const buzz::Status &status) {
    529     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    530     if (notify_)
    531       notify_->OnStatusUpdate(status);
    532   }
    533 
    534   void OnStatusUpdate (const buzz::Status &status) {
    535     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    536     main_thread_->Post(this, MSG_STATUS_UPDATE, new SendPresenceData(status));
    537     if (notify_)
    538       notify_->WakeupMainThread();
    539   }
    540 
    541   struct StatusErrorData : talk_base::MessageData {
    542     StatusErrorData(const buzz::XmlElement &stanza) : stanza_(stanza) {}
    543     buzz::XmlElement stanza_;
    544   };
    545 
    546   void OnStatusErrorW (const buzz::XmlElement &stanza) {
    547     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    548     if (notify_)
    549       notify_->OnStatusError(stanza);
    550   }
    551 
    552   void OnStatusError (const buzz::XmlElement &stanza) {
    553     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    554     main_thread_->Post(this, MSG_STATUS_ERROR, new StatusErrorData(stanza));
    555     if (notify_)
    556       notify_->WakeupMainThread();
    557   }
    558 
    559   void LoginW() {
    560     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    561     XmppSocket* socket = new XmppSocket(true);
    562     pump_->DoLogin(xcs_, socket, is_test_login_ ?  NULL : new XmppAuth());
    563     socket->SignalCloseEvent.connect(this,
    564         &LibjinglePlusWorker::OnXmppSocketClose);
    565   }
    566 
    567   void DisconnectW() {
    568     assert(talk_base::ThreadManager::CurrentThread() == worker_thread_);
    569     pump_->DoDisconnect();
    570   }
    571 
    572   void SendXmppMessageW(const buzz::XmppMessage &m) {
    573     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    574     buzz::SendMessageTask * smt = new buzz::SendMessageTask(pump_.get()->client());
    575     smt->Send(m);
    576     smt->Start();
    577   }
    578 
    579   void SendXmppIqW(const buzz::Jid &to_jid, bool is_get,
    580                    const buzz::XmlElement *xml_element) {
    581     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    582     buzz::IqTask *iq_task = new buzz::IqTask(pump_.get()->client(),
    583         is_get, to_jid, const_cast<buzz::XmlElement *>(xml_element));
    584     iq_task->SignalDone.connect(this, &LibjinglePlusWorker::OnIqComplete);
    585     iq_task->Start();
    586   }
    587 
    588  struct IqCompleteData : public talk_base::MessageData {
    589    IqCompleteData(bool success, const buzz::XmlElement *stanza) :
    590      success_(success), stanza_(*stanza) {}
    591    bool success_;
    592    buzz::XmlElement stanza_;
    593  };
    594 
    595   void OnIqCompleteW(bool success, const buzz::XmlElement& stanza) {
    596     assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
    597     if (notify_)
    598       notify_->OnIqDone(success, stanza);
    599   }
    600 
    601   void OnIqComplete(bool success, const buzz::XmlElement *stanza) {
    602     assert(talk_base::ThreadManager::CurrentThread() == worker_thread_);
    603     main_thread_->Post(this, MSG_IQ_COMPLETE,
    604        new IqCompleteData(success, stanza));
    605     if (notify_)
    606       notify_->WakeupMainThread();
    607   }
    608 
    609   void SendPresenceW(const buzz::Status & s) {
    610     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    611     buzz::PresenceOutTask *pot = new buzz::PresenceOutTask(pump_.get()->client());
    612     pot->Send(s);
    613     pot->Start();
    614   }
    615 
    616 
    617   void SendDirectedMUCPresenceW(const buzz::Jid & j, const buzz::Status & s,
    618 			       const std::string &user_nick, const std::string &api_capability,
    619 			       const std::string &api_message, const std::string &role) {
    620     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    621     buzz::PresenceOutTask *pot = new buzz::PresenceOutTask(pump_.get()->client());
    622     pot->SendDirectedMUC(j,s,user_nick,api_capability,api_message, role);
    623     pot->Start();
    624   }
    625 
    626   void SendDirectedPresenceW(const buzz::Jid & j, const buzz::Status & s) {
    627     assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
    628     buzz::PresenceOutTask *pot = new buzz::PresenceOutTask(pump_.get()->client());
    629     pot->SendDirected(j,s);
    630     pot->Start();
    631   }
    632 
    633   void OnXmppSocketClose(int error) {
    634     notify_->OnSocketClose(error);
    635   }
    636 
    637  struct SendMessageData : public talk_base::MessageData {
    638    SendMessageData(const buzz::XmppMessage &m) : m_(m) {}
    639    buzz::XmppMessage m_;
    640   };
    641 
    642  struct SendIqData : public talk_base::MessageData {
    643    SendIqData(const buzz::Jid &jid, bool is_get, const buzz::XmlElement *m)
    644      : to_jid_(jid), is_get_(is_get), xml_element_(m) {}
    645    buzz::Jid to_jid_;
    646    bool is_get_;
    647    const buzz::XmlElement *xml_element_;
    648   };
    649 
    650  struct SendPresenceData : public talk_base::MessageData {
    651    SendPresenceData(const buzz::Status &s) : s_(s) {}
    652    buzz::Status s_;
    653   };
    654 
    655  struct SendDirectedPresenceData : public talk_base::MessageData {
    656    SendDirectedPresenceData(const buzz::Jid &j, const buzz::Status &s) : j_(j), s_(s) {}
    657    buzz::Jid j_;
    658    buzz::Status s_;
    659  };
    660 
    661   struct SendDirectedMUCPresenceData : public talk_base::MessageData {
    662     SendDirectedMUCPresenceData(const buzz::Jid &j, const buzz::Status &s,
    663 				const std::string &un, const std::string &ac,
    664 				const std::string &am, const std::string &role)
    665       : j_(j), s_(s), un_(un), ac_(ac), am_(am), role_(role) {}
    666     buzz::Jid j_;
    667     buzz::Status s_;
    668     std::string un_;
    669     std::string ac_;
    670     std::string am_;
    671     std::string role_;
    672   };
    673 
    674   talk_base::scoped_ptr<talk_base::Win32SocketServer> ss_;
    675   talk_base::scoped_ptr<talk_base::Thread> main_thread_;
    676   talk_base::Thread *worker_thread_;
    677 
    678   LibjinglePlus *ljp_;
    679   LibjinglePlusNotify *notify_;
    680   buzz::XmppClientSettings xcs_;
    681   talk_base::PhysicalSocketServer pss_;
    682 
    683   talk_base::scoped_ptr<XmppPump> pump_;
    684   buzz::PresencePushTask * ppt_;
    685   buzz::ReceiveMessageTask * rmt_;
    686   buzz::RosterTask * rt_;
    687 
    688   bool is_test_login_;
    689 };
    690 
    691 LibjinglePlus::LibjinglePlus(LibjinglePlusNotify *notify)
    692 {
    693   worker_ = new LibjinglePlusWorker(this, notify);
    694 }
    695 
    696 LibjinglePlus::~LibjinglePlus()
    697 {
    698  delete worker_;
    699   worker_ = NULL;
    700 }
    701 
    702 void LibjinglePlus::Login(const std::string &jid,
    703 		          const std::string &password,
    704 		          const std::string &machine_address,
    705 			  bool is_test, bool cookie_auth) {
    706   worker_->Login(jid, password, machine_address, is_test, cookie_auth);
    707 }
    708 
    709 void LibjinglePlus::SendPresence(const buzz::Status & s) {
    710   worker_->SendPresence(s);
    711 }
    712 
    713 void LibjinglePlus::SendDirectedPresence(const buzz::Jid & j, const buzz::Status & s) {
    714   worker_->SendDirectedPresence(j,s);
    715 }
    716 
    717 void LibjinglePlus::SendDirectedMUCPresence(const buzz::Jid & j,
    718     const buzz::Status & s, const std::string &user_nick,
    719     const std::string &api_capability, const std::string &api_message,
    720     const std::string &role) {
    721   worker_->SendDirectedMUCPresence(j,s,user_nick,api_capability,api_message,
    722       role);
    723 }
    724 
    725 void LibjinglePlus::SendXmppMessage(const buzz::XmppMessage & m) {
    726   worker_->SendXmppMessage(m);
    727 }
    728 
    729 void LibjinglePlus::SendXmppIq(const buzz::Jid &to_jid, bool is_get,
    730                                const buzz::XmlElement *iq_element) {
    731   worker_->SendXmppIq(to_jid, is_get, iq_element);
    732 }
    733 
    734 void LibjinglePlus::DoCallbacks() {
    735   worker_->DoCallbacks();
    736 }
    737