Home | History | Annotate | Download | only in dbus
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "dbus/bus.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/logging.h"
      9 #include "base/message_loop/message_loop.h"
     10 #include "base/message_loop/message_loop_proxy.h"
     11 #include "base/stl_util.h"
     12 #include "base/strings/stringprintf.h"
     13 #include "base/threading/thread.h"
     14 #include "base/threading/thread_restrictions.h"
     15 #include "base/time/time.h"
     16 #include "dbus/exported_object.h"
     17 #include "dbus/message.h"
     18 #include "dbus/object_manager.h"
     19 #include "dbus/object_path.h"
     20 #include "dbus/object_proxy.h"
     21 #include "dbus/scoped_dbus_error.h"
     22 
     23 namespace dbus {
     24 
     25 namespace {
     26 
     27 const char kDisconnectedSignal[] = "Disconnected";
     28 const char kDisconnectedMatchRule[] =
     29     "type='signal', path='/org/freedesktop/DBus/Local',"
     30     "interface='org.freedesktop.DBus.Local', member='Disconnected'";
     31 
     32 // The NameOwnerChanged member in org.freedesktop.DBus
     33 const char kNameOwnerChangedSignal[] = "NameOwnerChanged";
     34 
     35 // The match rule used to filter for changes to a given service name owner.
     36 const char kServiceNameOwnerChangeMatchRule[] =
     37     "type='signal',interface='org.freedesktop.DBus',"
     38     "member='NameOwnerChanged',path='/org/freedesktop/DBus',"
     39     "sender='org.freedesktop.DBus',arg0='%s'";
     40 
     41 // The class is used for watching the file descriptor used for D-Bus
     42 // communication.
     43 class Watch : public base::MessagePumpLibevent::Watcher {
     44  public:
     45   explicit Watch(DBusWatch* watch)
     46       : raw_watch_(watch) {
     47     dbus_watch_set_data(raw_watch_, this, NULL);
     48   }
     49 
     50   virtual ~Watch() {
     51     dbus_watch_set_data(raw_watch_, NULL, NULL);
     52   }
     53 
     54   // Returns true if the underlying file descriptor is ready to be watched.
     55   bool IsReadyToBeWatched() {
     56     return dbus_watch_get_enabled(raw_watch_);
     57   }
     58 
     59   // Starts watching the underlying file descriptor.
     60   void StartWatching() {
     61     const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_);
     62     const int flags = dbus_watch_get_flags(raw_watch_);
     63 
     64     base::MessageLoopForIO::Mode mode = base::MessageLoopForIO::WATCH_READ;
     65     if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE))
     66       mode = base::MessageLoopForIO::WATCH_READ_WRITE;
     67     else if (flags & DBUS_WATCH_READABLE)
     68       mode = base::MessageLoopForIO::WATCH_READ;
     69     else if (flags & DBUS_WATCH_WRITABLE)
     70       mode = base::MessageLoopForIO::WATCH_WRITE;
     71     else
     72       NOTREACHED();
     73 
     74     const bool persistent = true;  // Watch persistently.
     75     const bool success = base::MessageLoopForIO::current()->WatchFileDescriptor(
     76         file_descriptor, persistent, mode, &file_descriptor_watcher_, this);
     77     CHECK(success) << "Unable to allocate memory";
     78   }
     79 
     80   // Stops watching the underlying file descriptor.
     81   void StopWatching() {
     82     file_descriptor_watcher_.StopWatchingFileDescriptor();
     83   }
     84 
     85  private:
     86   // Implement MessagePumpLibevent::Watcher.
     87   virtual void OnFileCanReadWithoutBlocking(int file_descriptor) OVERRIDE {
     88     const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE);
     89     CHECK(success) << "Unable to allocate memory";
     90   }
     91 
     92   // Implement MessagePumpLibevent::Watcher.
     93   virtual void OnFileCanWriteWithoutBlocking(int file_descriptor) OVERRIDE {
     94     const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE);
     95     CHECK(success) << "Unable to allocate memory";
     96   }
     97 
     98   DBusWatch* raw_watch_;
     99   base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_;
    100 };
    101 
    102 // The class is used for monitoring the timeout used for D-Bus method
    103 // calls.
    104 //
    105 // Unlike Watch, Timeout is a ref counted object, to ensure that |this| of
    106 // the object is is alive when HandleTimeout() is called. It's unlikely
    107 // but it may be possible that HandleTimeout() is called after
    108 // Bus::OnRemoveTimeout(). That's why we don't simply delete the object in
    109 // Bus::OnRemoveTimeout().
    110 class Timeout : public base::RefCountedThreadSafe<Timeout> {
    111  public:
    112   explicit Timeout(DBusTimeout* timeout)
    113       : raw_timeout_(timeout),
    114         monitoring_is_active_(false),
    115         is_completed(false) {
    116     dbus_timeout_set_data(raw_timeout_, this, NULL);
    117     AddRef();  // Balanced on Complete().
    118   }
    119 
    120   // Returns true if the timeout is ready to be monitored.
    121   bool IsReadyToBeMonitored() {
    122     return dbus_timeout_get_enabled(raw_timeout_);
    123   }
    124 
    125   // Starts monitoring the timeout.
    126   void StartMonitoring(Bus* bus) {
    127     bus->GetDBusTaskRunner()->PostDelayedTask(
    128         FROM_HERE,
    129         base::Bind(&Timeout::HandleTimeout, this),
    130         GetInterval());
    131     monitoring_is_active_ = true;
    132   }
    133 
    134   // Stops monitoring the timeout.
    135   void StopMonitoring() {
    136     // We cannot take back the delayed task we posted in
    137     // StartMonitoring(), so we just mark the monitoring is inactive now.
    138     monitoring_is_active_ = false;
    139   }
    140 
    141   // Returns the interval.
    142   base::TimeDelta GetInterval() {
    143     return base::TimeDelta::FromMilliseconds(
    144         dbus_timeout_get_interval(raw_timeout_));
    145   }
    146 
    147   // Cleans up the raw_timeout and marks that timeout is completed.
    148   // See the class comment above for why we are doing this.
    149   void Complete() {
    150     dbus_timeout_set_data(raw_timeout_, NULL, NULL);
    151     is_completed = true;
    152     Release();
    153   }
    154 
    155  private:
    156   friend class base::RefCountedThreadSafe<Timeout>;
    157   ~Timeout() {
    158   }
    159 
    160   // Handles the timeout.
    161   void HandleTimeout() {
    162     // If the timeout is marked completed, we should do nothing. This can
    163     // occur if this function is called after Bus::OnRemoveTimeout().
    164     if (is_completed)
    165       return;
    166     // Skip if monitoring is canceled.
    167     if (!monitoring_is_active_)
    168       return;
    169 
    170     const bool success = dbus_timeout_handle(raw_timeout_);
    171     CHECK(success) << "Unable to allocate memory";
    172   }
    173 
    174   DBusTimeout* raw_timeout_;
    175   bool monitoring_is_active_;
    176   bool is_completed;
    177 };
    178 
    179 }  // namespace
    180 
    181 Bus::Options::Options()
    182   : bus_type(SESSION),
    183     connection_type(PRIVATE) {
    184 }
    185 
    186 Bus::Options::~Options() {
    187 }
    188 
    189 Bus::Bus(const Options& options)
    190     : bus_type_(options.bus_type),
    191       connection_type_(options.connection_type),
    192       dbus_task_runner_(options.dbus_task_runner),
    193       on_shutdown_(false /* manual_reset */, false /* initially_signaled */),
    194       connection_(NULL),
    195       origin_thread_id_(base::PlatformThread::CurrentId()),
    196       async_operations_set_up_(false),
    197       shutdown_completed_(false),
    198       num_pending_watches_(0),
    199       num_pending_timeouts_(0),
    200       address_(options.address),
    201       on_disconnected_closure_(options.disconnected_callback) {
    202   // This is safe to call multiple times.
    203   dbus_threads_init_default();
    204   // The origin message loop is unnecessary if the client uses synchronous
    205   // functions only.
    206   if (base::MessageLoop::current())
    207     origin_task_runner_ = base::MessageLoop::current()->message_loop_proxy();
    208 }
    209 
    210 Bus::~Bus() {
    211   DCHECK(!connection_);
    212   DCHECK(owned_service_names_.empty());
    213   DCHECK(match_rules_added_.empty());
    214   DCHECK(filter_functions_added_.empty());
    215   DCHECK(registered_object_paths_.empty());
    216   DCHECK_EQ(0, num_pending_watches_);
    217   // TODO(satorux): This check fails occasionally in browser_tests for tests
    218   // that run very quickly. Perhaps something does not have time to clean up.
    219   // Despite the check failing, the tests seem to run fine. crosbug.com/23416
    220   // DCHECK_EQ(0, num_pending_timeouts_);
    221 }
    222 
    223 ObjectProxy* Bus::GetObjectProxy(const std::string& service_name,
    224                                  const ObjectPath& object_path) {
    225   return GetObjectProxyWithOptions(service_name, object_path,
    226                                    ObjectProxy::DEFAULT_OPTIONS);
    227 }
    228 
    229 ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name,
    230                                             const ObjectPath& object_path,
    231                                             int options) {
    232   AssertOnOriginThread();
    233 
    234   // Check if we already have the requested object proxy.
    235   const ObjectProxyTable::key_type key(service_name + object_path.value(),
    236                                        options);
    237   ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
    238   if (iter != object_proxy_table_.end()) {
    239     return iter->second.get();
    240   }
    241 
    242   scoped_refptr<ObjectProxy> object_proxy =
    243       new ObjectProxy(this, service_name, object_path, options);
    244   object_proxy_table_[key] = object_proxy;
    245 
    246   return object_proxy.get();
    247 }
    248 
    249 bool Bus::RemoveObjectProxy(const std::string& service_name,
    250                             const ObjectPath& object_path,
    251                             const base::Closure& callback) {
    252   return RemoveObjectProxyWithOptions(service_name, object_path,
    253                                       ObjectProxy::DEFAULT_OPTIONS,
    254                                       callback);
    255 }
    256 
    257 bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name,
    258                                        const ObjectPath& object_path,
    259                                        int options,
    260                                        const base::Closure& callback) {
    261   AssertOnOriginThread();
    262 
    263   // Check if we have the requested object proxy.
    264   const ObjectProxyTable::key_type key(service_name + object_path.value(),
    265                                        options);
    266   ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
    267   if (iter != object_proxy_table_.end()) {
    268     scoped_refptr<ObjectProxy> object_proxy = iter->second;
    269     object_proxy_table_.erase(iter);
    270     // Object is present. Remove it now and Detach in the DBus thread.
    271     GetDBusTaskRunner()->PostTask(
    272         FROM_HERE,
    273         base::Bind(&Bus::RemoveObjectProxyInternal,
    274                    this, object_proxy, callback));
    275     return true;
    276   }
    277   return false;
    278 }
    279 
    280 void Bus::RemoveObjectProxyInternal(scoped_refptr<ObjectProxy> object_proxy,
    281                                     const base::Closure& callback) {
    282   AssertOnDBusThread();
    283 
    284   object_proxy.get()->Detach();
    285 
    286   GetOriginTaskRunner()->PostTask(FROM_HERE, callback);
    287 }
    288 
    289 ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) {
    290   AssertOnOriginThread();
    291 
    292   // Check if we already have the requested exported object.
    293   ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
    294   if (iter != exported_object_table_.end()) {
    295     return iter->second.get();
    296   }
    297 
    298   scoped_refptr<ExportedObject> exported_object =
    299       new ExportedObject(this, object_path);
    300   exported_object_table_[object_path] = exported_object;
    301 
    302   return exported_object.get();
    303 }
    304 
    305 void Bus::UnregisterExportedObject(const ObjectPath& object_path) {
    306   AssertOnOriginThread();
    307 
    308   // Remove the registered object from the table first, to allow a new
    309   // GetExportedObject() call to return a new object, rather than this one.
    310   ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
    311   if (iter == exported_object_table_.end())
    312     return;
    313 
    314   scoped_refptr<ExportedObject> exported_object = iter->second;
    315   exported_object_table_.erase(iter);
    316 
    317   // Post the task to perform the final unregistration to the D-Bus thread.
    318   // Since the registration also happens on the D-Bus thread in
    319   // TryRegisterObjectPath(), and the task runner we post to is a
    320   // SequencedTaskRunner, there is a guarantee that this will happen before any
    321   // future registration call.
    322   GetDBusTaskRunner()->PostTask(
    323       FROM_HERE,
    324       base::Bind(&Bus::UnregisterExportedObjectInternal,
    325                  this, exported_object));
    326 }
    327 
    328 void Bus::UnregisterExportedObjectInternal(
    329     scoped_refptr<ExportedObject> exported_object) {
    330   AssertOnDBusThread();
    331 
    332   exported_object->Unregister();
    333 }
    334 
    335 ObjectManager* Bus::GetObjectManager(const std::string& service_name,
    336                                      const ObjectPath& object_path) {
    337   AssertOnOriginThread();
    338 
    339   // Check if we already have the requested object manager.
    340   const ObjectManagerTable::key_type key(service_name + object_path.value());
    341   ObjectManagerTable::iterator iter = object_manager_table_.find(key);
    342   if (iter != object_manager_table_.end()) {
    343     return iter->second.get();
    344   }
    345 
    346   scoped_refptr<ObjectManager> object_manager =
    347       new ObjectManager(this, service_name, object_path);
    348   object_manager_table_[key] = object_manager;
    349 
    350   return object_manager.get();
    351 }
    352 
    353 void Bus::RemoveObjectManager(const std::string& service_name,
    354                               const ObjectPath& object_path) {
    355   AssertOnOriginThread();
    356 
    357   const ObjectManagerTable::key_type key(service_name + object_path.value());
    358   ObjectManagerTable::iterator iter = object_manager_table_.find(key);
    359   if (iter == object_manager_table_.end())
    360     return;
    361 
    362   scoped_refptr<ObjectManager> object_manager = iter->second;
    363   object_manager_table_.erase(iter);
    364 }
    365 
    366 void Bus::GetManagedObjects() {
    367   for (ObjectManagerTable::iterator iter = object_manager_table_.begin();
    368        iter != object_manager_table_.end(); ++iter) {
    369     iter->second->GetManagedObjects();
    370   }
    371 }
    372 
    373 bool Bus::Connect() {
    374   // dbus_bus_get_private() and dbus_bus_get() are blocking calls.
    375   AssertOnDBusThread();
    376 
    377   // Check if it's already initialized.
    378   if (connection_)
    379     return true;
    380 
    381   ScopedDBusError error;
    382   if (bus_type_ == CUSTOM_ADDRESS) {
    383     if (connection_type_ == PRIVATE) {
    384       connection_ = dbus_connection_open_private(address_.c_str(), error.get());
    385     } else {
    386       connection_ = dbus_connection_open(address_.c_str(), error.get());
    387     }
    388   } else {
    389     const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_);
    390     if (connection_type_ == PRIVATE) {
    391       connection_ = dbus_bus_get_private(dbus_bus_type, error.get());
    392     } else {
    393       connection_ = dbus_bus_get(dbus_bus_type, error.get());
    394     }
    395   }
    396   if (!connection_) {
    397     LOG(ERROR) << "Failed to connect to the bus: "
    398                << (error.is_set() ? error.message() : "");
    399     return false;
    400   }
    401 
    402   if (bus_type_ == CUSTOM_ADDRESS) {
    403     // We should call dbus_bus_register here, otherwise unique name can not be
    404     // acquired. According to dbus specification, it is responsible to call
    405     // org.freedesktop.DBus.Hello method at the beging of bus connection to
    406     // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is
    407     // called internally.
    408     if (!dbus_bus_register(connection_, error.get())) {
    409       LOG(ERROR) << "Failed to register the bus component: "
    410                  << (error.is_set() ? error.message() : "");
    411       return false;
    412     }
    413   }
    414   // We shouldn't exit on the disconnected signal.
    415   dbus_connection_set_exit_on_disconnect(connection_, false);
    416 
    417   // Watch Disconnected signal.
    418   AddFilterFunction(Bus::OnConnectionDisconnectedFilter, this);
    419   AddMatch(kDisconnectedMatchRule, error.get());
    420 
    421   return true;
    422 }
    423 
    424 void Bus::ClosePrivateConnection() {
    425   // dbus_connection_close is blocking call.
    426   AssertOnDBusThread();
    427   DCHECK_EQ(PRIVATE, connection_type_)
    428       << "non-private connection should not be closed";
    429   dbus_connection_close(connection_);
    430 }
    431 
    432 void Bus::ShutdownAndBlock() {
    433   AssertOnDBusThread();
    434 
    435   if (shutdown_completed_)
    436     return;  // Already shutdowned, just return.
    437 
    438   // Unregister the exported objects.
    439   for (ExportedObjectTable::iterator iter = exported_object_table_.begin();
    440        iter != exported_object_table_.end(); ++iter) {
    441     iter->second->Unregister();
    442   }
    443 
    444   // Release all service names.
    445   for (std::set<std::string>::iterator iter = owned_service_names_.begin();
    446        iter != owned_service_names_.end();) {
    447     // This is a bit tricky but we should increment the iter here as
    448     // ReleaseOwnership() may remove |service_name| from the set.
    449     const std::string& service_name = *iter++;
    450     ReleaseOwnership(service_name);
    451   }
    452   if (!owned_service_names_.empty()) {
    453     LOG(ERROR) << "Failed to release all service names. # of services left: "
    454                << owned_service_names_.size();
    455   }
    456 
    457   // Detach from the remote objects.
    458   for (ObjectProxyTable::iterator iter = object_proxy_table_.begin();
    459        iter != object_proxy_table_.end(); ++iter) {
    460     iter->second->Detach();
    461   }
    462 
    463   // Release object proxies and exported objects here. We should do this
    464   // here rather than in the destructor to avoid memory leaks due to
    465   // cyclic references.
    466   object_proxy_table_.clear();
    467   exported_object_table_.clear();
    468 
    469   // Private connection should be closed.
    470   if (connection_) {
    471     // Remove Disconnected watcher.
    472     ScopedDBusError error;
    473     RemoveFilterFunction(Bus::OnConnectionDisconnectedFilter, this);
    474     RemoveMatch(kDisconnectedMatchRule, error.get());
    475 
    476     if (connection_type_ == PRIVATE)
    477       ClosePrivateConnection();
    478     // dbus_connection_close() won't unref.
    479     dbus_connection_unref(connection_);
    480   }
    481 
    482   connection_ = NULL;
    483   shutdown_completed_ = true;
    484 }
    485 
    486 void Bus::ShutdownOnDBusThreadAndBlock() {
    487   AssertOnOriginThread();
    488   DCHECK(dbus_task_runner_.get());
    489 
    490   GetDBusTaskRunner()->PostTask(
    491       FROM_HERE,
    492       base::Bind(&Bus::ShutdownOnDBusThreadAndBlockInternal, this));
    493 
    494   // http://crbug.com/125222
    495   base::ThreadRestrictions::ScopedAllowWait allow_wait;
    496 
    497   // Wait until the shutdown is complete on the D-Bus thread.
    498   // The shutdown should not hang, but set timeout just in case.
    499   const int kTimeoutSecs = 3;
    500   const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs));
    501   const bool signaled = on_shutdown_.TimedWait(timeout);
    502   LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus";
    503 }
    504 
    505 void Bus::RequestOwnership(const std::string& service_name,
    506                            ServiceOwnershipOptions options,
    507                            OnOwnershipCallback on_ownership_callback) {
    508   AssertOnOriginThread();
    509 
    510   GetDBusTaskRunner()->PostTask(
    511       FROM_HERE,
    512       base::Bind(&Bus::RequestOwnershipInternal,
    513                  this, service_name, options, on_ownership_callback));
    514 }
    515 
    516 void Bus::RequestOwnershipInternal(const std::string& service_name,
    517                                    ServiceOwnershipOptions options,
    518                                    OnOwnershipCallback on_ownership_callback) {
    519   AssertOnDBusThread();
    520 
    521   bool success = Connect();
    522   if (success)
    523     success = RequestOwnershipAndBlock(service_name, options);
    524 
    525   GetOriginTaskRunner()->PostTask(FROM_HERE,
    526                                   base::Bind(on_ownership_callback,
    527                                              service_name,
    528                                              success));
    529 }
    530 
    531 bool Bus::RequestOwnershipAndBlock(const std::string& service_name,
    532                                    ServiceOwnershipOptions options) {
    533   DCHECK(connection_);
    534   // dbus_bus_request_name() is a blocking call.
    535   AssertOnDBusThread();
    536 
    537   // Check if we already own the service name.
    538   if (owned_service_names_.find(service_name) != owned_service_names_.end()) {
    539     return true;
    540   }
    541 
    542   ScopedDBusError error;
    543   const int result = dbus_bus_request_name(connection_,
    544                                            service_name.c_str(),
    545                                            options,
    546                                            error.get());
    547   if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
    548     LOG(ERROR) << "Failed to get the ownership of " << service_name << ": "
    549                << (error.is_set() ? error.message() : "");
    550     return false;
    551   }
    552   owned_service_names_.insert(service_name);
    553   return true;
    554 }
    555 
    556 bool Bus::ReleaseOwnership(const std::string& service_name) {
    557   DCHECK(connection_);
    558   // dbus_bus_request_name() is a blocking call.
    559   AssertOnDBusThread();
    560 
    561   // Check if we already own the service name.
    562   std::set<std::string>::iterator found =
    563       owned_service_names_.find(service_name);
    564   if (found == owned_service_names_.end()) {
    565     LOG(ERROR) << service_name << " is not owned by the bus";
    566     return false;
    567   }
    568 
    569   ScopedDBusError error;
    570   const int result = dbus_bus_release_name(connection_, service_name.c_str(),
    571                                            error.get());
    572   if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) {
    573     owned_service_names_.erase(found);
    574     return true;
    575   } else {
    576     LOG(ERROR) << "Failed to release the ownership of " << service_name << ": "
    577                << (error.is_set() ? error.message() : "")
    578                << ", result code: " << result;
    579     return false;
    580   }
    581 }
    582 
    583 bool Bus::SetUpAsyncOperations() {
    584   DCHECK(connection_);
    585   AssertOnDBusThread();
    586 
    587   if (async_operations_set_up_)
    588     return true;
    589 
    590   // Process all the incoming data if any, so that OnDispatchStatus() will
    591   // be called when the incoming data is ready.
    592   ProcessAllIncomingDataIfAny();
    593 
    594   bool success = dbus_connection_set_watch_functions(connection_,
    595                                                      &Bus::OnAddWatchThunk,
    596                                                      &Bus::OnRemoveWatchThunk,
    597                                                      &Bus::OnToggleWatchThunk,
    598                                                      this,
    599                                                      NULL);
    600   CHECK(success) << "Unable to allocate memory";
    601 
    602   success = dbus_connection_set_timeout_functions(connection_,
    603                                                   &Bus::OnAddTimeoutThunk,
    604                                                   &Bus::OnRemoveTimeoutThunk,
    605                                                   &Bus::OnToggleTimeoutThunk,
    606                                                   this,
    607                                                   NULL);
    608   CHECK(success) << "Unable to allocate memory";
    609 
    610   dbus_connection_set_dispatch_status_function(
    611       connection_,
    612       &Bus::OnDispatchStatusChangedThunk,
    613       this,
    614       NULL);
    615 
    616   async_operations_set_up_ = true;
    617 
    618   return true;
    619 }
    620 
    621 DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request,
    622                                         int timeout_ms,
    623                                         DBusError* error) {
    624   DCHECK(connection_);
    625   AssertOnDBusThread();
    626 
    627   return dbus_connection_send_with_reply_and_block(
    628       connection_, request, timeout_ms, error);
    629 }
    630 
    631 void Bus::SendWithReply(DBusMessage* request,
    632                         DBusPendingCall** pending_call,
    633                         int timeout_ms) {
    634   DCHECK(connection_);
    635   AssertOnDBusThread();
    636 
    637   const bool success = dbus_connection_send_with_reply(
    638       connection_, request, pending_call, timeout_ms);
    639   CHECK(success) << "Unable to allocate memory";
    640 }
    641 
    642 void Bus::Send(DBusMessage* request, uint32* serial) {
    643   DCHECK(connection_);
    644   AssertOnDBusThread();
    645 
    646   const bool success = dbus_connection_send(connection_, request, serial);
    647   CHECK(success) << "Unable to allocate memory";
    648 }
    649 
    650 bool Bus::AddFilterFunction(DBusHandleMessageFunction filter_function,
    651                             void* user_data) {
    652   DCHECK(connection_);
    653   AssertOnDBusThread();
    654 
    655   std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
    656       std::make_pair(filter_function, user_data);
    657   if (filter_functions_added_.find(filter_data_pair) !=
    658       filter_functions_added_.end()) {
    659     VLOG(1) << "Filter function already exists: " << filter_function
    660             << " with associated data: " << user_data;
    661     return false;
    662   }
    663 
    664   const bool success = dbus_connection_add_filter(
    665       connection_, filter_function, user_data, NULL);
    666   CHECK(success) << "Unable to allocate memory";
    667   filter_functions_added_.insert(filter_data_pair);
    668   return true;
    669 }
    670 
    671 bool Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function,
    672                                void* user_data) {
    673   DCHECK(connection_);
    674   AssertOnDBusThread();
    675 
    676   std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
    677       std::make_pair(filter_function, user_data);
    678   if (filter_functions_added_.find(filter_data_pair) ==
    679       filter_functions_added_.end()) {
    680     VLOG(1) << "Requested to remove an unknown filter function: "
    681             << filter_function
    682             << " with associated data: " << user_data;
    683     return false;
    684   }
    685 
    686   dbus_connection_remove_filter(connection_, filter_function, user_data);
    687   filter_functions_added_.erase(filter_data_pair);
    688   return true;
    689 }
    690 
    691 void Bus::AddMatch(const std::string& match_rule, DBusError* error) {
    692   DCHECK(connection_);
    693   AssertOnDBusThread();
    694 
    695   std::map<std::string, int>::iterator iter =
    696       match_rules_added_.find(match_rule);
    697   if (iter != match_rules_added_.end()) {
    698     // The already existing rule's counter is incremented.
    699     iter->second++;
    700 
    701     VLOG(1) << "Match rule already exists: " << match_rule;
    702     return;
    703   }
    704 
    705   dbus_bus_add_match(connection_, match_rule.c_str(), error);
    706   match_rules_added_[match_rule] = 1;
    707 }
    708 
    709 bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) {
    710   DCHECK(connection_);
    711   AssertOnDBusThread();
    712 
    713   std::map<std::string, int>::iterator iter =
    714       match_rules_added_.find(match_rule);
    715   if (iter == match_rules_added_.end()) {
    716     LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule;
    717     return false;
    718   }
    719 
    720   // The rule's counter is decremented and the rule is deleted when reachs 0.
    721   iter->second--;
    722   if (iter->second == 0) {
    723     dbus_bus_remove_match(connection_, match_rule.c_str(), error);
    724     match_rules_added_.erase(match_rule);
    725   }
    726   return true;
    727 }
    728 
    729 bool Bus::TryRegisterObjectPath(const ObjectPath& object_path,
    730                                 const DBusObjectPathVTable* vtable,
    731                                 void* user_data,
    732                                 DBusError* error) {
    733   DCHECK(connection_);
    734   AssertOnDBusThread();
    735 
    736   if (registered_object_paths_.find(object_path) !=
    737       registered_object_paths_.end()) {
    738     LOG(ERROR) << "Object path already registered: " << object_path.value();
    739     return false;
    740   }
    741 
    742   const bool success = dbus_connection_try_register_object_path(
    743       connection_,
    744       object_path.value().c_str(),
    745       vtable,
    746       user_data,
    747       error);
    748   if (success)
    749     registered_object_paths_.insert(object_path);
    750   return success;
    751 }
    752 
    753 void Bus::UnregisterObjectPath(const ObjectPath& object_path) {
    754   DCHECK(connection_);
    755   AssertOnDBusThread();
    756 
    757   if (registered_object_paths_.find(object_path) ==
    758       registered_object_paths_.end()) {
    759     LOG(ERROR) << "Requested to unregister an unknown object path: "
    760                << object_path.value();
    761     return;
    762   }
    763 
    764   const bool success = dbus_connection_unregister_object_path(
    765       connection_,
    766       object_path.value().c_str());
    767   CHECK(success) << "Unable to allocate memory";
    768   registered_object_paths_.erase(object_path);
    769 }
    770 
    771 void Bus::ShutdownOnDBusThreadAndBlockInternal() {
    772   AssertOnDBusThread();
    773 
    774   ShutdownAndBlock();
    775   on_shutdown_.Signal();
    776 }
    777 
    778 void Bus::ProcessAllIncomingDataIfAny() {
    779   AssertOnDBusThread();
    780 
    781   // As mentioned at the class comment in .h file, connection_ can be NULL.
    782   if (!connection_)
    783     return;
    784 
    785   // It is safe and necessary to call dbus_connection_get_dispatch_status even
    786   // if the connection is lost. Otherwise we will miss "Disconnected" signal.
    787   // (crbug.com/174431)
    788   if (dbus_connection_get_dispatch_status(connection_) ==
    789       DBUS_DISPATCH_DATA_REMAINS) {
    790     while (dbus_connection_dispatch(connection_) ==
    791            DBUS_DISPATCH_DATA_REMAINS) {
    792     }
    793   }
    794 }
    795 
    796 base::TaskRunner* Bus::GetDBusTaskRunner() {
    797   if (dbus_task_runner_.get())
    798     return dbus_task_runner_.get();
    799   else
    800     return GetOriginTaskRunner();
    801 }
    802 
    803 base::TaskRunner* Bus::GetOriginTaskRunner() {
    804   DCHECK(origin_task_runner_.get());
    805   return origin_task_runner_.get();
    806 }
    807 
    808 bool Bus::HasDBusThread() {
    809   return dbus_task_runner_.get() != NULL;
    810 }
    811 
    812 void Bus::AssertOnOriginThread() {
    813   DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId());
    814 }
    815 
    816 void Bus::AssertOnDBusThread() {
    817   base::ThreadRestrictions::AssertIOAllowed();
    818 
    819   if (dbus_task_runner_.get()) {
    820     DCHECK(dbus_task_runner_->RunsTasksOnCurrentThread());
    821   } else {
    822     AssertOnOriginThread();
    823   }
    824 }
    825 
    826 std::string Bus::GetServiceOwnerAndBlock(const std::string& service_name,
    827                                          GetServiceOwnerOption options) {
    828   AssertOnDBusThread();
    829 
    830   MethodCall get_name_owner_call("org.freedesktop.DBus", "GetNameOwner");
    831   MessageWriter writer(&get_name_owner_call);
    832   writer.AppendString(service_name);
    833   VLOG(1) << "Method call: " << get_name_owner_call.ToString();
    834 
    835   const ObjectPath obj_path("/org/freedesktop/DBus");
    836   if (!get_name_owner_call.SetDestination("org.freedesktop.DBus") ||
    837       !get_name_owner_call.SetPath(obj_path)) {
    838     if (options == REPORT_ERRORS)
    839       LOG(ERROR) << "Failed to get name owner.";
    840     return "";
    841   }
    842 
    843   ScopedDBusError error;
    844   DBusMessage* response_message =
    845       SendWithReplyAndBlock(get_name_owner_call.raw_message(),
    846                             ObjectProxy::TIMEOUT_USE_DEFAULT,
    847                             error.get());
    848   if (!response_message) {
    849     if (options == REPORT_ERRORS) {
    850       LOG(ERROR) << "Failed to get name owner. Got " << error.name() << ": "
    851                  << error.message();
    852     }
    853     return "";
    854   }
    855 
    856   scoped_ptr<Response> response(Response::FromRawMessage(response_message));
    857   MessageReader reader(response.get());
    858 
    859   std::string service_owner;
    860   if (!reader.PopString(&service_owner))
    861     service_owner.clear();
    862   return service_owner;
    863 }
    864 
    865 void Bus::GetServiceOwner(const std::string& service_name,
    866                           const GetServiceOwnerCallback& callback) {
    867   AssertOnOriginThread();
    868 
    869   GetDBusTaskRunner()->PostTask(
    870       FROM_HERE,
    871       base::Bind(&Bus::GetServiceOwnerInternal, this, service_name, callback));
    872 }
    873 
    874 void Bus::GetServiceOwnerInternal(const std::string& service_name,
    875                                   const GetServiceOwnerCallback& callback) {
    876   AssertOnDBusThread();
    877 
    878   std::string service_owner;
    879   if (Connect())
    880     service_owner = GetServiceOwnerAndBlock(service_name, SUPPRESS_ERRORS);
    881   GetOriginTaskRunner()->PostTask(FROM_HERE,
    882                                   base::Bind(callback, service_owner));
    883 }
    884 
    885 void Bus::ListenForServiceOwnerChange(
    886     const std::string& service_name,
    887     const GetServiceOwnerCallback& callback) {
    888   AssertOnOriginThread();
    889   DCHECK(!service_name.empty());
    890   DCHECK(!callback.is_null());
    891 
    892   GetDBusTaskRunner()->PostTask(
    893       FROM_HERE,
    894       base::Bind(&Bus::ListenForServiceOwnerChangeInternal,
    895                  this, service_name, callback));
    896 }
    897 
    898 void Bus::ListenForServiceOwnerChangeInternal(
    899     const std::string& service_name,
    900     const GetServiceOwnerCallback& callback) {
    901   AssertOnDBusThread();
    902   DCHECK(!service_name.empty());
    903   DCHECK(!callback.is_null());
    904 
    905   if (!Connect() || !SetUpAsyncOperations())
    906     return;
    907 
    908   if (service_owner_changed_listener_map_.empty()) {
    909     bool filter_added =
    910         AddFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
    911     DCHECK(filter_added);
    912   }
    913 
    914   ServiceOwnerChangedListenerMap::iterator it =
    915       service_owner_changed_listener_map_.find(service_name);
    916   if (it == service_owner_changed_listener_map_.end()) {
    917     // Add a match rule for the new service name.
    918     const std::string name_owner_changed_match_rule =
    919         base::StringPrintf(kServiceNameOwnerChangeMatchRule,
    920                            service_name.c_str());
    921     ScopedDBusError error;
    922     AddMatch(name_owner_changed_match_rule, error.get());
    923     if (error.is_set()) {
    924       LOG(ERROR) << "Failed to add match rule for " << service_name
    925                  << ". Got " << error.name() << ": " << error.message();
    926       return;
    927     }
    928 
    929     service_owner_changed_listener_map_[service_name].push_back(callback);
    930     return;
    931   }
    932 
    933   // Check if the callback has already been added.
    934   std::vector<GetServiceOwnerCallback>& callbacks = it->second;
    935   for (size_t i = 0; i < callbacks.size(); ++i) {
    936     if (callbacks[i].Equals(callback))
    937       return;
    938   }
    939   callbacks.push_back(callback);
    940 }
    941 
    942 void Bus::UnlistenForServiceOwnerChange(
    943     const std::string& service_name,
    944     const GetServiceOwnerCallback& callback) {
    945   AssertOnOriginThread();
    946   DCHECK(!service_name.empty());
    947   DCHECK(!callback.is_null());
    948 
    949   GetDBusTaskRunner()->PostTask(
    950       FROM_HERE,
    951       base::Bind(&Bus::UnlistenForServiceOwnerChangeInternal,
    952                  this, service_name, callback));
    953 }
    954 
    955 void Bus::UnlistenForServiceOwnerChangeInternal(
    956     const std::string& service_name,
    957     const GetServiceOwnerCallback& callback) {
    958   AssertOnDBusThread();
    959   DCHECK(!service_name.empty());
    960   DCHECK(!callback.is_null());
    961 
    962   ServiceOwnerChangedListenerMap::iterator it =
    963       service_owner_changed_listener_map_.find(service_name);
    964   if (it == service_owner_changed_listener_map_.end())
    965     return;
    966 
    967   std::vector<GetServiceOwnerCallback>& callbacks = it->second;
    968   for (size_t i = 0; i < callbacks.size(); ++i) {
    969     if (callbacks[i].Equals(callback)) {
    970       callbacks.erase(callbacks.begin() + i);
    971       break;  // There can be only one.
    972     }
    973   }
    974   if (!callbacks.empty())
    975     return;
    976 
    977   // Last callback for |service_name| has been removed, remove match rule.
    978   const std::string name_owner_changed_match_rule =
    979       base::StringPrintf(kServiceNameOwnerChangeMatchRule,
    980                          service_name.c_str());
    981   ScopedDBusError error;
    982   RemoveMatch(name_owner_changed_match_rule, error.get());
    983   // And remove |service_owner_changed_listener_map_| entry.
    984   service_owner_changed_listener_map_.erase(it);
    985 
    986   if (service_owner_changed_listener_map_.empty()) {
    987     bool filter_removed =
    988         RemoveFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
    989     DCHECK(filter_removed);
    990   }
    991 }
    992 
    993 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) {
    994   AssertOnDBusThread();
    995 
    996   // watch will be deleted when raw_watch is removed in OnRemoveWatch().
    997   Watch* watch = new Watch(raw_watch);
    998   if (watch->IsReadyToBeWatched()) {
    999     watch->StartWatching();
   1000   }
   1001   ++num_pending_watches_;
   1002   return true;
   1003 }
   1004 
   1005 void Bus::OnRemoveWatch(DBusWatch* raw_watch) {
   1006   AssertOnDBusThread();
   1007 
   1008   Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
   1009   delete watch;
   1010   --num_pending_watches_;
   1011 }
   1012 
   1013 void Bus::OnToggleWatch(DBusWatch* raw_watch) {
   1014   AssertOnDBusThread();
   1015 
   1016   Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
   1017   if (watch->IsReadyToBeWatched()) {
   1018     watch->StartWatching();
   1019   } else {
   1020     // It's safe to call this if StartWatching() wasn't called, per
   1021     // message_pump_libevent.h.
   1022     watch->StopWatching();
   1023   }
   1024 }
   1025 
   1026 dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) {
   1027   AssertOnDBusThread();
   1028 
   1029   // timeout will be deleted when raw_timeout is removed in
   1030   // OnRemoveTimeoutThunk().
   1031   Timeout* timeout = new Timeout(raw_timeout);
   1032   if (timeout->IsReadyToBeMonitored()) {
   1033     timeout->StartMonitoring(this);
   1034   }
   1035   ++num_pending_timeouts_;
   1036   return true;
   1037 }
   1038 
   1039 void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) {
   1040   AssertOnDBusThread();
   1041 
   1042   Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
   1043   timeout->Complete();
   1044   --num_pending_timeouts_;
   1045 }
   1046 
   1047 void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) {
   1048   AssertOnDBusThread();
   1049 
   1050   Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
   1051   if (timeout->IsReadyToBeMonitored()) {
   1052     timeout->StartMonitoring(this);
   1053   } else {
   1054     timeout->StopMonitoring();
   1055   }
   1056 }
   1057 
   1058 void Bus::OnDispatchStatusChanged(DBusConnection* connection,
   1059                                   DBusDispatchStatus status) {
   1060   DCHECK_EQ(connection, connection_);
   1061   AssertOnDBusThread();
   1062 
   1063   // We cannot call ProcessAllIncomingDataIfAny() here, as calling
   1064   // dbus_connection_dispatch() inside DBusDispatchStatusFunction is
   1065   // prohibited by the D-Bus library. Hence, we post a task here instead.
   1066   // See comments for dbus_connection_set_dispatch_status_function().
   1067   GetDBusTaskRunner()->PostTask(FROM_HERE,
   1068                                 base::Bind(&Bus::ProcessAllIncomingDataIfAny,
   1069                                            this));
   1070 }
   1071 
   1072 void Bus::OnConnectionDisconnected(DBusConnection* connection) {
   1073   AssertOnDBusThread();
   1074 
   1075   if (!on_disconnected_closure_.is_null())
   1076     GetOriginTaskRunner()->PostTask(FROM_HERE, on_disconnected_closure_);
   1077 
   1078   if (!connection)
   1079     return;
   1080   DCHECK(!dbus_connection_get_is_connected(connection));
   1081 
   1082   ShutdownAndBlock();
   1083 }
   1084 
   1085 void Bus::OnServiceOwnerChanged(DBusMessage* message) {
   1086   DCHECK(message);
   1087   AssertOnDBusThread();
   1088 
   1089   // |message| will be unrefed on exit of the function. Increment the
   1090   // reference so we can use it in Signal::FromRawMessage() below.
   1091   dbus_message_ref(message);
   1092   scoped_ptr<Signal> signal(Signal::FromRawMessage(message));
   1093 
   1094   // Confirm the validity of the NameOwnerChanged signal.
   1095   if (signal->GetMember() != kNameOwnerChangedSignal ||
   1096       signal->GetInterface() != DBUS_INTERFACE_DBUS ||
   1097       signal->GetSender() != DBUS_SERVICE_DBUS) {
   1098     return;
   1099   }
   1100 
   1101   MessageReader reader(signal.get());
   1102   std::string service_name;
   1103   std::string old_owner;
   1104   std::string new_owner;
   1105   if (!reader.PopString(&service_name) ||
   1106       !reader.PopString(&old_owner) ||
   1107       !reader.PopString(&new_owner)) {
   1108     return;
   1109   }
   1110 
   1111   ServiceOwnerChangedListenerMap::const_iterator it =
   1112       service_owner_changed_listener_map_.find(service_name);
   1113   if (it == service_owner_changed_listener_map_.end())
   1114     return;
   1115 
   1116   const std::vector<GetServiceOwnerCallback>& callbacks = it->second;
   1117   for (size_t i = 0; i < callbacks.size(); ++i) {
   1118     GetOriginTaskRunner()->PostTask(FROM_HERE,
   1119                                     base::Bind(callbacks[i], new_owner));
   1120   }
   1121 }
   1122 
   1123 // static
   1124 dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) {
   1125   Bus* self = static_cast<Bus*>(data);
   1126   return self->OnAddWatch(raw_watch);
   1127 }
   1128 
   1129 // static
   1130 void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) {
   1131   Bus* self = static_cast<Bus*>(data);
   1132   self->OnRemoveWatch(raw_watch);
   1133 }
   1134 
   1135 // static
   1136 void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) {
   1137   Bus* self = static_cast<Bus*>(data);
   1138   self->OnToggleWatch(raw_watch);
   1139 }
   1140 
   1141 // static
   1142 dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
   1143   Bus* self = static_cast<Bus*>(data);
   1144   return self->OnAddTimeout(raw_timeout);
   1145 }
   1146 
   1147 // static
   1148 void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
   1149   Bus* self = static_cast<Bus*>(data);
   1150   self->OnRemoveTimeout(raw_timeout);
   1151 }
   1152 
   1153 // static
   1154 void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
   1155   Bus* self = static_cast<Bus*>(data);
   1156   self->OnToggleTimeout(raw_timeout);
   1157 }
   1158 
   1159 // static
   1160 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection,
   1161                                        DBusDispatchStatus status,
   1162                                        void* data) {
   1163   Bus* self = static_cast<Bus*>(data);
   1164   self->OnDispatchStatusChanged(connection, status);
   1165 }
   1166 
   1167 // static
   1168 DBusHandlerResult Bus::OnConnectionDisconnectedFilter(
   1169     DBusConnection* connection,
   1170     DBusMessage* message,
   1171     void* data) {
   1172   if (dbus_message_is_signal(message,
   1173                              DBUS_INTERFACE_LOCAL,
   1174                              kDisconnectedSignal)) {
   1175     Bus* self = static_cast<Bus*>(data);
   1176     self->OnConnectionDisconnected(connection);
   1177     return DBUS_HANDLER_RESULT_HANDLED;
   1178   }
   1179   return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
   1180 }
   1181 
   1182 // static
   1183 DBusHandlerResult Bus::OnServiceOwnerChangedFilter(
   1184     DBusConnection* connection,
   1185     DBusMessage* message,
   1186     void* data) {
   1187   if (dbus_message_is_signal(message,
   1188                              DBUS_INTERFACE_DBUS,
   1189                              kNameOwnerChangedSignal)) {
   1190     Bus* self = static_cast<Bus*>(data);
   1191     self->OnServiceOwnerChanged(message);
   1192   }
   1193   // Always return unhandled to let others, e.g. ObjectProxies, handle the same
   1194   // signal.
   1195   return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
   1196 }
   1197 
   1198 }  // namespace dbus
   1199