Home | History | Annotate | Download | only in dbus
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "dbus/bus.h"
      6 
      7 #include <stddef.h>
      8 
      9 #include "base/bind.h"
     10 #include "base/logging.h"
     11 #include "base/message_loop/message_loop.h"
     12 #include "base/stl_util.h"
     13 #include "base/strings/stringprintf.h"
     14 #include "base/threading/thread.h"
     15 #include "base/threading/thread_restrictions.h"
     16 #include "base/threading/thread_task_runner_handle.h"
     17 #include "base/time/time.h"
     18 #include "dbus/exported_object.h"
     19 #include "dbus/message.h"
     20 #include "dbus/object_manager.h"
     21 #include "dbus/object_path.h"
     22 #include "dbus/object_proxy.h"
     23 #include "dbus/scoped_dbus_error.h"
     24 
     25 namespace dbus {
     26 
     27 namespace {
     28 
     29 const char kDisconnectedSignal[] = "Disconnected";
     30 const char kDisconnectedMatchRule[] =
     31     "type='signal', path='/org/freedesktop/DBus/Local',"
     32     "interface='org.freedesktop.DBus.Local', member='Disconnected'";
     33 
     34 // The NameOwnerChanged member in org.freedesktop.DBus
     35 const char kNameOwnerChangedSignal[] = "NameOwnerChanged";
     36 
     37 // The match rule used to filter for changes to a given service name owner.
     38 const char kServiceNameOwnerChangeMatchRule[] =
     39     "type='signal',interface='org.freedesktop.DBus',"
     40     "member='NameOwnerChanged',path='/org/freedesktop/DBus',"
     41     "sender='org.freedesktop.DBus',arg0='%s'";
     42 
     43 // The class is used for watching the file descriptor used for D-Bus
     44 // communication.
     45 class Watch : public base::MessagePumpLibevent::Watcher {
     46  public:
     47   explicit Watch(DBusWatch* watch)
     48       : raw_watch_(watch), file_descriptor_watcher_(FROM_HERE) {
     49     dbus_watch_set_data(raw_watch_, this, NULL);
     50   }
     51 
     52   ~Watch() override { dbus_watch_set_data(raw_watch_, NULL, NULL); }
     53 
     54   // Returns true if the underlying file descriptor is ready to be watched.
     55   bool IsReadyToBeWatched() {
     56     return dbus_watch_get_enabled(raw_watch_);
     57   }
     58 
     59   // Starts watching the underlying file descriptor.
     60   void StartWatching() {
     61     const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_);
     62     const int flags = dbus_watch_get_flags(raw_watch_);
     63 
     64     base::MessageLoopForIO::Mode mode = base::MessageLoopForIO::WATCH_READ;
     65     if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE))
     66       mode = base::MessageLoopForIO::WATCH_READ_WRITE;
     67     else if (flags & DBUS_WATCH_READABLE)
     68       mode = base::MessageLoopForIO::WATCH_READ;
     69     else if (flags & DBUS_WATCH_WRITABLE)
     70       mode = base::MessageLoopForIO::WATCH_WRITE;
     71     else
     72       NOTREACHED();
     73 
     74     const bool persistent = true;  // Watch persistently.
     75     const bool success = base::MessageLoopForIO::current()->WatchFileDescriptor(
     76         file_descriptor, persistent, mode, &file_descriptor_watcher_, this);
     77     CHECK(success) << "Unable to allocate memory";
     78   }
     79 
     80   // Stops watching the underlying file descriptor.
     81   void StopWatching() {
     82     file_descriptor_watcher_.StopWatchingFileDescriptor();
     83   }
     84 
     85  private:
     86   // Implement MessagePumpLibevent::Watcher.
     87   void OnFileCanReadWithoutBlocking(int file_descriptor) override {
     88     const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE);
     89     CHECK(success) << "Unable to allocate memory";
     90   }
     91 
     92   // Implement MessagePumpLibevent::Watcher.
     93   void OnFileCanWriteWithoutBlocking(int file_descriptor) override {
     94     const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE);
     95     CHECK(success) << "Unable to allocate memory";
     96   }
     97 
     98   DBusWatch* raw_watch_;
     99   base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_;
    100 };
    101 
    102 // The class is used for monitoring the timeout used for D-Bus method
    103 // calls.
    104 //
    105 // Unlike Watch, Timeout is a ref counted object, to ensure that |this| of
    106 // the object is is alive when HandleTimeout() is called. It's unlikely
    107 // but it may be possible that HandleTimeout() is called after
    108 // Bus::OnRemoveTimeout(). That's why we don't simply delete the object in
    109 // Bus::OnRemoveTimeout().
    110 class Timeout : public base::RefCountedThreadSafe<Timeout> {
    111  public:
    112   explicit Timeout(DBusTimeout* timeout)
    113       : raw_timeout_(timeout),
    114         monitoring_is_active_(false),
    115         is_completed(false) {
    116     dbus_timeout_set_data(raw_timeout_, this, NULL);
    117     AddRef();  // Balanced on Complete().
    118   }
    119 
    120   // Returns true if the timeout is ready to be monitored.
    121   bool IsReadyToBeMonitored() {
    122     return dbus_timeout_get_enabled(raw_timeout_);
    123   }
    124 
    125   // Starts monitoring the timeout.
    126   void StartMonitoring(Bus* bus) {
    127     bus->GetDBusTaskRunner()->PostDelayedTask(
    128         FROM_HERE,
    129         base::Bind(&Timeout::HandleTimeout, this),
    130         GetInterval());
    131     monitoring_is_active_ = true;
    132   }
    133 
    134   // Stops monitoring the timeout.
    135   void StopMonitoring() {
    136     // We cannot take back the delayed task we posted in
    137     // StartMonitoring(), so we just mark the monitoring is inactive now.
    138     monitoring_is_active_ = false;
    139   }
    140 
    141   // Returns the interval.
    142   base::TimeDelta GetInterval() {
    143     return base::TimeDelta::FromMilliseconds(
    144         dbus_timeout_get_interval(raw_timeout_));
    145   }
    146 
    147   // Cleans up the raw_timeout and marks that timeout is completed.
    148   // See the class comment above for why we are doing this.
    149   void Complete() {
    150     dbus_timeout_set_data(raw_timeout_, NULL, NULL);
    151     is_completed = true;
    152     Release();
    153   }
    154 
    155  private:
    156   friend class base::RefCountedThreadSafe<Timeout>;
    157   ~Timeout() {
    158   }
    159 
    160   // Handles the timeout.
    161   void HandleTimeout() {
    162     // If the timeout is marked completed, we should do nothing. This can
    163     // occur if this function is called after Bus::OnRemoveTimeout().
    164     if (is_completed)
    165       return;
    166     // Skip if monitoring is canceled.
    167     if (!monitoring_is_active_)
    168       return;
    169 
    170     const bool success = dbus_timeout_handle(raw_timeout_);
    171     CHECK(success) << "Unable to allocate memory";
    172   }
    173 
    174   DBusTimeout* raw_timeout_;
    175   bool monitoring_is_active_;
    176   bool is_completed;
    177 };
    178 
    179 }  // namespace
    180 
    181 Bus::Options::Options()
    182   : bus_type(SESSION),
    183     connection_type(PRIVATE) {
    184 }
    185 
    186 Bus::Options::~Options() {
    187 }
    188 
    189 Bus::Bus(const Options& options)
    190     : bus_type_(options.bus_type),
    191       connection_type_(options.connection_type),
    192       dbus_task_runner_(options.dbus_task_runner),
    193       on_shutdown_(base::WaitableEvent::ResetPolicy::AUTOMATIC,
    194                    base::WaitableEvent::InitialState::NOT_SIGNALED),
    195       connection_(NULL),
    196       origin_thread_id_(base::PlatformThread::CurrentId()),
    197       async_operations_set_up_(false),
    198       shutdown_completed_(false),
    199       num_pending_watches_(0),
    200       num_pending_timeouts_(0),
    201       address_(options.address) {
    202   // This is safe to call multiple times.
    203   dbus_threads_init_default();
    204   // The origin message loop is unnecessary if the client uses synchronous
    205   // functions only.
    206   if (base::ThreadTaskRunnerHandle::IsSet())
    207     origin_task_runner_ = base::ThreadTaskRunnerHandle::Get();
    208 }
    209 
    210 Bus::~Bus() {
    211   DCHECK(!connection_);
    212   DCHECK(owned_service_names_.empty());
    213   DCHECK(match_rules_added_.empty());
    214   DCHECK(filter_functions_added_.empty());
    215   DCHECK(registered_object_paths_.empty());
    216   DCHECK_EQ(0, num_pending_watches_);
    217   // TODO(satorux): This check fails occasionally in browser_tests for tests
    218   // that run very quickly. Perhaps something does not have time to clean up.
    219   // Despite the check failing, the tests seem to run fine. crosbug.com/23416
    220   // DCHECK_EQ(0, num_pending_timeouts_);
    221 }
    222 
    223 ObjectProxy* Bus::GetObjectProxy(const std::string& service_name,
    224                                  const ObjectPath& object_path) {
    225   return GetObjectProxyWithOptions(service_name, object_path,
    226                                    ObjectProxy::DEFAULT_OPTIONS);
    227 }
    228 
    229 ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name,
    230                                             const ObjectPath& object_path,
    231                                             int options) {
    232   AssertOnOriginThread();
    233 
    234   // Check if we already have the requested object proxy.
    235   const ObjectProxyTable::key_type key(service_name + object_path.value(),
    236                                        options);
    237   ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
    238   if (iter != object_proxy_table_.end()) {
    239     return iter->second.get();
    240   }
    241 
    242   scoped_refptr<ObjectProxy> object_proxy =
    243       new ObjectProxy(this, service_name, object_path, options);
    244   object_proxy_table_[key] = object_proxy;
    245 
    246   return object_proxy.get();
    247 }
    248 
    249 bool Bus::RemoveObjectProxy(const std::string& service_name,
    250                             const ObjectPath& object_path,
    251                             const base::Closure& callback) {
    252   return RemoveObjectProxyWithOptions(service_name, object_path,
    253                                       ObjectProxy::DEFAULT_OPTIONS,
    254                                       callback);
    255 }
    256 
    257 bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name,
    258                                        const ObjectPath& object_path,
    259                                        int options,
    260                                        const base::Closure& callback) {
    261   AssertOnOriginThread();
    262 
    263   // Check if we have the requested object proxy.
    264   const ObjectProxyTable::key_type key(service_name + object_path.value(),
    265                                        options);
    266   ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
    267   if (iter != object_proxy_table_.end()) {
    268     scoped_refptr<ObjectProxy> object_proxy = iter->second;
    269     object_proxy_table_.erase(iter);
    270     // Object is present. Remove it now and Detach on the DBus thread.
    271     GetDBusTaskRunner()->PostTask(
    272         FROM_HERE,
    273         base::Bind(&Bus::RemoveObjectProxyInternal,
    274                    this, object_proxy, callback));
    275     return true;
    276   }
    277   return false;
    278 }
    279 
    280 void Bus::RemoveObjectProxyInternal(scoped_refptr<ObjectProxy> object_proxy,
    281                                     const base::Closure& callback) {
    282   AssertOnDBusThread();
    283 
    284   object_proxy.get()->Detach();
    285 
    286   GetOriginTaskRunner()->PostTask(FROM_HERE, callback);
    287 }
    288 
    289 ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) {
    290   AssertOnOriginThread();
    291 
    292   // Check if we already have the requested exported object.
    293   ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
    294   if (iter != exported_object_table_.end()) {
    295     return iter->second.get();
    296   }
    297 
    298   scoped_refptr<ExportedObject> exported_object =
    299       new ExportedObject(this, object_path);
    300   exported_object_table_[object_path] = exported_object;
    301 
    302   return exported_object.get();
    303 }
    304 
    305 void Bus::UnregisterExportedObject(const ObjectPath& object_path) {
    306   AssertOnOriginThread();
    307 
    308   // Remove the registered object from the table first, to allow a new
    309   // GetExportedObject() call to return a new object, rather than this one.
    310   ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
    311   if (iter == exported_object_table_.end())
    312     return;
    313 
    314   scoped_refptr<ExportedObject> exported_object = iter->second;
    315   exported_object_table_.erase(iter);
    316 
    317   // Post the task to perform the final unregistration to the D-Bus thread.
    318   // Since the registration also happens on the D-Bus thread in
    319   // TryRegisterObjectPath(), and the task runner we post to is a
    320   // SequencedTaskRunner, there is a guarantee that this will happen before any
    321   // future registration call.
    322   GetDBusTaskRunner()->PostTask(
    323       FROM_HERE,
    324       base::Bind(&Bus::UnregisterExportedObjectInternal,
    325                  this, exported_object));
    326 }
    327 
    328 void Bus::UnregisterExportedObjectInternal(
    329     scoped_refptr<ExportedObject> exported_object) {
    330   AssertOnDBusThread();
    331 
    332   exported_object->Unregister();
    333 }
    334 
    335 ObjectManager* Bus::GetObjectManager(const std::string& service_name,
    336                                      const ObjectPath& object_path) {
    337   AssertOnOriginThread();
    338 
    339   // Check if we already have the requested object manager.
    340   const ObjectManagerTable::key_type key(service_name + object_path.value());
    341   ObjectManagerTable::iterator iter = object_manager_table_.find(key);
    342   if (iter != object_manager_table_.end()) {
    343     return iter->second.get();
    344   }
    345 
    346   scoped_refptr<ObjectManager> object_manager =
    347       new ObjectManager(this, service_name, object_path);
    348   object_manager_table_[key] = object_manager;
    349 
    350   return object_manager.get();
    351 }
    352 
    353 bool Bus::RemoveObjectManager(const std::string& service_name,
    354                               const ObjectPath& object_path,
    355                               const base::Closure& callback) {
    356   AssertOnOriginThread();
    357   DCHECK(!callback.is_null());
    358 
    359   const ObjectManagerTable::key_type key(service_name + object_path.value());
    360   ObjectManagerTable::iterator iter = object_manager_table_.find(key);
    361   if (iter == object_manager_table_.end())
    362     return false;
    363 
    364   // ObjectManager is present. Remove it now and CleanUp on the DBus thread.
    365   scoped_refptr<ObjectManager> object_manager = iter->second;
    366   object_manager_table_.erase(iter);
    367 
    368   GetDBusTaskRunner()->PostTask(
    369       FROM_HERE,
    370       base::Bind(&Bus::RemoveObjectManagerInternal,
    371                  this, object_manager, callback));
    372 
    373   return true;
    374 }
    375 
    376 void Bus::RemoveObjectManagerInternal(
    377       scoped_refptr<dbus::ObjectManager> object_manager,
    378       const base::Closure& callback) {
    379   AssertOnDBusThread();
    380   DCHECK(object_manager.get());
    381 
    382   object_manager->CleanUp();
    383 
    384   // The ObjectManager has to be deleted on the origin thread since it was
    385   // created there.
    386   GetOriginTaskRunner()->PostTask(
    387       FROM_HERE,
    388       base::Bind(&Bus::RemoveObjectManagerInternalHelper,
    389                  this, object_manager, callback));
    390 }
    391 
    392 void Bus::RemoveObjectManagerInternalHelper(
    393       scoped_refptr<dbus::ObjectManager> object_manager,
    394       const base::Closure& callback) {
    395   AssertOnOriginThread();
    396   DCHECK(object_manager.get());
    397 
    398   // Release the object manager and run the callback.
    399   object_manager = NULL;
    400   callback.Run();
    401 }
    402 
    403 bool Bus::Connect() {
    404   // dbus_bus_get_private() and dbus_bus_get() are blocking calls.
    405   AssertOnDBusThread();
    406 
    407   // Check if it's already initialized.
    408   if (connection_)
    409     return true;
    410 
    411   ScopedDBusError error;
    412   if (bus_type_ == CUSTOM_ADDRESS) {
    413     if (connection_type_ == PRIVATE) {
    414       connection_ = dbus_connection_open_private(address_.c_str(), error.get());
    415     } else {
    416       connection_ = dbus_connection_open(address_.c_str(), error.get());
    417     }
    418   } else {
    419     const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_);
    420     if (connection_type_ == PRIVATE) {
    421       connection_ = dbus_bus_get_private(dbus_bus_type, error.get());
    422     } else {
    423       connection_ = dbus_bus_get(dbus_bus_type, error.get());
    424     }
    425   }
    426   if (!connection_) {
    427     LOG(ERROR) << "Failed to connect to the bus: "
    428                << (error.is_set() ? error.message() : "");
    429     return false;
    430   }
    431 
    432   if (bus_type_ == CUSTOM_ADDRESS) {
    433     // We should call dbus_bus_register here, otherwise unique name can not be
    434     // acquired. According to dbus specification, it is responsible to call
    435     // org.freedesktop.DBus.Hello method at the beging of bus connection to
    436     // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is
    437     // called internally.
    438     if (!dbus_bus_register(connection_, error.get())) {
    439       LOG(ERROR) << "Failed to register the bus component: "
    440                  << (error.is_set() ? error.message() : "");
    441       return false;
    442     }
    443   }
    444   // We shouldn't exit on the disconnected signal.
    445   dbus_connection_set_exit_on_disconnect(connection_, false);
    446 
    447   // Watch Disconnected signal.
    448   AddFilterFunction(Bus::OnConnectionDisconnectedFilter, this);
    449   AddMatch(kDisconnectedMatchRule, error.get());
    450 
    451   return true;
    452 }
    453 
    454 void Bus::ClosePrivateConnection() {
    455   // dbus_connection_close is blocking call.
    456   AssertOnDBusThread();
    457   DCHECK_EQ(PRIVATE, connection_type_)
    458       << "non-private connection should not be closed";
    459   dbus_connection_close(connection_);
    460 }
    461 
    462 void Bus::ShutdownAndBlock() {
    463   AssertOnDBusThread();
    464 
    465   if (shutdown_completed_)
    466     return;  // Already shutdowned, just return.
    467 
    468   // Unregister the exported objects.
    469   for (ExportedObjectTable::iterator iter = exported_object_table_.begin();
    470        iter != exported_object_table_.end(); ++iter) {
    471     iter->second->Unregister();
    472   }
    473 
    474   // Release all service names.
    475   for (std::set<std::string>::iterator iter = owned_service_names_.begin();
    476        iter != owned_service_names_.end();) {
    477     // This is a bit tricky but we should increment the iter here as
    478     // ReleaseOwnership() may remove |service_name| from the set.
    479     const std::string& service_name = *iter++;
    480     ReleaseOwnership(service_name);
    481   }
    482   if (!owned_service_names_.empty()) {
    483     LOG(ERROR) << "Failed to release all service names. # of services left: "
    484                << owned_service_names_.size();
    485   }
    486 
    487   // Detach from the remote objects.
    488   for (ObjectProxyTable::iterator iter = object_proxy_table_.begin();
    489        iter != object_proxy_table_.end(); ++iter) {
    490     iter->second->Detach();
    491   }
    492 
    493   // Clean up the object managers.
    494   for (ObjectManagerTable::iterator iter = object_manager_table_.begin();
    495        iter != object_manager_table_.end(); ++iter) {
    496     iter->second->CleanUp();
    497   }
    498 
    499   // Release object proxies and exported objects here. We should do this
    500   // here rather than in the destructor to avoid memory leaks due to
    501   // cyclic references.
    502   object_proxy_table_.clear();
    503   exported_object_table_.clear();
    504 
    505   // Private connection should be closed.
    506   if (connection_) {
    507     // Remove Disconnected watcher.
    508     ScopedDBusError error;
    509     RemoveFilterFunction(Bus::OnConnectionDisconnectedFilter, this);
    510     RemoveMatch(kDisconnectedMatchRule, error.get());
    511 
    512     if (connection_type_ == PRIVATE)
    513       ClosePrivateConnection();
    514     // dbus_connection_close() won't unref.
    515     dbus_connection_unref(connection_);
    516   }
    517 
    518   connection_ = NULL;
    519   shutdown_completed_ = true;
    520 }
    521 
    522 void Bus::ShutdownOnDBusThreadAndBlock() {
    523   AssertOnOriginThread();
    524   DCHECK(dbus_task_runner_.get());
    525 
    526   GetDBusTaskRunner()->PostTask(
    527       FROM_HERE,
    528       base::Bind(&Bus::ShutdownOnDBusThreadAndBlockInternal, this));
    529 
    530   // http://crbug.com/125222
    531   base::ThreadRestrictions::ScopedAllowWait allow_wait;
    532 
    533   // Wait until the shutdown is complete on the D-Bus thread.
    534   // The shutdown should not hang, but set timeout just in case.
    535   const int kTimeoutSecs = 3;
    536   const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs));
    537   const bool signaled = on_shutdown_.TimedWait(timeout);
    538   LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus";
    539 }
    540 
    541 void Bus::RequestOwnership(const std::string& service_name,
    542                            ServiceOwnershipOptions options,
    543                            OnOwnershipCallback on_ownership_callback) {
    544   AssertOnOriginThread();
    545 
    546   GetDBusTaskRunner()->PostTask(
    547       FROM_HERE,
    548       base::Bind(&Bus::RequestOwnershipInternal,
    549                  this, service_name, options, on_ownership_callback));
    550 }
    551 
    552 void Bus::RequestOwnershipInternal(const std::string& service_name,
    553                                    ServiceOwnershipOptions options,
    554                                    OnOwnershipCallback on_ownership_callback) {
    555   AssertOnDBusThread();
    556 
    557   bool success = Connect();
    558   if (success)
    559     success = RequestOwnershipAndBlock(service_name, options);
    560 
    561   GetOriginTaskRunner()->PostTask(FROM_HERE,
    562                                   base::Bind(on_ownership_callback,
    563                                              service_name,
    564                                              success));
    565 }
    566 
    567 bool Bus::RequestOwnershipAndBlock(const std::string& service_name,
    568                                    ServiceOwnershipOptions options) {
    569   DCHECK(connection_);
    570   // dbus_bus_request_name() is a blocking call.
    571   AssertOnDBusThread();
    572 
    573   // Check if we already own the service name.
    574   if (owned_service_names_.find(service_name) != owned_service_names_.end()) {
    575     return true;
    576   }
    577 
    578   ScopedDBusError error;
    579   const int result = dbus_bus_request_name(connection_,
    580                                            service_name.c_str(),
    581                                            options,
    582                                            error.get());
    583   if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
    584     LOG(ERROR) << "Failed to get the ownership of " << service_name << ": "
    585                << (error.is_set() ? error.message() : "");
    586     return false;
    587   }
    588   owned_service_names_.insert(service_name);
    589   return true;
    590 }
    591 
    592 bool Bus::ReleaseOwnership(const std::string& service_name) {
    593   DCHECK(connection_);
    594   // dbus_bus_request_name() is a blocking call.
    595   AssertOnDBusThread();
    596 
    597   // Check if we already own the service name.
    598   std::set<std::string>::iterator found =
    599       owned_service_names_.find(service_name);
    600   if (found == owned_service_names_.end()) {
    601     LOG(ERROR) << service_name << " is not owned by the bus";
    602     return false;
    603   }
    604 
    605   ScopedDBusError error;
    606   const int result = dbus_bus_release_name(connection_, service_name.c_str(),
    607                                            error.get());
    608   if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) {
    609     owned_service_names_.erase(found);
    610     return true;
    611   } else {
    612     LOG(ERROR) << "Failed to release the ownership of " << service_name << ": "
    613                << (error.is_set() ? error.message() : "")
    614                << ", result code: " << result;
    615     return false;
    616   }
    617 }
    618 
    619 bool Bus::SetUpAsyncOperations() {
    620   DCHECK(connection_);
    621   AssertOnDBusThread();
    622 
    623   if (async_operations_set_up_)
    624     return true;
    625 
    626   // Process all the incoming data if any, so that OnDispatchStatus() will
    627   // be called when the incoming data is ready.
    628   ProcessAllIncomingDataIfAny();
    629 
    630   bool success = dbus_connection_set_watch_functions(connection_,
    631                                                      &Bus::OnAddWatchThunk,
    632                                                      &Bus::OnRemoveWatchThunk,
    633                                                      &Bus::OnToggleWatchThunk,
    634                                                      this,
    635                                                      NULL);
    636   CHECK(success) << "Unable to allocate memory";
    637 
    638   success = dbus_connection_set_timeout_functions(connection_,
    639                                                   &Bus::OnAddTimeoutThunk,
    640                                                   &Bus::OnRemoveTimeoutThunk,
    641                                                   &Bus::OnToggleTimeoutThunk,
    642                                                   this,
    643                                                   NULL);
    644   CHECK(success) << "Unable to allocate memory";
    645 
    646   dbus_connection_set_dispatch_status_function(
    647       connection_,
    648       &Bus::OnDispatchStatusChangedThunk,
    649       this,
    650       NULL);
    651 
    652   async_operations_set_up_ = true;
    653 
    654   return true;
    655 }
    656 
    657 DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request,
    658                                         int timeout_ms,
    659                                         DBusError* error) {
    660   DCHECK(connection_);
    661   AssertOnDBusThread();
    662 
    663   return dbus_connection_send_with_reply_and_block(
    664       connection_, request, timeout_ms, error);
    665 }
    666 
    667 void Bus::SendWithReply(DBusMessage* request,
    668                         DBusPendingCall** pending_call,
    669                         int timeout_ms) {
    670   DCHECK(connection_);
    671   AssertOnDBusThread();
    672 
    673   const bool success = dbus_connection_send_with_reply(
    674       connection_, request, pending_call, timeout_ms);
    675   CHECK(success) << "Unable to allocate memory";
    676 }
    677 
    678 void Bus::Send(DBusMessage* request, uint32_t* serial) {
    679   DCHECK(connection_);
    680   AssertOnDBusThread();
    681 
    682   const bool success = dbus_connection_send(connection_, request, serial);
    683   CHECK(success) << "Unable to allocate memory";
    684 }
    685 
    686 void Bus::AddFilterFunction(DBusHandleMessageFunction filter_function,
    687                             void* user_data) {
    688   DCHECK(connection_);
    689   AssertOnDBusThread();
    690 
    691   std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
    692       std::make_pair(filter_function, user_data);
    693   if (filter_functions_added_.find(filter_data_pair) !=
    694       filter_functions_added_.end()) {
    695     VLOG(1) << "Filter function already exists: " << filter_function
    696             << " with associated data: " << user_data;
    697     return;
    698   }
    699 
    700   const bool success = dbus_connection_add_filter(
    701       connection_, filter_function, user_data, NULL);
    702   CHECK(success) << "Unable to allocate memory";
    703   filter_functions_added_.insert(filter_data_pair);
    704 }
    705 
    706 void Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function,
    707                                void* user_data) {
    708   DCHECK(connection_);
    709   AssertOnDBusThread();
    710 
    711   std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
    712       std::make_pair(filter_function, user_data);
    713   if (filter_functions_added_.find(filter_data_pair) ==
    714       filter_functions_added_.end()) {
    715     VLOG(1) << "Requested to remove an unknown filter function: "
    716             << filter_function
    717             << " with associated data: " << user_data;
    718     return;
    719   }
    720 
    721   dbus_connection_remove_filter(connection_, filter_function, user_data);
    722   filter_functions_added_.erase(filter_data_pair);
    723 }
    724 
    725 void Bus::AddMatch(const std::string& match_rule, DBusError* error) {
    726   DCHECK(connection_);
    727   AssertOnDBusThread();
    728 
    729   std::map<std::string, int>::iterator iter =
    730       match_rules_added_.find(match_rule);
    731   if (iter != match_rules_added_.end()) {
    732     // The already existing rule's counter is incremented.
    733     iter->second++;
    734 
    735     VLOG(1) << "Match rule already exists: " << match_rule;
    736     return;
    737   }
    738 
    739   dbus_bus_add_match(connection_, match_rule.c_str(), error);
    740   match_rules_added_[match_rule] = 1;
    741 }
    742 
    743 bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) {
    744   DCHECK(connection_);
    745   AssertOnDBusThread();
    746 
    747   std::map<std::string, int>::iterator iter =
    748       match_rules_added_.find(match_rule);
    749   if (iter == match_rules_added_.end()) {
    750     LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule;
    751     return false;
    752   }
    753 
    754   // The rule's counter is decremented and the rule is deleted when reachs 0.
    755   iter->second--;
    756   if (iter->second == 0) {
    757     dbus_bus_remove_match(connection_, match_rule.c_str(), error);
    758     match_rules_added_.erase(match_rule);
    759   }
    760   return true;
    761 }
    762 
    763 bool Bus::TryRegisterObjectPath(const ObjectPath& object_path,
    764                                 const DBusObjectPathVTable* vtable,
    765                                 void* user_data,
    766                                 DBusError* error) {
    767   DCHECK(connection_);
    768   AssertOnDBusThread();
    769 
    770   if (registered_object_paths_.find(object_path) !=
    771       registered_object_paths_.end()) {
    772     LOG(ERROR) << "Object path already registered: " << object_path.value();
    773     return false;
    774   }
    775 
    776   const bool success = dbus_connection_try_register_object_path(
    777       connection_,
    778       object_path.value().c_str(),
    779       vtable,
    780       user_data,
    781       error);
    782   if (success)
    783     registered_object_paths_.insert(object_path);
    784   return success;
    785 }
    786 
    787 void Bus::UnregisterObjectPath(const ObjectPath& object_path) {
    788   DCHECK(connection_);
    789   AssertOnDBusThread();
    790 
    791   if (registered_object_paths_.find(object_path) ==
    792       registered_object_paths_.end()) {
    793     LOG(ERROR) << "Requested to unregister an unknown object path: "
    794                << object_path.value();
    795     return;
    796   }
    797 
    798   const bool success = dbus_connection_unregister_object_path(
    799       connection_,
    800       object_path.value().c_str());
    801   CHECK(success) << "Unable to allocate memory";
    802   registered_object_paths_.erase(object_path);
    803 }
    804 
    805 void Bus::ShutdownOnDBusThreadAndBlockInternal() {
    806   AssertOnDBusThread();
    807 
    808   ShutdownAndBlock();
    809   on_shutdown_.Signal();
    810 }
    811 
    812 void Bus::ProcessAllIncomingDataIfAny() {
    813   AssertOnDBusThread();
    814 
    815   // As mentioned at the class comment in .h file, connection_ can be NULL.
    816   if (!connection_)
    817     return;
    818 
    819   // It is safe and necessary to call dbus_connection_get_dispatch_status even
    820   // if the connection is lost.
    821   if (dbus_connection_get_dispatch_status(connection_) ==
    822       DBUS_DISPATCH_DATA_REMAINS) {
    823     while (dbus_connection_dispatch(connection_) ==
    824            DBUS_DISPATCH_DATA_REMAINS) {
    825     }
    826   }
    827 }
    828 
    829 base::TaskRunner* Bus::GetDBusTaskRunner() {
    830   if (dbus_task_runner_.get())
    831     return dbus_task_runner_.get();
    832   else
    833     return GetOriginTaskRunner();
    834 }
    835 
    836 base::TaskRunner* Bus::GetOriginTaskRunner() {
    837   DCHECK(origin_task_runner_.get());
    838   return origin_task_runner_.get();
    839 }
    840 
    841 bool Bus::HasDBusThread() {
    842   return dbus_task_runner_.get() != NULL;
    843 }
    844 
    845 void Bus::AssertOnOriginThread() {
    846   DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId());
    847 }
    848 
    849 void Bus::AssertOnDBusThread() {
    850   base::ThreadRestrictions::AssertIOAllowed();
    851 
    852   if (dbus_task_runner_.get()) {
    853     DCHECK(dbus_task_runner_->RunsTasksOnCurrentThread());
    854   } else {
    855     AssertOnOriginThread();
    856   }
    857 }
    858 
    859 std::string Bus::GetServiceOwnerAndBlock(const std::string& service_name,
    860                                          GetServiceOwnerOption options) {
    861   AssertOnDBusThread();
    862 
    863   MethodCall get_name_owner_call("org.freedesktop.DBus", "GetNameOwner");
    864   MessageWriter writer(&get_name_owner_call);
    865   writer.AppendString(service_name);
    866   VLOG(1) << "Method call: " << get_name_owner_call.ToString();
    867 
    868   const ObjectPath obj_path("/org/freedesktop/DBus");
    869   if (!get_name_owner_call.SetDestination("org.freedesktop.DBus") ||
    870       !get_name_owner_call.SetPath(obj_path)) {
    871     if (options == REPORT_ERRORS)
    872       LOG(ERROR) << "Failed to get name owner.";
    873     return "";
    874   }
    875 
    876   ScopedDBusError error;
    877   DBusMessage* response_message =
    878       SendWithReplyAndBlock(get_name_owner_call.raw_message(),
    879                             ObjectProxy::TIMEOUT_USE_DEFAULT,
    880                             error.get());
    881   if (!response_message) {
    882     if (options == REPORT_ERRORS) {
    883       LOG(ERROR) << "Failed to get name owner. Got " << error.name() << ": "
    884                  << error.message();
    885     }
    886     return "";
    887   }
    888 
    889   std::unique_ptr<Response> response(
    890       Response::FromRawMessage(response_message));
    891   MessageReader reader(response.get());
    892 
    893   std::string service_owner;
    894   if (!reader.PopString(&service_owner))
    895     service_owner.clear();
    896   return service_owner;
    897 }
    898 
    899 void Bus::GetServiceOwner(const std::string& service_name,
    900                           const GetServiceOwnerCallback& callback) {
    901   AssertOnOriginThread();
    902 
    903   GetDBusTaskRunner()->PostTask(
    904       FROM_HERE,
    905       base::Bind(&Bus::GetServiceOwnerInternal, this, service_name, callback));
    906 }
    907 
    908 void Bus::GetServiceOwnerInternal(const std::string& service_name,
    909                                   const GetServiceOwnerCallback& callback) {
    910   AssertOnDBusThread();
    911 
    912   std::string service_owner;
    913   if (Connect())
    914     service_owner = GetServiceOwnerAndBlock(service_name, SUPPRESS_ERRORS);
    915   GetOriginTaskRunner()->PostTask(FROM_HERE,
    916                                   base::Bind(callback, service_owner));
    917 }
    918 
    919 void Bus::ListenForServiceOwnerChange(
    920     const std::string& service_name,
    921     const GetServiceOwnerCallback& callback) {
    922   AssertOnOriginThread();
    923   DCHECK(!service_name.empty());
    924   DCHECK(!callback.is_null());
    925 
    926   GetDBusTaskRunner()->PostTask(
    927       FROM_HERE,
    928       base::Bind(&Bus::ListenForServiceOwnerChangeInternal,
    929                  this, service_name, callback));
    930 }
    931 
    932 void Bus::ListenForServiceOwnerChangeInternal(
    933     const std::string& service_name,
    934     const GetServiceOwnerCallback& callback) {
    935   AssertOnDBusThread();
    936   DCHECK(!service_name.empty());
    937   DCHECK(!callback.is_null());
    938 
    939   if (!Connect() || !SetUpAsyncOperations())
    940     return;
    941 
    942   if (service_owner_changed_listener_map_.empty())
    943     AddFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
    944 
    945   ServiceOwnerChangedListenerMap::iterator it =
    946       service_owner_changed_listener_map_.find(service_name);
    947   if (it == service_owner_changed_listener_map_.end()) {
    948     // Add a match rule for the new service name.
    949     const std::string name_owner_changed_match_rule =
    950         base::StringPrintf(kServiceNameOwnerChangeMatchRule,
    951                            service_name.c_str());
    952     ScopedDBusError error;
    953     AddMatch(name_owner_changed_match_rule, error.get());
    954     if (error.is_set()) {
    955       LOG(ERROR) << "Failed to add match rule for " << service_name
    956                  << ". Got " << error.name() << ": " << error.message();
    957       return;
    958     }
    959 
    960     service_owner_changed_listener_map_[service_name].push_back(callback);
    961     return;
    962   }
    963 
    964   // Check if the callback has already been added.
    965   std::vector<GetServiceOwnerCallback>& callbacks = it->second;
    966   for (size_t i = 0; i < callbacks.size(); ++i) {
    967     if (callbacks[i].Equals(callback))
    968       return;
    969   }
    970   callbacks.push_back(callback);
    971 }
    972 
    973 void Bus::UnlistenForServiceOwnerChange(
    974     const std::string& service_name,
    975     const GetServiceOwnerCallback& callback) {
    976   AssertOnOriginThread();
    977   DCHECK(!service_name.empty());
    978   DCHECK(!callback.is_null());
    979 
    980   GetDBusTaskRunner()->PostTask(
    981       FROM_HERE,
    982       base::Bind(&Bus::UnlistenForServiceOwnerChangeInternal,
    983                  this, service_name, callback));
    984 }
    985 
    986 void Bus::UnlistenForServiceOwnerChangeInternal(
    987     const std::string& service_name,
    988     const GetServiceOwnerCallback& callback) {
    989   AssertOnDBusThread();
    990   DCHECK(!service_name.empty());
    991   DCHECK(!callback.is_null());
    992 
    993   ServiceOwnerChangedListenerMap::iterator it =
    994       service_owner_changed_listener_map_.find(service_name);
    995   if (it == service_owner_changed_listener_map_.end())
    996     return;
    997 
    998   std::vector<GetServiceOwnerCallback>& callbacks = it->second;
    999   for (size_t i = 0; i < callbacks.size(); ++i) {
   1000     if (callbacks[i].Equals(callback)) {
   1001       callbacks.erase(callbacks.begin() + i);
   1002       break;  // There can be only one.
   1003     }
   1004   }
   1005   if (!callbacks.empty())
   1006     return;
   1007 
   1008   // Last callback for |service_name| has been removed, remove match rule.
   1009   const std::string name_owner_changed_match_rule =
   1010       base::StringPrintf(kServiceNameOwnerChangeMatchRule,
   1011                          service_name.c_str());
   1012   ScopedDBusError error;
   1013   RemoveMatch(name_owner_changed_match_rule, error.get());
   1014   // And remove |service_owner_changed_listener_map_| entry.
   1015   service_owner_changed_listener_map_.erase(it);
   1016 
   1017   if (service_owner_changed_listener_map_.empty())
   1018     RemoveFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
   1019 }
   1020 
   1021 std::string Bus::GetConnectionName() {
   1022   if (!connection_)
   1023     return "";
   1024   return dbus_bus_get_unique_name(connection_);
   1025 }
   1026 
   1027 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) {
   1028   AssertOnDBusThread();
   1029 
   1030   // watch will be deleted when raw_watch is removed in OnRemoveWatch().
   1031   Watch* watch = new Watch(raw_watch);
   1032   if (watch->IsReadyToBeWatched()) {
   1033     watch->StartWatching();
   1034   }
   1035   ++num_pending_watches_;
   1036   return true;
   1037 }
   1038 
   1039 void Bus::OnRemoveWatch(DBusWatch* raw_watch) {
   1040   AssertOnDBusThread();
   1041 
   1042   Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
   1043   delete watch;
   1044   --num_pending_watches_;
   1045 }
   1046 
   1047 void Bus::OnToggleWatch(DBusWatch* raw_watch) {
   1048   AssertOnDBusThread();
   1049 
   1050   Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
   1051   if (watch->IsReadyToBeWatched()) {
   1052     watch->StartWatching();
   1053   } else {
   1054     // It's safe to call this if StartWatching() wasn't called, per
   1055     // message_pump_libevent.h.
   1056     watch->StopWatching();
   1057   }
   1058 }
   1059 
   1060 dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) {
   1061   AssertOnDBusThread();
   1062 
   1063   // timeout will be deleted when raw_timeout is removed in
   1064   // OnRemoveTimeoutThunk().
   1065   Timeout* timeout = new Timeout(raw_timeout);
   1066   if (timeout->IsReadyToBeMonitored()) {
   1067     timeout->StartMonitoring(this);
   1068   }
   1069   ++num_pending_timeouts_;
   1070   return true;
   1071 }
   1072 
   1073 void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) {
   1074   AssertOnDBusThread();
   1075 
   1076   Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
   1077   timeout->Complete();
   1078   --num_pending_timeouts_;
   1079 }
   1080 
   1081 void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) {
   1082   AssertOnDBusThread();
   1083 
   1084   Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
   1085   if (timeout->IsReadyToBeMonitored()) {
   1086     timeout->StartMonitoring(this);
   1087   } else {
   1088     timeout->StopMonitoring();
   1089   }
   1090 }
   1091 
   1092 void Bus::OnDispatchStatusChanged(DBusConnection* connection,
   1093                                   DBusDispatchStatus status) {
   1094   DCHECK_EQ(connection, connection_);
   1095   AssertOnDBusThread();
   1096 
   1097   // We cannot call ProcessAllIncomingDataIfAny() here, as calling
   1098   // dbus_connection_dispatch() inside DBusDispatchStatusFunction is
   1099   // prohibited by the D-Bus library. Hence, we post a task here instead.
   1100   // See comments for dbus_connection_set_dispatch_status_function().
   1101   GetDBusTaskRunner()->PostTask(FROM_HERE,
   1102                                 base::Bind(&Bus::ProcessAllIncomingDataIfAny,
   1103                                            this));
   1104 }
   1105 
   1106 void Bus::OnServiceOwnerChanged(DBusMessage* message) {
   1107   DCHECK(message);
   1108   AssertOnDBusThread();
   1109 
   1110   // |message| will be unrefed on exit of the function. Increment the
   1111   // reference so we can use it in Signal::FromRawMessage() below.
   1112   dbus_message_ref(message);
   1113   std::unique_ptr<Signal> signal(Signal::FromRawMessage(message));
   1114 
   1115   // Confirm the validity of the NameOwnerChanged signal.
   1116   if (signal->GetMember() != kNameOwnerChangedSignal ||
   1117       signal->GetInterface() != DBUS_INTERFACE_DBUS ||
   1118       signal->GetSender() != DBUS_SERVICE_DBUS) {
   1119     return;
   1120   }
   1121 
   1122   MessageReader reader(signal.get());
   1123   std::string service_name;
   1124   std::string old_owner;
   1125   std::string new_owner;
   1126   if (!reader.PopString(&service_name) ||
   1127       !reader.PopString(&old_owner) ||
   1128       !reader.PopString(&new_owner)) {
   1129     return;
   1130   }
   1131 
   1132   ServiceOwnerChangedListenerMap::const_iterator it =
   1133       service_owner_changed_listener_map_.find(service_name);
   1134   if (it == service_owner_changed_listener_map_.end())
   1135     return;
   1136 
   1137   const std::vector<GetServiceOwnerCallback>& callbacks = it->second;
   1138   for (size_t i = 0; i < callbacks.size(); ++i) {
   1139     GetOriginTaskRunner()->PostTask(FROM_HERE,
   1140                                     base::Bind(callbacks[i], new_owner));
   1141   }
   1142 }
   1143 
   1144 // static
   1145 dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) {
   1146   Bus* self = static_cast<Bus*>(data);
   1147   return self->OnAddWatch(raw_watch);
   1148 }
   1149 
   1150 // static
   1151 void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) {
   1152   Bus* self = static_cast<Bus*>(data);
   1153   self->OnRemoveWatch(raw_watch);
   1154 }
   1155 
   1156 // static
   1157 void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) {
   1158   Bus* self = static_cast<Bus*>(data);
   1159   self->OnToggleWatch(raw_watch);
   1160 }
   1161 
   1162 // static
   1163 dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
   1164   Bus* self = static_cast<Bus*>(data);
   1165   return self->OnAddTimeout(raw_timeout);
   1166 }
   1167 
   1168 // static
   1169 void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
   1170   Bus* self = static_cast<Bus*>(data);
   1171   self->OnRemoveTimeout(raw_timeout);
   1172 }
   1173 
   1174 // static
   1175 void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
   1176   Bus* self = static_cast<Bus*>(data);
   1177   self->OnToggleTimeout(raw_timeout);
   1178 }
   1179 
   1180 // static
   1181 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection,
   1182                                        DBusDispatchStatus status,
   1183                                        void* data) {
   1184   Bus* self = static_cast<Bus*>(data);
   1185   self->OnDispatchStatusChanged(connection, status);
   1186 }
   1187 
   1188 // static
   1189 DBusHandlerResult Bus::OnConnectionDisconnectedFilter(
   1190     DBusConnection* connection,
   1191     DBusMessage* message,
   1192     void* data) {
   1193   if (dbus_message_is_signal(message,
   1194                              DBUS_INTERFACE_LOCAL,
   1195                              kDisconnectedSignal)) {
   1196     // Abort when the connection is lost.
   1197     LOG(FATAL) << "D-Bus connection was disconnected. Aborting.";
   1198   }
   1199   return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
   1200 }
   1201 
   1202 // static
   1203 DBusHandlerResult Bus::OnServiceOwnerChangedFilter(
   1204     DBusConnection* connection,
   1205     DBusMessage* message,
   1206     void* data) {
   1207   if (dbus_message_is_signal(message,
   1208                              DBUS_INTERFACE_DBUS,
   1209                              kNameOwnerChangedSignal)) {
   1210     Bus* self = static_cast<Bus*>(data);
   1211     self->OnServiceOwnerChanged(message);
   1212   }
   1213   // Always return unhandled to let others, e.g. ObjectProxies, handle the same
   1214   // signal.
   1215   return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
   1216 }
   1217 
   1218 }  // namespace dbus
   1219