Home | History | Annotate | Download | only in dbus
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "dbus/bus.h"
      6 
      7 #include <stddef.h>
      8 
      9 #include "base/bind.h"
     10 #include "base/logging.h"
     11 #include "base/message_loop/message_loop.h"
     12 #include "base/stl_util.h"
     13 #include "base/strings/stringprintf.h"
     14 #include "base/threading/thread.h"
     15 #include "base/threading/thread_restrictions.h"
     16 #include "base/threading/thread_task_runner_handle.h"
     17 #include "base/time/time.h"
     18 #include "dbus/exported_object.h"
     19 #include "dbus/message.h"
     20 #include "dbus/object_manager.h"
     21 #include "dbus/object_path.h"
     22 #include "dbus/object_proxy.h"
     23 #include "dbus/scoped_dbus_error.h"
     24 
     25 namespace dbus {
     26 
     27 namespace {
     28 
     29 // The NameOwnerChanged member in org.freedesktop.DBus
     30 const char kNameOwnerChangedSignal[] = "NameOwnerChanged";
     31 
     32 // The match rule used to filter for changes to a given service name owner.
     33 const char kServiceNameOwnerChangeMatchRule[] =
     34     "type='signal',interface='org.freedesktop.DBus',"
     35     "member='NameOwnerChanged',path='/org/freedesktop/DBus',"
     36     "sender='org.freedesktop.DBus',arg0='%s'";
     37 
     38 // The class is used for watching the file descriptor used for D-Bus
     39 // communication.
     40 class Watch : public base::MessagePumpLibevent::Watcher {
     41  public:
     42   explicit Watch(DBusWatch* watch)
     43       : raw_watch_(watch) {
     44     dbus_watch_set_data(raw_watch_, this, NULL);
     45   }
     46 
     47   ~Watch() override { dbus_watch_set_data(raw_watch_, NULL, NULL); }
     48 
     49   // Returns true if the underlying file descriptor is ready to be watched.
     50   bool IsReadyToBeWatched() {
     51     return dbus_watch_get_enabled(raw_watch_);
     52   }
     53 
     54   // Starts watching the underlying file descriptor.
     55   void StartWatching() {
     56     const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_);
     57     const int flags = dbus_watch_get_flags(raw_watch_);
     58 
     59     base::MessageLoopForIO::Mode mode = base::MessageLoopForIO::WATCH_READ;
     60     if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE))
     61       mode = base::MessageLoopForIO::WATCH_READ_WRITE;
     62     else if (flags & DBUS_WATCH_READABLE)
     63       mode = base::MessageLoopForIO::WATCH_READ;
     64     else if (flags & DBUS_WATCH_WRITABLE)
     65       mode = base::MessageLoopForIO::WATCH_WRITE;
     66     else
     67       NOTREACHED();
     68 
     69     const bool persistent = true;  // Watch persistently.
     70     const bool success = base::MessageLoopForIO::current()->WatchFileDescriptor(
     71         file_descriptor, persistent, mode, &file_descriptor_watcher_, this);
     72     CHECK(success) << "Unable to allocate memory";
     73   }
     74 
     75   // Stops watching the underlying file descriptor.
     76   void StopWatching() {
     77     file_descriptor_watcher_.StopWatchingFileDescriptor();
     78   }
     79 
     80  private:
     81   // Implement MessagePumpLibevent::Watcher.
     82   void OnFileCanReadWithoutBlocking(int /*file_descriptor*/) override {
     83     const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE);
     84     CHECK(success) << "Unable to allocate memory";
     85   }
     86 
     87   // Implement MessagePumpLibevent::Watcher.
     88   void OnFileCanWriteWithoutBlocking(int /*file_descriptor*/) override {
     89     const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE);
     90     CHECK(success) << "Unable to allocate memory";
     91   }
     92 
     93   DBusWatch* raw_watch_;
     94   base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_;
     95 };
     96 
     97 // The class is used for monitoring the timeout used for D-Bus method
     98 // calls.
     99 //
    100 // Unlike Watch, Timeout is a ref counted object, to ensure that |this| of
    101 // the object is is alive when HandleTimeout() is called. It's unlikely
    102 // but it may be possible that HandleTimeout() is called after
    103 // Bus::OnRemoveTimeout(). That's why we don't simply delete the object in
    104 // Bus::OnRemoveTimeout().
    105 class Timeout : public base::RefCountedThreadSafe<Timeout> {
    106  public:
    107   explicit Timeout(DBusTimeout* timeout)
    108       : raw_timeout_(timeout),
    109         monitoring_is_active_(false),
    110         is_completed(false) {
    111     dbus_timeout_set_data(raw_timeout_, this, NULL);
    112     AddRef();  // Balanced on Complete().
    113   }
    114 
    115   // Returns true if the timeout is ready to be monitored.
    116   bool IsReadyToBeMonitored() {
    117     return dbus_timeout_get_enabled(raw_timeout_);
    118   }
    119 
    120   // Starts monitoring the timeout.
    121   void StartMonitoring(Bus* bus) {
    122     bus->GetDBusTaskRunner()->PostDelayedTask(
    123         FROM_HERE,
    124         base::Bind(&Timeout::HandleTimeout, this),
    125         GetInterval());
    126     monitoring_is_active_ = true;
    127   }
    128 
    129   // Stops monitoring the timeout.
    130   void StopMonitoring() {
    131     // We cannot take back the delayed task we posted in
    132     // StartMonitoring(), so we just mark the monitoring is inactive now.
    133     monitoring_is_active_ = false;
    134   }
    135 
    136   // Returns the interval.
    137   base::TimeDelta GetInterval() {
    138     return base::TimeDelta::FromMilliseconds(
    139         dbus_timeout_get_interval(raw_timeout_));
    140   }
    141 
    142   // Cleans up the raw_timeout and marks that timeout is completed.
    143   // See the class comment above for why we are doing this.
    144   void Complete() {
    145     dbus_timeout_set_data(raw_timeout_, NULL, NULL);
    146     is_completed = true;
    147     Release();
    148   }
    149 
    150  private:
    151   friend class base::RefCountedThreadSafe<Timeout>;
    152   ~Timeout() {
    153   }
    154 
    155   // Handles the timeout.
    156   void HandleTimeout() {
    157     // If the timeout is marked completed, we should do nothing. This can
    158     // occur if this function is called after Bus::OnRemoveTimeout().
    159     if (is_completed)
    160       return;
    161     // Skip if monitoring is canceled.
    162     if (!monitoring_is_active_)
    163       return;
    164 
    165     const bool success = dbus_timeout_handle(raw_timeout_);
    166     CHECK(success) << "Unable to allocate memory";
    167   }
    168 
    169   DBusTimeout* raw_timeout_;
    170   bool monitoring_is_active_;
    171   bool is_completed;
    172 };
    173 
    174 }  // namespace
    175 
    176 Bus::Options::Options()
    177   : bus_type(SESSION),
    178     connection_type(PRIVATE) {
    179 }
    180 
    181 Bus::Options::~Options() {
    182 }
    183 
    184 Bus::Bus(const Options& options)
    185     : bus_type_(options.bus_type),
    186       connection_type_(options.connection_type),
    187       dbus_task_runner_(options.dbus_task_runner),
    188       on_shutdown_(base::WaitableEvent::ResetPolicy::AUTOMATIC,
    189                    base::WaitableEvent::InitialState::NOT_SIGNALED),
    190       connection_(NULL),
    191       origin_thread_id_(base::PlatformThread::CurrentId()),
    192       async_operations_set_up_(false),
    193       shutdown_completed_(false),
    194       num_pending_watches_(0),
    195       num_pending_timeouts_(0),
    196       address_(options.address) {
    197   // This is safe to call multiple times.
    198   dbus_threads_init_default();
    199   // The origin message loop is unnecessary if the client uses synchronous
    200   // functions only.
    201   if (base::ThreadTaskRunnerHandle::IsSet())
    202     origin_task_runner_ = base::ThreadTaskRunnerHandle::Get();
    203 }
    204 
    205 Bus::~Bus() {
    206   DCHECK(!connection_);
    207   DCHECK(owned_service_names_.empty());
    208   DCHECK(match_rules_added_.empty());
    209   DCHECK(filter_functions_added_.empty());
    210   DCHECK(registered_object_paths_.empty());
    211   DCHECK_EQ(0, num_pending_watches_);
    212   // TODO(satorux): This check fails occasionally in browser_tests for tests
    213   // that run very quickly. Perhaps something does not have time to clean up.
    214   // Despite the check failing, the tests seem to run fine. crosbug.com/23416
    215   // DCHECK_EQ(0, num_pending_timeouts_);
    216 }
    217 
    218 ObjectProxy* Bus::GetObjectProxy(const std::string& service_name,
    219                                  const ObjectPath& object_path) {
    220   return GetObjectProxyWithOptions(service_name, object_path,
    221                                    ObjectProxy::DEFAULT_OPTIONS);
    222 }
    223 
    224 ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name,
    225                                             const ObjectPath& object_path,
    226                                             int options) {
    227   AssertOnOriginThread();
    228 
    229   // Check if we already have the requested object proxy.
    230   const ObjectProxyTable::key_type key(service_name + object_path.value(),
    231                                        options);
    232   ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
    233   if (iter != object_proxy_table_.end()) {
    234     return iter->second.get();
    235   }
    236 
    237   scoped_refptr<ObjectProxy> object_proxy =
    238       new ObjectProxy(this, service_name, object_path, options);
    239   object_proxy_table_[key] = object_proxy;
    240 
    241   return object_proxy.get();
    242 }
    243 
    244 bool Bus::RemoveObjectProxy(const std::string& service_name,
    245                             const ObjectPath& object_path,
    246                             const base::Closure& callback) {
    247   return RemoveObjectProxyWithOptions(service_name, object_path,
    248                                       ObjectProxy::DEFAULT_OPTIONS,
    249                                       callback);
    250 }
    251 
    252 bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name,
    253                                        const ObjectPath& object_path,
    254                                        int options,
    255                                        const base::Closure& callback) {
    256   AssertOnOriginThread();
    257 
    258   // Check if we have the requested object proxy.
    259   const ObjectProxyTable::key_type key(service_name + object_path.value(),
    260                                        options);
    261   ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
    262   if (iter != object_proxy_table_.end()) {
    263     scoped_refptr<ObjectProxy> object_proxy = iter->second;
    264     object_proxy_table_.erase(iter);
    265     // Object is present. Remove it now and Detach on the DBus thread.
    266     GetDBusTaskRunner()->PostTask(
    267         FROM_HERE,
    268         base::Bind(&Bus::RemoveObjectProxyInternal,
    269                    this, object_proxy, callback));
    270     return true;
    271   }
    272   return false;
    273 }
    274 
    275 void Bus::RemoveObjectProxyInternal(scoped_refptr<ObjectProxy> object_proxy,
    276                                     const base::Closure& callback) {
    277   AssertOnDBusThread();
    278 
    279   object_proxy.get()->Detach();
    280 
    281   GetOriginTaskRunner()->PostTask(FROM_HERE, callback);
    282 }
    283 
    284 ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) {
    285   AssertOnOriginThread();
    286 
    287   // Check if we already have the requested exported object.
    288   ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
    289   if (iter != exported_object_table_.end()) {
    290     return iter->second.get();
    291   }
    292 
    293   scoped_refptr<ExportedObject> exported_object =
    294       new ExportedObject(this, object_path);
    295   exported_object_table_[object_path] = exported_object;
    296 
    297   return exported_object.get();
    298 }
    299 
    300 void Bus::UnregisterExportedObject(const ObjectPath& object_path) {
    301   AssertOnOriginThread();
    302 
    303   // Remove the registered object from the table first, to allow a new
    304   // GetExportedObject() call to return a new object, rather than this one.
    305   ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
    306   if (iter == exported_object_table_.end())
    307     return;
    308 
    309   scoped_refptr<ExportedObject> exported_object = iter->second;
    310   exported_object_table_.erase(iter);
    311 
    312   // Post the task to perform the final unregistration to the D-Bus thread.
    313   // Since the registration also happens on the D-Bus thread in
    314   // TryRegisterObjectPath(), and the task runner we post to is a
    315   // SequencedTaskRunner, there is a guarantee that this will happen before any
    316   // future registration call.
    317   GetDBusTaskRunner()->PostTask(
    318       FROM_HERE,
    319       base::Bind(&Bus::UnregisterExportedObjectInternal,
    320                  this, exported_object));
    321 }
    322 
    323 void Bus::UnregisterExportedObjectInternal(
    324     scoped_refptr<ExportedObject> exported_object) {
    325   AssertOnDBusThread();
    326 
    327   exported_object->Unregister();
    328 }
    329 
    330 ObjectManager* Bus::GetObjectManager(const std::string& service_name,
    331                                      const ObjectPath& object_path) {
    332   AssertOnOriginThread();
    333 
    334   // Check if we already have the requested object manager.
    335   const ObjectManagerTable::key_type key(service_name + object_path.value());
    336   ObjectManagerTable::iterator iter = object_manager_table_.find(key);
    337   if (iter != object_manager_table_.end()) {
    338     return iter->second.get();
    339   }
    340 
    341   scoped_refptr<ObjectManager> object_manager =
    342       new ObjectManager(this, service_name, object_path);
    343   object_manager_table_[key] = object_manager;
    344 
    345   return object_manager.get();
    346 }
    347 
    348 bool Bus::RemoveObjectManager(const std::string& service_name,
    349                               const ObjectPath& object_path,
    350                               const base::Closure& callback) {
    351   AssertOnOriginThread();
    352   DCHECK(!callback.is_null());
    353 
    354   const ObjectManagerTable::key_type key(service_name + object_path.value());
    355   ObjectManagerTable::iterator iter = object_manager_table_.find(key);
    356   if (iter == object_manager_table_.end())
    357     return false;
    358 
    359   // ObjectManager is present. Remove it now and CleanUp on the DBus thread.
    360   scoped_refptr<ObjectManager> object_manager = iter->second;
    361   object_manager_table_.erase(iter);
    362 
    363   GetDBusTaskRunner()->PostTask(
    364       FROM_HERE,
    365       base::Bind(&Bus::RemoveObjectManagerInternal,
    366                  this, object_manager, callback));
    367 
    368   return true;
    369 }
    370 
    371 void Bus::RemoveObjectManagerInternal(
    372       scoped_refptr<dbus::ObjectManager> object_manager,
    373       const base::Closure& callback) {
    374   AssertOnDBusThread();
    375   DCHECK(object_manager.get());
    376 
    377   object_manager->CleanUp();
    378 
    379   // The ObjectManager has to be deleted on the origin thread since it was
    380   // created there.
    381   GetOriginTaskRunner()->PostTask(
    382       FROM_HERE,
    383       base::Bind(&Bus::RemoveObjectManagerInternalHelper,
    384                  this, object_manager, callback));
    385 }
    386 
    387 void Bus::RemoveObjectManagerInternalHelper(
    388       scoped_refptr<dbus::ObjectManager> object_manager,
    389       const base::Closure& callback) {
    390   AssertOnOriginThread();
    391   DCHECK(object_manager.get());
    392 
    393   // Release the object manager and run the callback.
    394   object_manager = NULL;
    395   callback.Run();
    396 }
    397 
    398 void Bus::GetManagedObjects() {
    399   for (ObjectManagerTable::iterator iter = object_manager_table_.begin();
    400        iter != object_manager_table_.end(); ++iter) {
    401     iter->second->GetManagedObjects();
    402   }
    403 }
    404 
    405 bool Bus::Connect() {
    406   // dbus_bus_get_private() and dbus_bus_get() are blocking calls.
    407   AssertOnDBusThread();
    408 
    409   // Check if it's already initialized.
    410   if (connection_)
    411     return true;
    412 
    413   ScopedDBusError error;
    414   if (bus_type_ == CUSTOM_ADDRESS) {
    415     if (connection_type_ == PRIVATE) {
    416       connection_ = dbus_connection_open_private(address_.c_str(), error.get());
    417     } else {
    418       connection_ = dbus_connection_open(address_.c_str(), error.get());
    419     }
    420   } else {
    421     const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_);
    422     if (connection_type_ == PRIVATE) {
    423       connection_ = dbus_bus_get_private(dbus_bus_type, error.get());
    424     } else {
    425       connection_ = dbus_bus_get(dbus_bus_type, error.get());
    426     }
    427   }
    428   if (!connection_) {
    429     LOG(ERROR) << "Failed to connect to the bus: "
    430                << (error.is_set() ? error.message() : "");
    431     return false;
    432   }
    433 
    434   if (bus_type_ == CUSTOM_ADDRESS) {
    435     // We should call dbus_bus_register here, otherwise unique name can not be
    436     // acquired. According to dbus specification, it is responsible to call
    437     // org.freedesktop.DBus.Hello method at the beging of bus connection to
    438     // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is
    439     // called internally.
    440     if (!dbus_bus_register(connection_, error.get())) {
    441       LOG(ERROR) << "Failed to register the bus component: "
    442                  << (error.is_set() ? error.message() : "");
    443       return false;
    444     }
    445   }
    446 
    447   return true;
    448 }
    449 
    450 void Bus::ClosePrivateConnection() {
    451   // dbus_connection_close is blocking call.
    452   AssertOnDBusThread();
    453   DCHECK_EQ(PRIVATE, connection_type_)
    454       << "non-private connection should not be closed";
    455   dbus_connection_close(connection_);
    456 }
    457 
    458 void Bus::ShutdownAndBlock() {
    459   AssertOnDBusThread();
    460 
    461   if (shutdown_completed_)
    462     return;  // Already shutdowned, just return.
    463 
    464   // Unregister the exported objects.
    465   for (ExportedObjectTable::iterator iter = exported_object_table_.begin();
    466        iter != exported_object_table_.end(); ++iter) {
    467     iter->second->Unregister();
    468   }
    469 
    470   // Release all service names.
    471   for (std::set<std::string>::iterator iter = owned_service_names_.begin();
    472        iter != owned_service_names_.end();) {
    473     // This is a bit tricky but we should increment the iter here as
    474     // ReleaseOwnership() may remove |service_name| from the set.
    475     const std::string& service_name = *iter++;
    476     ReleaseOwnership(service_name);
    477   }
    478   if (!owned_service_names_.empty()) {
    479     LOG(ERROR) << "Failed to release all service names. # of services left: "
    480                << owned_service_names_.size();
    481   }
    482 
    483   // Detach from the remote objects.
    484   for (ObjectProxyTable::iterator iter = object_proxy_table_.begin();
    485        iter != object_proxy_table_.end(); ++iter) {
    486     iter->second->Detach();
    487   }
    488 
    489   // Clean up the object managers.
    490   for (ObjectManagerTable::iterator iter = object_manager_table_.begin();
    491        iter != object_manager_table_.end(); ++iter) {
    492     iter->second->CleanUp();
    493   }
    494 
    495   // Release object proxies and exported objects here. We should do this
    496   // here rather than in the destructor to avoid memory leaks due to
    497   // cyclic references.
    498   object_proxy_table_.clear();
    499   exported_object_table_.clear();
    500 
    501   // Private connection should be closed.
    502   if (connection_) {
    503     // Remove Disconnected watcher.
    504     ScopedDBusError error;
    505 
    506     if (connection_type_ == PRIVATE)
    507       ClosePrivateConnection();
    508     // dbus_connection_close() won't unref.
    509     dbus_connection_unref(connection_);
    510   }
    511 
    512   connection_ = NULL;
    513   shutdown_completed_ = true;
    514 }
    515 
    516 void Bus::ShutdownOnDBusThreadAndBlock() {
    517   AssertOnOriginThread();
    518   DCHECK(dbus_task_runner_.get());
    519 
    520   GetDBusTaskRunner()->PostTask(
    521       FROM_HERE,
    522       base::Bind(&Bus::ShutdownOnDBusThreadAndBlockInternal, this));
    523 
    524   // http://crbug.com/125222
    525   base::ThreadRestrictions::ScopedAllowWait allow_wait;
    526 
    527   // Wait until the shutdown is complete on the D-Bus thread.
    528   // The shutdown should not hang, but set timeout just in case.
    529   const int kTimeoutSecs = 3;
    530   const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs));
    531   const bool signaled = on_shutdown_.TimedWait(timeout);
    532   LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus";
    533 }
    534 
    535 void Bus::RequestOwnership(const std::string& service_name,
    536                            ServiceOwnershipOptions options,
    537                            OnOwnershipCallback on_ownership_callback) {
    538   AssertOnOriginThread();
    539 
    540   GetDBusTaskRunner()->PostTask(
    541       FROM_HERE,
    542       base::Bind(&Bus::RequestOwnershipInternal,
    543                  this, service_name, options, on_ownership_callback));
    544 }
    545 
    546 void Bus::RequestOwnershipInternal(const std::string& service_name,
    547                                    ServiceOwnershipOptions options,
    548                                    OnOwnershipCallback on_ownership_callback) {
    549   AssertOnDBusThread();
    550 
    551   bool success = Connect();
    552   if (success)
    553     success = RequestOwnershipAndBlock(service_name, options);
    554 
    555   GetOriginTaskRunner()->PostTask(FROM_HERE,
    556                                   base::Bind(on_ownership_callback,
    557                                              service_name,
    558                                              success));
    559 }
    560 
    561 bool Bus::RequestOwnershipAndBlock(const std::string& service_name,
    562                                    ServiceOwnershipOptions options) {
    563   DCHECK(connection_);
    564   // dbus_bus_request_name() is a blocking call.
    565   AssertOnDBusThread();
    566 
    567   // Check if we already own the service name.
    568   if (owned_service_names_.find(service_name) != owned_service_names_.end()) {
    569     return true;
    570   }
    571 
    572   ScopedDBusError error;
    573   const int result = dbus_bus_request_name(connection_,
    574                                            service_name.c_str(),
    575                                            options,
    576                                            error.get());
    577   if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
    578     LOG(ERROR) << "Failed to get the ownership of " << service_name << ": "
    579                << (error.is_set() ? error.message() : "");
    580     return false;
    581   }
    582   owned_service_names_.insert(service_name);
    583   return true;
    584 }
    585 
    586 bool Bus::ReleaseOwnership(const std::string& service_name) {
    587   DCHECK(connection_);
    588   // dbus_bus_request_name() is a blocking call.
    589   AssertOnDBusThread();
    590 
    591   // Check if we already own the service name.
    592   std::set<std::string>::iterator found =
    593       owned_service_names_.find(service_name);
    594   if (found == owned_service_names_.end()) {
    595     LOG(ERROR) << service_name << " is not owned by the bus";
    596     return false;
    597   }
    598 
    599   ScopedDBusError error;
    600   const int result = dbus_bus_release_name(connection_, service_name.c_str(),
    601                                            error.get());
    602   if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) {
    603     owned_service_names_.erase(found);
    604     return true;
    605   } else {
    606     LOG(ERROR) << "Failed to release the ownership of " << service_name << ": "
    607                << (error.is_set() ? error.message() : "")
    608                << ", result code: " << result;
    609     return false;
    610   }
    611 }
    612 
    613 bool Bus::SetUpAsyncOperations() {
    614   DCHECK(connection_);
    615   AssertOnDBusThread();
    616 
    617   if (async_operations_set_up_)
    618     return true;
    619 
    620   // Process all the incoming data if any, so that OnDispatchStatus() will
    621   // be called when the incoming data is ready.
    622   ProcessAllIncomingDataIfAny();
    623 
    624   bool success = dbus_connection_set_watch_functions(connection_,
    625                                                      &Bus::OnAddWatchThunk,
    626                                                      &Bus::OnRemoveWatchThunk,
    627                                                      &Bus::OnToggleWatchThunk,
    628                                                      this,
    629                                                      NULL);
    630   CHECK(success) << "Unable to allocate memory";
    631 
    632   success = dbus_connection_set_timeout_functions(connection_,
    633                                                   &Bus::OnAddTimeoutThunk,
    634                                                   &Bus::OnRemoveTimeoutThunk,
    635                                                   &Bus::OnToggleTimeoutThunk,
    636                                                   this,
    637                                                   NULL);
    638   CHECK(success) << "Unable to allocate memory";
    639 
    640   dbus_connection_set_dispatch_status_function(
    641       connection_,
    642       &Bus::OnDispatchStatusChangedThunk,
    643       this,
    644       NULL);
    645 
    646   async_operations_set_up_ = true;
    647 
    648   return true;
    649 }
    650 
    651 DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request,
    652                                         int timeout_ms,
    653                                         DBusError* error) {
    654   DCHECK(connection_);
    655   AssertOnDBusThread();
    656 
    657   return dbus_connection_send_with_reply_and_block(
    658       connection_, request, timeout_ms, error);
    659 }
    660 
    661 void Bus::SendWithReply(DBusMessage* request,
    662                         DBusPendingCall** pending_call,
    663                         int timeout_ms) {
    664   DCHECK(connection_);
    665   AssertOnDBusThread();
    666 
    667   const bool success = dbus_connection_send_with_reply(
    668       connection_, request, pending_call, timeout_ms);
    669   CHECK(success) << "Unable to allocate memory";
    670 }
    671 
    672 void Bus::Send(DBusMessage* request, uint32_t* serial) {
    673   DCHECK(connection_);
    674   AssertOnDBusThread();
    675 
    676   const bool success = dbus_connection_send(connection_, request, serial);
    677   CHECK(success) << "Unable to allocate memory";
    678 }
    679 
    680 void Bus::AddFilterFunction(DBusHandleMessageFunction filter_function,
    681                             void* user_data) {
    682   DCHECK(connection_);
    683   AssertOnDBusThread();
    684 
    685   std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
    686       std::make_pair(filter_function, user_data);
    687   if (filter_functions_added_.find(filter_data_pair) !=
    688       filter_functions_added_.end()) {
    689     VLOG(1) << "Filter function already exists: " << filter_function
    690             << " with associated data: " << user_data;
    691     return;
    692   }
    693 
    694   const bool success = dbus_connection_add_filter(
    695       connection_, filter_function, user_data, NULL);
    696   CHECK(success) << "Unable to allocate memory";
    697   filter_functions_added_.insert(filter_data_pair);
    698 }
    699 
    700 void Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function,
    701                                void* user_data) {
    702   DCHECK(connection_);
    703   AssertOnDBusThread();
    704 
    705   std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
    706       std::make_pair(filter_function, user_data);
    707   if (filter_functions_added_.find(filter_data_pair) ==
    708       filter_functions_added_.end()) {
    709     VLOG(1) << "Requested to remove an unknown filter function: "
    710             << filter_function
    711             << " with associated data: " << user_data;
    712     return;
    713   }
    714 
    715   dbus_connection_remove_filter(connection_, filter_function, user_data);
    716   filter_functions_added_.erase(filter_data_pair);
    717 }
    718 
    719 void Bus::AddMatch(const std::string& match_rule, DBusError* error) {
    720   DCHECK(connection_);
    721   AssertOnDBusThread();
    722 
    723   std::map<std::string, int>::iterator iter =
    724       match_rules_added_.find(match_rule);
    725   if (iter != match_rules_added_.end()) {
    726     // The already existing rule's counter is incremented.
    727     iter->second++;
    728 
    729     VLOG(1) << "Match rule already exists: " << match_rule;
    730     return;
    731   }
    732 
    733   dbus_bus_add_match(connection_, match_rule.c_str(), error);
    734   match_rules_added_[match_rule] = 1;
    735 }
    736 
    737 bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) {
    738   DCHECK(connection_);
    739   AssertOnDBusThread();
    740 
    741   std::map<std::string, int>::iterator iter =
    742       match_rules_added_.find(match_rule);
    743   if (iter == match_rules_added_.end()) {
    744     LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule;
    745     return false;
    746   }
    747 
    748   // The rule's counter is decremented and the rule is deleted when reachs 0.
    749   iter->second--;
    750   if (iter->second == 0) {
    751     dbus_bus_remove_match(connection_, match_rule.c_str(), error);
    752     match_rules_added_.erase(match_rule);
    753   }
    754   return true;
    755 }
    756 
    757 bool Bus::TryRegisterObjectPath(const ObjectPath& object_path,
    758                                 const DBusObjectPathVTable* vtable,
    759                                 void* user_data,
    760                                 DBusError* error) {
    761   DCHECK(connection_);
    762   AssertOnDBusThread();
    763 
    764   if (registered_object_paths_.find(object_path) !=
    765       registered_object_paths_.end()) {
    766     LOG(ERROR) << "Object path already registered: " << object_path.value();
    767     return false;
    768   }
    769 
    770   const bool success = dbus_connection_try_register_object_path(
    771       connection_,
    772       object_path.value().c_str(),
    773       vtable,
    774       user_data,
    775       error);
    776   if (success)
    777     registered_object_paths_.insert(object_path);
    778   return success;
    779 }
    780 
    781 void Bus::UnregisterObjectPath(const ObjectPath& object_path) {
    782   DCHECK(connection_);
    783   AssertOnDBusThread();
    784 
    785   if (registered_object_paths_.find(object_path) ==
    786       registered_object_paths_.end()) {
    787     LOG(ERROR) << "Requested to unregister an unknown object path: "
    788                << object_path.value();
    789     return;
    790   }
    791 
    792   const bool success = dbus_connection_unregister_object_path(
    793       connection_,
    794       object_path.value().c_str());
    795   CHECK(success) << "Unable to allocate memory";
    796   registered_object_paths_.erase(object_path);
    797 }
    798 
    799 void Bus::ShutdownOnDBusThreadAndBlockInternal() {
    800   AssertOnDBusThread();
    801 
    802   ShutdownAndBlock();
    803   on_shutdown_.Signal();
    804 }
    805 
    806 void Bus::ProcessAllIncomingDataIfAny() {
    807   AssertOnDBusThread();
    808 
    809   // As mentioned at the class comment in .h file, connection_ can be NULL.
    810   if (!connection_)
    811     return;
    812 
    813   // It is safe and necessary to call dbus_connection_get_dispatch_status even
    814   // if the connection is lost.
    815   if (dbus_connection_get_dispatch_status(connection_) ==
    816       DBUS_DISPATCH_DATA_REMAINS) {
    817     while (dbus_connection_dispatch(connection_) ==
    818            DBUS_DISPATCH_DATA_REMAINS) {
    819     }
    820   }
    821 }
    822 
    823 base::TaskRunner* Bus::GetDBusTaskRunner() {
    824   if (dbus_task_runner_.get())
    825     return dbus_task_runner_.get();
    826   else
    827     return GetOriginTaskRunner();
    828 }
    829 
    830 base::TaskRunner* Bus::GetOriginTaskRunner() {
    831   DCHECK(origin_task_runner_.get());
    832   return origin_task_runner_.get();
    833 }
    834 
    835 bool Bus::HasDBusThread() {
    836   return dbus_task_runner_.get() != NULL;
    837 }
    838 
    839 void Bus::AssertOnOriginThread() {
    840   DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId());
    841 }
    842 
    843 void Bus::AssertOnDBusThread() {
    844   base::ThreadRestrictions::AssertIOAllowed();
    845 
    846   if (dbus_task_runner_.get()) {
    847     DCHECK(dbus_task_runner_->RunsTasksOnCurrentThread());
    848   } else {
    849     AssertOnOriginThread();
    850   }
    851 }
    852 
    853 std::string Bus::GetServiceOwnerAndBlock(const std::string& service_name,
    854                                          GetServiceOwnerOption options) {
    855   AssertOnDBusThread();
    856 
    857   MethodCall get_name_owner_call("org.freedesktop.DBus", "GetNameOwner");
    858   MessageWriter writer(&get_name_owner_call);
    859   writer.AppendString(service_name);
    860   VLOG(1) << "Method call: " << get_name_owner_call.ToString();
    861 
    862   const ObjectPath obj_path("/org/freedesktop/DBus");
    863   if (!get_name_owner_call.SetDestination("org.freedesktop.DBus") ||
    864       !get_name_owner_call.SetPath(obj_path)) {
    865     if (options == REPORT_ERRORS)
    866       LOG(ERROR) << "Failed to get name owner.";
    867     return "";
    868   }
    869 
    870   ScopedDBusError error;
    871   DBusMessage* response_message =
    872       SendWithReplyAndBlock(get_name_owner_call.raw_message(),
    873                             ObjectProxy::TIMEOUT_USE_DEFAULT,
    874                             error.get());
    875   if (!response_message) {
    876     if (options == REPORT_ERRORS) {
    877       LOG(ERROR) << "Failed to get name owner. Got " << error.name() << ": "
    878                  << error.message();
    879     }
    880     return "";
    881   }
    882 
    883   std::unique_ptr<Response> response(
    884       Response::FromRawMessage(response_message));
    885   MessageReader reader(response.get());
    886 
    887   std::string service_owner;
    888   if (!reader.PopString(&service_owner))
    889     service_owner.clear();
    890   return service_owner;
    891 }
    892 
    893 void Bus::GetServiceOwner(const std::string& service_name,
    894                           const GetServiceOwnerCallback& callback) {
    895   AssertOnOriginThread();
    896 
    897   GetDBusTaskRunner()->PostTask(
    898       FROM_HERE,
    899       base::Bind(&Bus::GetServiceOwnerInternal, this, service_name, callback));
    900 }
    901 
    902 void Bus::GetServiceOwnerInternal(const std::string& service_name,
    903                                   const GetServiceOwnerCallback& callback) {
    904   AssertOnDBusThread();
    905 
    906   std::string service_owner;
    907   if (Connect())
    908     service_owner = GetServiceOwnerAndBlock(service_name, SUPPRESS_ERRORS);
    909   GetOriginTaskRunner()->PostTask(FROM_HERE,
    910                                   base::Bind(callback, service_owner));
    911 }
    912 
    913 void Bus::ListenForServiceOwnerChange(
    914     const std::string& service_name,
    915     const GetServiceOwnerCallback& callback) {
    916   AssertOnOriginThread();
    917   DCHECK(!service_name.empty());
    918   DCHECK(!callback.is_null());
    919 
    920   GetDBusTaskRunner()->PostTask(
    921       FROM_HERE,
    922       base::Bind(&Bus::ListenForServiceOwnerChangeInternal,
    923                  this, service_name, callback));
    924 }
    925 
    926 void Bus::ListenForServiceOwnerChangeInternal(
    927     const std::string& service_name,
    928     const GetServiceOwnerCallback& callback) {
    929   AssertOnDBusThread();
    930   DCHECK(!service_name.empty());
    931   DCHECK(!callback.is_null());
    932 
    933   if (!Connect() || !SetUpAsyncOperations())
    934     return;
    935 
    936   if (service_owner_changed_listener_map_.empty())
    937     AddFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
    938 
    939   ServiceOwnerChangedListenerMap::iterator it =
    940       service_owner_changed_listener_map_.find(service_name);
    941   if (it == service_owner_changed_listener_map_.end()) {
    942     // Add a match rule for the new service name.
    943     const std::string name_owner_changed_match_rule =
    944         base::StringPrintf(kServiceNameOwnerChangeMatchRule,
    945                            service_name.c_str());
    946     ScopedDBusError error;
    947     AddMatch(name_owner_changed_match_rule, error.get());
    948     if (error.is_set()) {
    949       LOG(ERROR) << "Failed to add match rule for " << service_name
    950                  << ". Got " << error.name() << ": " << error.message();
    951       return;
    952     }
    953 
    954     service_owner_changed_listener_map_[service_name].push_back(callback);
    955     return;
    956   }
    957 
    958   // Check if the callback has already been added.
    959   std::vector<GetServiceOwnerCallback>& callbacks = it->second;
    960   for (size_t i = 0; i < callbacks.size(); ++i) {
    961     if (callbacks[i].Equals(callback))
    962       return;
    963   }
    964   callbacks.push_back(callback);
    965 }
    966 
    967 void Bus::UnlistenForServiceOwnerChange(
    968     const std::string& service_name,
    969     const GetServiceOwnerCallback& callback) {
    970   AssertOnOriginThread();
    971   DCHECK(!service_name.empty());
    972   DCHECK(!callback.is_null());
    973 
    974   GetDBusTaskRunner()->PostTask(
    975       FROM_HERE,
    976       base::Bind(&Bus::UnlistenForServiceOwnerChangeInternal,
    977                  this, service_name, callback));
    978 }
    979 
    980 void Bus::UnlistenForServiceOwnerChangeInternal(
    981     const std::string& service_name,
    982     const GetServiceOwnerCallback& callback) {
    983   AssertOnDBusThread();
    984   DCHECK(!service_name.empty());
    985   DCHECK(!callback.is_null());
    986 
    987   ServiceOwnerChangedListenerMap::iterator it =
    988       service_owner_changed_listener_map_.find(service_name);
    989   if (it == service_owner_changed_listener_map_.end())
    990     return;
    991 
    992   std::vector<GetServiceOwnerCallback>& callbacks = it->second;
    993   for (size_t i = 0; i < callbacks.size(); ++i) {
    994     if (callbacks[i].Equals(callback)) {
    995       callbacks.erase(callbacks.begin() + i);
    996       break;  // There can be only one.
    997     }
    998   }
    999   if (!callbacks.empty())
   1000     return;
   1001 
   1002   // Last callback for |service_name| has been removed, remove match rule.
   1003   const std::string name_owner_changed_match_rule =
   1004       base::StringPrintf(kServiceNameOwnerChangeMatchRule,
   1005                          service_name.c_str());
   1006   ScopedDBusError error;
   1007   RemoveMatch(name_owner_changed_match_rule, error.get());
   1008   // And remove |service_owner_changed_listener_map_| entry.
   1009   service_owner_changed_listener_map_.erase(it);
   1010 
   1011   if (service_owner_changed_listener_map_.empty())
   1012     RemoveFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
   1013 }
   1014 
   1015 std::string Bus::GetConnectionName() {
   1016   if (!connection_)
   1017     return "";
   1018   return dbus_bus_get_unique_name(connection_);
   1019 }
   1020 
   1021 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) {
   1022   AssertOnDBusThread();
   1023 
   1024   // watch will be deleted when raw_watch is removed in OnRemoveWatch().
   1025   Watch* watch = new Watch(raw_watch);
   1026   if (watch->IsReadyToBeWatched()) {
   1027     watch->StartWatching();
   1028   }
   1029   ++num_pending_watches_;
   1030   return true;
   1031 }
   1032 
   1033 void Bus::OnRemoveWatch(DBusWatch* raw_watch) {
   1034   AssertOnDBusThread();
   1035 
   1036   Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
   1037   delete watch;
   1038   --num_pending_watches_;
   1039 }
   1040 
   1041 void Bus::OnToggleWatch(DBusWatch* raw_watch) {
   1042   AssertOnDBusThread();
   1043 
   1044   Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
   1045   if (watch->IsReadyToBeWatched()) {
   1046     watch->StartWatching();
   1047   } else {
   1048     // It's safe to call this if StartWatching() wasn't called, per
   1049     // message_pump_libevent.h.
   1050     watch->StopWatching();
   1051   }
   1052 }
   1053 
   1054 dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) {
   1055   AssertOnDBusThread();
   1056 
   1057   // timeout will be deleted when raw_timeout is removed in
   1058   // OnRemoveTimeoutThunk().
   1059   Timeout* timeout = new Timeout(raw_timeout);
   1060   if (timeout->IsReadyToBeMonitored()) {
   1061     timeout->StartMonitoring(this);
   1062   }
   1063   ++num_pending_timeouts_;
   1064   return true;
   1065 }
   1066 
   1067 void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) {
   1068   AssertOnDBusThread();
   1069 
   1070   Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
   1071   timeout->Complete();
   1072   --num_pending_timeouts_;
   1073 }
   1074 
   1075 void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) {
   1076   AssertOnDBusThread();
   1077 
   1078   Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
   1079   if (timeout->IsReadyToBeMonitored()) {
   1080     timeout->StartMonitoring(this);
   1081   } else {
   1082     timeout->StopMonitoring();
   1083   }
   1084 }
   1085 
   1086 void Bus::OnDispatchStatusChanged(DBusConnection* connection,
   1087                                   DBusDispatchStatus /*status*/) {
   1088   DCHECK_EQ(connection, connection_);
   1089   AssertOnDBusThread();
   1090 
   1091   // We cannot call ProcessAllIncomingDataIfAny() here, as calling
   1092   // dbus_connection_dispatch() inside DBusDispatchStatusFunction is
   1093   // prohibited by the D-Bus library. Hence, we post a task here instead.
   1094   // See comments for dbus_connection_set_dispatch_status_function().
   1095   GetDBusTaskRunner()->PostTask(FROM_HERE,
   1096                                 base::Bind(&Bus::ProcessAllIncomingDataIfAny,
   1097                                            this));
   1098 }
   1099 
   1100 void Bus::OnServiceOwnerChanged(DBusMessage* message) {
   1101   DCHECK(message);
   1102   AssertOnDBusThread();
   1103 
   1104   // |message| will be unrefed on exit of the function. Increment the
   1105   // reference so we can use it in Signal::FromRawMessage() below.
   1106   dbus_message_ref(message);
   1107   std::unique_ptr<Signal> signal(Signal::FromRawMessage(message));
   1108 
   1109   // Confirm the validity of the NameOwnerChanged signal.
   1110   if (signal->GetMember() != kNameOwnerChangedSignal ||
   1111       signal->GetInterface() != DBUS_INTERFACE_DBUS ||
   1112       signal->GetSender() != DBUS_SERVICE_DBUS) {
   1113     return;
   1114   }
   1115 
   1116   MessageReader reader(signal.get());
   1117   std::string service_name;
   1118   std::string old_owner;
   1119   std::string new_owner;
   1120   if (!reader.PopString(&service_name) ||
   1121       !reader.PopString(&old_owner) ||
   1122       !reader.PopString(&new_owner)) {
   1123     return;
   1124   }
   1125 
   1126   ServiceOwnerChangedListenerMap::const_iterator it =
   1127       service_owner_changed_listener_map_.find(service_name);
   1128   if (it == service_owner_changed_listener_map_.end())
   1129     return;
   1130 
   1131   const std::vector<GetServiceOwnerCallback>& callbacks = it->second;
   1132   for (size_t i = 0; i < callbacks.size(); ++i) {
   1133     GetOriginTaskRunner()->PostTask(FROM_HERE,
   1134                                     base::Bind(callbacks[i], new_owner));
   1135   }
   1136 }
   1137 
   1138 // static
   1139 dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) {
   1140   Bus* self = static_cast<Bus*>(data);
   1141   return self->OnAddWatch(raw_watch);
   1142 }
   1143 
   1144 // static
   1145 void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) {
   1146   Bus* self = static_cast<Bus*>(data);
   1147   self->OnRemoveWatch(raw_watch);
   1148 }
   1149 
   1150 // static
   1151 void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) {
   1152   Bus* self = static_cast<Bus*>(data);
   1153   self->OnToggleWatch(raw_watch);
   1154 }
   1155 
   1156 // static
   1157 dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
   1158   Bus* self = static_cast<Bus*>(data);
   1159   return self->OnAddTimeout(raw_timeout);
   1160 }
   1161 
   1162 // static
   1163 void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
   1164   Bus* self = static_cast<Bus*>(data);
   1165   self->OnRemoveTimeout(raw_timeout);
   1166 }
   1167 
   1168 // static
   1169 void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
   1170   Bus* self = static_cast<Bus*>(data);
   1171   self->OnToggleTimeout(raw_timeout);
   1172 }
   1173 
   1174 // static
   1175 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection,
   1176                                        DBusDispatchStatus status,
   1177                                        void* data) {
   1178   Bus* self = static_cast<Bus*>(data);
   1179   self->OnDispatchStatusChanged(connection, status);
   1180 }
   1181 
   1182 // static
   1183 DBusHandlerResult Bus::OnServiceOwnerChangedFilter(
   1184     DBusConnection* /*connection*/,
   1185     DBusMessage* message,
   1186     void* data) {
   1187   if (dbus_message_is_signal(message,
   1188                              DBUS_INTERFACE_DBUS,
   1189                              kNameOwnerChangedSignal)) {
   1190     Bus* self = static_cast<Bus*>(data);
   1191     self->OnServiceOwnerChanged(message);
   1192   }
   1193   // Always return unhandled to let others, e.g. ObjectProxies, handle the same
   1194   // signal.
   1195   return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
   1196 }
   1197 
   1198 }  // namespace dbus
   1199