Home | History | Annotate | Download | only in dbus
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "dbus/bus.h"
      6 
      7 #include <stddef.h>
      8 
      9 #include <memory>
     10 
     11 #include "base/bind.h"
     12 #include "base/files/file_descriptor_watcher_posix.h"
     13 #include "base/logging.h"
     14 #include "base/memory/weak_ptr.h"
     15 #include "base/stl_util.h"
     16 #include "base/strings/stringprintf.h"
     17 #include "base/threading/thread.h"
     18 #include "base/threading/thread_restrictions.h"
     19 #include "base/threading/thread_task_runner_handle.h"
     20 #include "base/time/time.h"
     21 #include "dbus/exported_object.h"
     22 #include "dbus/message.h"
     23 #include "dbus/object_manager.h"
     24 #include "dbus/object_path.h"
     25 #include "dbus/object_proxy.h"
     26 #include "dbus/scoped_dbus_error.h"
     27 
     28 namespace dbus {
     29 
     30 namespace {
     31 
     32 const char kDisconnectedSignal[] = "Disconnected";
     33 const char kDisconnectedMatchRule[] =
     34     "type='signal', path='/org/freedesktop/DBus/Local',"
     35     "interface='org.freedesktop.DBus.Local', member='Disconnected'";
     36 
     37 // The NameOwnerChanged member in org.freedesktop.DBus
     38 const char kNameOwnerChangedSignal[] = "NameOwnerChanged";
     39 
     40 // The match rule used to filter for changes to a given service name owner.
     41 const char kServiceNameOwnerChangeMatchRule[] =
     42     "type='signal',interface='org.freedesktop.DBus',"
     43     "member='NameOwnerChanged',path='/org/freedesktop/DBus',"
     44     "sender='org.freedesktop.DBus',arg0='%s'";
     45 
     46 // The class is used for watching the file descriptor used for D-Bus
     47 // communication.
     48 class Watch {
     49  public:
     50   explicit Watch(DBusWatch* watch) : raw_watch_(watch) {
     51     dbus_watch_set_data(raw_watch_, this, nullptr);
     52   }
     53 
     54   ~Watch() { dbus_watch_set_data(raw_watch_, nullptr, nullptr); }
     55 
     56   // Returns true if the underlying file descriptor is ready to be watched.
     57   bool IsReadyToBeWatched() {
     58     return dbus_watch_get_enabled(raw_watch_);
     59   }
     60 
     61   // Starts watching the underlying file descriptor.
     62   void StartWatching() {
     63     const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_);
     64     const unsigned int flags = dbus_watch_get_flags(raw_watch_);
     65 
     66     // Using base::Unretained(this) is safe because watches are automatically
     67     // canceled when |read_watcher_| and |write_watcher_| are destroyed.
     68     if (flags & DBUS_WATCH_READABLE) {
     69       read_watcher_ = base::FileDescriptorWatcher::WatchReadable(
     70           file_descriptor,
     71           base::Bind(&Watch::OnFileReady, base::Unretained(this),
     72                      DBUS_WATCH_READABLE));
     73     }
     74     if (flags & DBUS_WATCH_WRITABLE) {
     75       write_watcher_ = base::FileDescriptorWatcher::WatchWritable(
     76           file_descriptor,
     77           base::Bind(&Watch::OnFileReady, base::Unretained(this),
     78                      DBUS_WATCH_WRITABLE));
     79     }
     80   }
     81 
     82   // Stops watching the underlying file descriptor.
     83   void StopWatching() {
     84     read_watcher_.reset();
     85     write_watcher_.reset();
     86   }
     87 
     88  private:
     89   void OnFileReady(unsigned int flags) {
     90     CHECK(dbus_watch_handle(raw_watch_, flags)) << "Unable to allocate memory";
     91   }
     92 
     93   DBusWatch* raw_watch_;
     94   std::unique_ptr<base::FileDescriptorWatcher::Controller> read_watcher_;
     95   std::unique_ptr<base::FileDescriptorWatcher::Controller> write_watcher_;
     96 
     97   DISALLOW_COPY_AND_ASSIGN(Watch);
     98 };
     99 
    100 // The class is used for monitoring the timeout used for D-Bus method
    101 // calls.
    102 class Timeout {
    103  public:
    104   explicit Timeout(DBusTimeout* timeout)
    105       : raw_timeout_(timeout), weak_ptr_factory_(this) {
    106     // Associated |this| with the underlying DBusTimeout.
    107     dbus_timeout_set_data(raw_timeout_, this, nullptr);
    108   }
    109 
    110   ~Timeout() {
    111     // Remove the association between |this| and the |raw_timeout_|.
    112     dbus_timeout_set_data(raw_timeout_, nullptr, nullptr);
    113   }
    114 
    115   // Returns true if the timeout is ready to be monitored.
    116   bool IsReadyToBeMonitored() {
    117     return dbus_timeout_get_enabled(raw_timeout_);
    118   }
    119 
    120   // Starts monitoring the timeout.
    121   void StartMonitoring(Bus* bus) {
    122     bus->GetDBusTaskRunner()->PostDelayedTask(
    123         FROM_HERE,
    124         base::Bind(&Timeout::HandleTimeout, weak_ptr_factory_.GetWeakPtr()),
    125         GetInterval());
    126   }
    127 
    128   // Stops monitoring the timeout.
    129   void StopMonitoring() { weak_ptr_factory_.InvalidateWeakPtrs(); }
    130 
    131   base::TimeDelta GetInterval() {
    132     return base::TimeDelta::FromMilliseconds(
    133         dbus_timeout_get_interval(raw_timeout_));
    134   }
    135 
    136  private:
    137   // Calls DBus to handle the timeout.
    138   void HandleTimeout() { CHECK(dbus_timeout_handle(raw_timeout_)); }
    139 
    140   DBusTimeout* raw_timeout_;
    141 
    142   base::WeakPtrFactory<Timeout> weak_ptr_factory_;
    143 
    144   DISALLOW_COPY_AND_ASSIGN(Timeout);
    145 };
    146 
    147 }  // namespace
    148 
    149 Bus::Options::Options()
    150   : bus_type(SESSION),
    151     connection_type(PRIVATE) {
    152 }
    153 
    154 Bus::Options::~Options() = default;
    155 
    156 Bus::Bus(const Options& options)
    157     : bus_type_(options.bus_type),
    158       connection_type_(options.connection_type),
    159       dbus_task_runner_(options.dbus_task_runner),
    160       on_shutdown_(base::WaitableEvent::ResetPolicy::AUTOMATIC,
    161                    base::WaitableEvent::InitialState::NOT_SIGNALED),
    162       connection_(nullptr),
    163       origin_thread_id_(base::PlatformThread::CurrentId()),
    164       async_operations_set_up_(false),
    165       shutdown_completed_(false),
    166       num_pending_watches_(0),
    167       num_pending_timeouts_(0),
    168       address_(options.address) {
    169   // This is safe to call multiple times.
    170   dbus_threads_init_default();
    171   // The origin message loop is unnecessary if the client uses synchronous
    172   // functions only.
    173   if (base::ThreadTaskRunnerHandle::IsSet())
    174     origin_task_runner_ = base::ThreadTaskRunnerHandle::Get();
    175 }
    176 
    177 Bus::~Bus() {
    178   DCHECK(!connection_);
    179   DCHECK(owned_service_names_.empty());
    180   DCHECK(match_rules_added_.empty());
    181   DCHECK(filter_functions_added_.empty());
    182   DCHECK(registered_object_paths_.empty());
    183   DCHECK_EQ(0, num_pending_watches_);
    184   // TODO(satorux): This check fails occasionally in browser_tests for tests
    185   // that run very quickly. Perhaps something does not have time to clean up.
    186   // Despite the check failing, the tests seem to run fine. crosbug.com/23416
    187   // DCHECK_EQ(0, num_pending_timeouts_);
    188 }
    189 
    190 ObjectProxy* Bus::GetObjectProxy(const std::string& service_name,
    191                                  const ObjectPath& object_path) {
    192   return GetObjectProxyWithOptions(service_name, object_path,
    193                                    ObjectProxy::DEFAULT_OPTIONS);
    194 }
    195 
    196 ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name,
    197                                             const ObjectPath& object_path,
    198                                             int options) {
    199   AssertOnOriginThread();
    200 
    201   // Check if we already have the requested object proxy.
    202   const ObjectProxyTable::key_type key(service_name + object_path.value(),
    203                                        options);
    204   ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
    205   if (iter != object_proxy_table_.end()) {
    206     return iter->second.get();
    207   }
    208 
    209   scoped_refptr<ObjectProxy> object_proxy =
    210       new ObjectProxy(this, service_name, object_path, options);
    211   object_proxy_table_[key] = object_proxy;
    212 
    213   return object_proxy.get();
    214 }
    215 
    216 bool Bus::RemoveObjectProxy(const std::string& service_name,
    217                             const ObjectPath& object_path,
    218                             const base::Closure& callback) {
    219   return RemoveObjectProxyWithOptions(service_name, object_path,
    220                                       ObjectProxy::DEFAULT_OPTIONS,
    221                                       callback);
    222 }
    223 
    224 bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name,
    225                                        const ObjectPath& object_path,
    226                                        int options,
    227                                        const base::Closure& callback) {
    228   AssertOnOriginThread();
    229 
    230   // Check if we have the requested object proxy.
    231   const ObjectProxyTable::key_type key(service_name + object_path.value(),
    232                                        options);
    233   ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
    234   if (iter != object_proxy_table_.end()) {
    235     scoped_refptr<ObjectProxy> object_proxy = iter->second;
    236     object_proxy_table_.erase(iter);
    237     // Object is present. Remove it now and Detach on the DBus thread.
    238     GetDBusTaskRunner()->PostTask(
    239         FROM_HERE,
    240         base::Bind(&Bus::RemoveObjectProxyInternal,
    241                    this, object_proxy, callback));
    242     return true;
    243   }
    244   return false;
    245 }
    246 
    247 void Bus::RemoveObjectProxyInternal(scoped_refptr<ObjectProxy> object_proxy,
    248                                     const base::Closure& callback) {
    249   AssertOnDBusThread();
    250 
    251   object_proxy->Detach();
    252 
    253   GetOriginTaskRunner()->PostTask(FROM_HERE, callback);
    254 }
    255 
    256 ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) {
    257   AssertOnOriginThread();
    258 
    259   // Check if we already have the requested exported object.
    260   ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
    261   if (iter != exported_object_table_.end()) {
    262     return iter->second.get();
    263   }
    264 
    265   scoped_refptr<ExportedObject> exported_object =
    266       new ExportedObject(this, object_path);
    267   exported_object_table_[object_path] = exported_object;
    268 
    269   return exported_object.get();
    270 }
    271 
    272 void Bus::UnregisterExportedObject(const ObjectPath& object_path) {
    273   AssertOnOriginThread();
    274 
    275   // Remove the registered object from the table first, to allow a new
    276   // GetExportedObject() call to return a new object, rather than this one.
    277   ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
    278   if (iter == exported_object_table_.end())
    279     return;
    280 
    281   scoped_refptr<ExportedObject> exported_object = iter->second;
    282   exported_object_table_.erase(iter);
    283 
    284   // Post the task to perform the final unregistration to the D-Bus thread.
    285   // Since the registration also happens on the D-Bus thread in
    286   // TryRegisterObjectPath(), and the task runner we post to is a
    287   // SequencedTaskRunner, there is a guarantee that this will happen before any
    288   // future registration call.
    289   GetDBusTaskRunner()->PostTask(
    290       FROM_HERE,
    291       base::Bind(&Bus::UnregisterExportedObjectInternal,
    292                  this, exported_object));
    293 }
    294 
    295 void Bus::UnregisterExportedObjectInternal(
    296     scoped_refptr<ExportedObject> exported_object) {
    297   AssertOnDBusThread();
    298 
    299   exported_object->Unregister();
    300 }
    301 
    302 ObjectManager* Bus::GetObjectManager(const std::string& service_name,
    303                                      const ObjectPath& object_path) {
    304   AssertOnOriginThread();
    305 
    306   // Check if we already have the requested object manager.
    307   const ObjectManagerTable::key_type key(service_name + object_path.value());
    308   ObjectManagerTable::iterator iter = object_manager_table_.find(key);
    309   if (iter != object_manager_table_.end()) {
    310     return iter->second.get();
    311   }
    312 
    313   scoped_refptr<ObjectManager> object_manager =
    314       new ObjectManager(this, service_name, object_path);
    315   object_manager_table_[key] = object_manager;
    316 
    317   return object_manager.get();
    318 }
    319 
    320 bool Bus::RemoveObjectManager(const std::string& service_name,
    321                               const ObjectPath& object_path,
    322                               const base::Closure& callback) {
    323   AssertOnOriginThread();
    324   DCHECK(!callback.is_null());
    325 
    326   const ObjectManagerTable::key_type key(service_name + object_path.value());
    327   ObjectManagerTable::iterator iter = object_manager_table_.find(key);
    328   if (iter == object_manager_table_.end())
    329     return false;
    330 
    331   // ObjectManager is present. Remove it now and CleanUp on the DBus thread.
    332   scoped_refptr<ObjectManager> object_manager = iter->second;
    333   object_manager_table_.erase(iter);
    334 
    335   GetDBusTaskRunner()->PostTask(
    336       FROM_HERE,
    337       base::Bind(&Bus::RemoveObjectManagerInternal,
    338                  this, object_manager, callback));
    339 
    340   return true;
    341 }
    342 
    343 void Bus::RemoveObjectManagerInternal(
    344       scoped_refptr<dbus::ObjectManager> object_manager,
    345       const base::Closure& callback) {
    346   AssertOnDBusThread();
    347   DCHECK(object_manager.get());
    348 
    349   object_manager->CleanUp();
    350 
    351   // The ObjectManager has to be deleted on the origin thread since it was
    352   // created there.
    353   GetOriginTaskRunner()->PostTask(
    354       FROM_HERE,
    355       base::Bind(&Bus::RemoveObjectManagerInternalHelper,
    356                  this, object_manager, callback));
    357 }
    358 
    359 void Bus::RemoveObjectManagerInternalHelper(
    360       scoped_refptr<dbus::ObjectManager> object_manager,
    361       const base::Closure& callback) {
    362   AssertOnOriginThread();
    363   DCHECK(object_manager);
    364 
    365   // Release the object manager and run the callback.
    366   object_manager = nullptr;
    367   callback.Run();
    368 }
    369 
    370 bool Bus::Connect() {
    371   // dbus_bus_get_private() and dbus_bus_get() are blocking calls.
    372   AssertOnDBusThread();
    373 
    374   // Check if it's already initialized.
    375   if (connection_)
    376     return true;
    377 
    378   ScopedDBusError error;
    379   if (bus_type_ == CUSTOM_ADDRESS) {
    380     if (connection_type_ == PRIVATE) {
    381       connection_ = dbus_connection_open_private(address_.c_str(), error.get());
    382     } else {
    383       connection_ = dbus_connection_open(address_.c_str(), error.get());
    384     }
    385   } else {
    386     const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_);
    387     if (connection_type_ == PRIVATE) {
    388       connection_ = dbus_bus_get_private(dbus_bus_type, error.get());
    389     } else {
    390       connection_ = dbus_bus_get(dbus_bus_type, error.get());
    391     }
    392   }
    393   if (!connection_) {
    394     LOG(ERROR) << "Failed to connect to the bus: "
    395                << (error.is_set() ? error.message() : "");
    396     return false;
    397   }
    398 
    399   if (bus_type_ == CUSTOM_ADDRESS) {
    400     // We should call dbus_bus_register here, otherwise unique name can not be
    401     // acquired. According to dbus specification, it is responsible to call
    402     // org.freedesktop.DBus.Hello method at the beging of bus connection to
    403     // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is
    404     // called internally.
    405     if (!dbus_bus_register(connection_, error.get())) {
    406       LOG(ERROR) << "Failed to register the bus component: "
    407                  << (error.is_set() ? error.message() : "");
    408       return false;
    409     }
    410   }
    411   // We shouldn't exit on the disconnected signal.
    412   dbus_connection_set_exit_on_disconnect(connection_, false);
    413 
    414   // Watch Disconnected signal.
    415   AddFilterFunction(Bus::OnConnectionDisconnectedFilter, this);
    416   AddMatch(kDisconnectedMatchRule, error.get());
    417 
    418   return true;
    419 }
    420 
    421 void Bus::ClosePrivateConnection() {
    422   // dbus_connection_close is blocking call.
    423   AssertOnDBusThread();
    424   DCHECK_EQ(PRIVATE, connection_type_)
    425       << "non-private connection should not be closed";
    426   dbus_connection_close(connection_);
    427 }
    428 
    429 void Bus::ShutdownAndBlock() {
    430   AssertOnDBusThread();
    431 
    432   if (shutdown_completed_)
    433     return;  // Already shutdowned, just return.
    434 
    435   // Unregister the exported objects.
    436   for (ExportedObjectTable::iterator iter = exported_object_table_.begin();
    437        iter != exported_object_table_.end(); ++iter) {
    438     iter->second->Unregister();
    439   }
    440 
    441   // Release all service names.
    442   for (std::set<std::string>::iterator iter = owned_service_names_.begin();
    443        iter != owned_service_names_.end();) {
    444     // This is a bit tricky but we should increment the iter here as
    445     // ReleaseOwnership() may remove |service_name| from the set.
    446     const std::string& service_name = *iter++;
    447     ReleaseOwnership(service_name);
    448   }
    449   if (!owned_service_names_.empty()) {
    450     LOG(ERROR) << "Failed to release all service names. # of services left: "
    451                << owned_service_names_.size();
    452   }
    453 
    454   // Detach from the remote objects.
    455   for (ObjectProxyTable::iterator iter = object_proxy_table_.begin();
    456        iter != object_proxy_table_.end(); ++iter) {
    457     iter->second->Detach();
    458   }
    459 
    460   // Clean up the object managers.
    461   for (ObjectManagerTable::iterator iter = object_manager_table_.begin();
    462        iter != object_manager_table_.end(); ++iter) {
    463     iter->second->CleanUp();
    464   }
    465 
    466   // Release object proxies and exported objects here. We should do this
    467   // here rather than in the destructor to avoid memory leaks due to
    468   // cyclic references.
    469   object_proxy_table_.clear();
    470   exported_object_table_.clear();
    471 
    472   // Private connection should be closed.
    473   if (connection_) {
    474     // Remove Disconnected watcher.
    475     ScopedDBusError error;
    476     RemoveFilterFunction(Bus::OnConnectionDisconnectedFilter, this);
    477     RemoveMatch(kDisconnectedMatchRule, error.get());
    478 
    479     if (connection_type_ == PRIVATE)
    480       ClosePrivateConnection();
    481     // dbus_connection_close() won't unref.
    482     dbus_connection_unref(connection_);
    483   }
    484 
    485   connection_ = nullptr;
    486   shutdown_completed_ = true;
    487 }
    488 
    489 void Bus::ShutdownOnDBusThreadAndBlock() {
    490   AssertOnOriginThread();
    491   DCHECK(dbus_task_runner_);
    492 
    493   GetDBusTaskRunner()->PostTask(
    494       FROM_HERE,
    495       base::Bind(&Bus::ShutdownOnDBusThreadAndBlockInternal, this));
    496 
    497   // http://crbug.com/125222
    498   base::ThreadRestrictions::ScopedAllowWait allow_wait;
    499 
    500   // Wait until the shutdown is complete on the D-Bus thread.
    501   // The shutdown should not hang, but set timeout just in case.
    502   const int kTimeoutSecs = 3;
    503   const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs));
    504   const bool signaled = on_shutdown_.TimedWait(timeout);
    505   LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus";
    506 }
    507 
    508 void Bus::RequestOwnership(const std::string& service_name,
    509                            ServiceOwnershipOptions options,
    510                            OnOwnershipCallback on_ownership_callback) {
    511   AssertOnOriginThread();
    512 
    513   GetDBusTaskRunner()->PostTask(
    514       FROM_HERE,
    515       base::Bind(&Bus::RequestOwnershipInternal,
    516                  this, service_name, options, on_ownership_callback));
    517 }
    518 
    519 void Bus::RequestOwnershipInternal(const std::string& service_name,
    520                                    ServiceOwnershipOptions options,
    521                                    OnOwnershipCallback on_ownership_callback) {
    522   AssertOnDBusThread();
    523 
    524   bool success = Connect();
    525   if (success)
    526     success = RequestOwnershipAndBlock(service_name, options);
    527 
    528   GetOriginTaskRunner()->PostTask(FROM_HERE,
    529                                   base::Bind(on_ownership_callback,
    530                                              service_name,
    531                                              success));
    532 }
    533 
    534 bool Bus::RequestOwnershipAndBlock(const std::string& service_name,
    535                                    ServiceOwnershipOptions options) {
    536   DCHECK(connection_);
    537   // dbus_bus_request_name() is a blocking call.
    538   AssertOnDBusThread();
    539 
    540   // Check if we already own the service name.
    541   if (owned_service_names_.find(service_name) != owned_service_names_.end()) {
    542     return true;
    543   }
    544 
    545   ScopedDBusError error;
    546   const int result = dbus_bus_request_name(connection_,
    547                                            service_name.c_str(),
    548                                            options,
    549                                            error.get());
    550   if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
    551     LOG(ERROR) << "Failed to get the ownership of " << service_name << ": "
    552                << (error.is_set() ? error.message() : "");
    553     return false;
    554   }
    555   owned_service_names_.insert(service_name);
    556   return true;
    557 }
    558 
    559 bool Bus::ReleaseOwnership(const std::string& service_name) {
    560   DCHECK(connection_);
    561   // dbus_bus_request_name() is a blocking call.
    562   AssertOnDBusThread();
    563 
    564   // Check if we already own the service name.
    565   std::set<std::string>::iterator found =
    566       owned_service_names_.find(service_name);
    567   if (found == owned_service_names_.end()) {
    568     LOG(ERROR) << service_name << " is not owned by the bus";
    569     return false;
    570   }
    571 
    572   ScopedDBusError error;
    573   const int result = dbus_bus_release_name(connection_, service_name.c_str(),
    574                                            error.get());
    575   if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) {
    576     owned_service_names_.erase(found);
    577     return true;
    578   } else {
    579     LOG(ERROR) << "Failed to release the ownership of " << service_name << ": "
    580                << (error.is_set() ? error.message() : "")
    581                << ", result code: " << result;
    582     return false;
    583   }
    584 }
    585 
    586 bool Bus::SetUpAsyncOperations() {
    587   DCHECK(connection_);
    588   AssertOnDBusThread();
    589 
    590   if (async_operations_set_up_)
    591     return true;
    592 
    593   // Process all the incoming data if any, so that OnDispatchStatus() will
    594   // be called when the incoming data is ready.
    595   ProcessAllIncomingDataIfAny();
    596 
    597   bool success = dbus_connection_set_watch_functions(
    598       connection_, &Bus::OnAddWatchThunk, &Bus::OnRemoveWatchThunk,
    599       &Bus::OnToggleWatchThunk, this, nullptr);
    600   CHECK(success) << "Unable to allocate memory";
    601 
    602   success = dbus_connection_set_timeout_functions(
    603       connection_, &Bus::OnAddTimeoutThunk, &Bus::OnRemoveTimeoutThunk,
    604       &Bus::OnToggleTimeoutThunk, this, nullptr);
    605   CHECK(success) << "Unable to allocate memory";
    606 
    607   dbus_connection_set_dispatch_status_function(
    608       connection_, &Bus::OnDispatchStatusChangedThunk, this, nullptr);
    609 
    610   async_operations_set_up_ = true;
    611 
    612   return true;
    613 }
    614 
    615 DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request,
    616                                         int timeout_ms,
    617                                         DBusError* error) {
    618   DCHECK(connection_);
    619   AssertOnDBusThread();
    620 
    621   return dbus_connection_send_with_reply_and_block(
    622       connection_, request, timeout_ms, error);
    623 }
    624 
    625 void Bus::SendWithReply(DBusMessage* request,
    626                         DBusPendingCall** pending_call,
    627                         int timeout_ms) {
    628   DCHECK(connection_);
    629   AssertOnDBusThread();
    630 
    631   const bool success = dbus_connection_send_with_reply(
    632       connection_, request, pending_call, timeout_ms);
    633   CHECK(success) << "Unable to allocate memory";
    634 }
    635 
    636 void Bus::Send(DBusMessage* request, uint32_t* serial) {
    637   DCHECK(connection_);
    638   AssertOnDBusThread();
    639 
    640   const bool success = dbus_connection_send(connection_, request, serial);
    641   CHECK(success) << "Unable to allocate memory";
    642 }
    643 
    644 void Bus::AddFilterFunction(DBusHandleMessageFunction filter_function,
    645                             void* user_data) {
    646   DCHECK(connection_);
    647   AssertOnDBusThread();
    648 
    649   std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
    650       std::make_pair(filter_function, user_data);
    651   if (filter_functions_added_.find(filter_data_pair) !=
    652       filter_functions_added_.end()) {
    653     VLOG(1) << "Filter function already exists: " << filter_function
    654             << " with associated data: " << user_data;
    655     return;
    656   }
    657 
    658   const bool success = dbus_connection_add_filter(connection_, filter_function,
    659                                                   user_data, nullptr);
    660   CHECK(success) << "Unable to allocate memory";
    661   filter_functions_added_.insert(filter_data_pair);
    662 }
    663 
    664 void Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function,
    665                                void* user_data) {
    666   DCHECK(connection_);
    667   AssertOnDBusThread();
    668 
    669   std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
    670       std::make_pair(filter_function, user_data);
    671   if (filter_functions_added_.find(filter_data_pair) ==
    672       filter_functions_added_.end()) {
    673     VLOG(1) << "Requested to remove an unknown filter function: "
    674             << filter_function
    675             << " with associated data: " << user_data;
    676     return;
    677   }
    678 
    679   dbus_connection_remove_filter(connection_, filter_function, user_data);
    680   filter_functions_added_.erase(filter_data_pair);
    681 }
    682 
    683 void Bus::AddMatch(const std::string& match_rule, DBusError* error) {
    684   DCHECK(connection_);
    685   AssertOnDBusThread();
    686 
    687   std::map<std::string, int>::iterator iter =
    688       match_rules_added_.find(match_rule);
    689   if (iter != match_rules_added_.end()) {
    690     // The already existing rule's counter is incremented.
    691     iter->second++;
    692 
    693     VLOG(1) << "Match rule already exists: " << match_rule;
    694     return;
    695   }
    696 
    697   dbus_bus_add_match(connection_, match_rule.c_str(), error);
    698   match_rules_added_[match_rule] = 1;
    699 }
    700 
    701 bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) {
    702   DCHECK(connection_);
    703   AssertOnDBusThread();
    704 
    705   std::map<std::string, int>::iterator iter =
    706       match_rules_added_.find(match_rule);
    707   if (iter == match_rules_added_.end()) {
    708     LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule;
    709     return false;
    710   }
    711 
    712   // The rule's counter is decremented and the rule is deleted when reachs 0.
    713   iter->second--;
    714   if (iter->second == 0) {
    715     dbus_bus_remove_match(connection_, match_rule.c_str(), error);
    716     match_rules_added_.erase(match_rule);
    717   }
    718   return true;
    719 }
    720 
    721 bool Bus::TryRegisterObjectPath(const ObjectPath& object_path,
    722                                 const DBusObjectPathVTable* vtable,
    723                                 void* user_data,
    724                                 DBusError* error) {
    725   DCHECK(connection_);
    726   AssertOnDBusThread();
    727 
    728   if (registered_object_paths_.find(object_path) !=
    729       registered_object_paths_.end()) {
    730     LOG(ERROR) << "Object path already registered: " << object_path.value();
    731     return false;
    732   }
    733 
    734   const bool success = dbus_connection_try_register_object_path(
    735       connection_,
    736       object_path.value().c_str(),
    737       vtable,
    738       user_data,
    739       error);
    740   if (success)
    741     registered_object_paths_.insert(object_path);
    742   return success;
    743 }
    744 
    745 void Bus::UnregisterObjectPath(const ObjectPath& object_path) {
    746   DCHECK(connection_);
    747   AssertOnDBusThread();
    748 
    749   if (registered_object_paths_.find(object_path) ==
    750       registered_object_paths_.end()) {
    751     LOG(ERROR) << "Requested to unregister an unknown object path: "
    752                << object_path.value();
    753     return;
    754   }
    755 
    756   const bool success = dbus_connection_unregister_object_path(
    757       connection_,
    758       object_path.value().c_str());
    759   CHECK(success) << "Unable to allocate memory";
    760   registered_object_paths_.erase(object_path);
    761 }
    762 
    763 void Bus::ShutdownOnDBusThreadAndBlockInternal() {
    764   AssertOnDBusThread();
    765 
    766   ShutdownAndBlock();
    767   on_shutdown_.Signal();
    768 }
    769 
    770 void Bus::ProcessAllIncomingDataIfAny() {
    771   AssertOnDBusThread();
    772 
    773   // As mentioned at the class comment in .h file, connection_ can be NULL.
    774   if (!connection_)
    775     return;
    776 
    777   // It is safe and necessary to call dbus_connection_get_dispatch_status even
    778   // if the connection is lost.
    779   if (dbus_connection_get_dispatch_status(connection_) ==
    780       DBUS_DISPATCH_DATA_REMAINS) {
    781     while (dbus_connection_dispatch(connection_) ==
    782            DBUS_DISPATCH_DATA_REMAINS) {
    783     }
    784   }
    785 }
    786 
    787 base::TaskRunner* Bus::GetDBusTaskRunner() {
    788   if (dbus_task_runner_)
    789     return dbus_task_runner_.get();
    790   else
    791     return GetOriginTaskRunner();
    792 }
    793 
    794 base::TaskRunner* Bus::GetOriginTaskRunner() {
    795   DCHECK(origin_task_runner_);
    796   return origin_task_runner_.get();
    797 }
    798 
    799 bool Bus::HasDBusThread() {
    800   return dbus_task_runner_ != nullptr;
    801 }
    802 
    803 void Bus::AssertOnOriginThread() {
    804   DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId());
    805 }
    806 
    807 void Bus::AssertOnDBusThread() {
    808   base::AssertBlockingAllowed();
    809 
    810   if (dbus_task_runner_) {
    811     DCHECK(dbus_task_runner_->RunsTasksInCurrentSequence());
    812   } else {
    813     AssertOnOriginThread();
    814   }
    815 }
    816 
    817 std::string Bus::GetServiceOwnerAndBlock(const std::string& service_name,
    818                                          GetServiceOwnerOption options) {
    819   AssertOnDBusThread();
    820 
    821   MethodCall get_name_owner_call("org.freedesktop.DBus", "GetNameOwner");
    822   MessageWriter writer(&get_name_owner_call);
    823   writer.AppendString(service_name);
    824   VLOG(1) << "Method call: " << get_name_owner_call.ToString();
    825 
    826   const ObjectPath obj_path("/org/freedesktop/DBus");
    827   if (!get_name_owner_call.SetDestination("org.freedesktop.DBus") ||
    828       !get_name_owner_call.SetPath(obj_path)) {
    829     if (options == REPORT_ERRORS)
    830       LOG(ERROR) << "Failed to get name owner.";
    831     return "";
    832   }
    833 
    834   ScopedDBusError error;
    835   DBusMessage* response_message =
    836       SendWithReplyAndBlock(get_name_owner_call.raw_message(),
    837                             ObjectProxy::TIMEOUT_USE_DEFAULT,
    838                             error.get());
    839   if (!response_message) {
    840     if (options == REPORT_ERRORS) {
    841       LOG(ERROR) << "Failed to get name owner. Got " << error.name() << ": "
    842                  << error.message();
    843     }
    844     return "";
    845   }
    846 
    847   std::unique_ptr<Response> response(
    848       Response::FromRawMessage(response_message));
    849   MessageReader reader(response.get());
    850 
    851   std::string service_owner;
    852   if (!reader.PopString(&service_owner))
    853     service_owner.clear();
    854   return service_owner;
    855 }
    856 
    857 void Bus::GetServiceOwner(const std::string& service_name,
    858                           const GetServiceOwnerCallback& callback) {
    859   AssertOnOriginThread();
    860 
    861   GetDBusTaskRunner()->PostTask(
    862       FROM_HERE,
    863       base::Bind(&Bus::GetServiceOwnerInternal, this, service_name, callback));
    864 }
    865 
    866 void Bus::GetServiceOwnerInternal(const std::string& service_name,
    867                                   const GetServiceOwnerCallback& callback) {
    868   AssertOnDBusThread();
    869 
    870   std::string service_owner;
    871   if (Connect())
    872     service_owner = GetServiceOwnerAndBlock(service_name, SUPPRESS_ERRORS);
    873   GetOriginTaskRunner()->PostTask(FROM_HERE,
    874                                   base::Bind(callback, service_owner));
    875 }
    876 
    877 void Bus::ListenForServiceOwnerChange(
    878     const std::string& service_name,
    879     const GetServiceOwnerCallback& callback) {
    880   AssertOnOriginThread();
    881   DCHECK(!service_name.empty());
    882   DCHECK(!callback.is_null());
    883 
    884   GetDBusTaskRunner()->PostTask(
    885       FROM_HERE,
    886       base::Bind(&Bus::ListenForServiceOwnerChangeInternal,
    887                  this, service_name, callback));
    888 }
    889 
    890 void Bus::ListenForServiceOwnerChangeInternal(
    891     const std::string& service_name,
    892     const GetServiceOwnerCallback& callback) {
    893   AssertOnDBusThread();
    894   DCHECK(!service_name.empty());
    895   DCHECK(!callback.is_null());
    896 
    897   if (!Connect() || !SetUpAsyncOperations())
    898     return;
    899 
    900   if (service_owner_changed_listener_map_.empty())
    901     AddFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
    902 
    903   ServiceOwnerChangedListenerMap::iterator it =
    904       service_owner_changed_listener_map_.find(service_name);
    905   if (it == service_owner_changed_listener_map_.end()) {
    906     // Add a match rule for the new service name.
    907     const std::string name_owner_changed_match_rule =
    908         base::StringPrintf(kServiceNameOwnerChangeMatchRule,
    909                            service_name.c_str());
    910     ScopedDBusError error;
    911     AddMatch(name_owner_changed_match_rule, error.get());
    912     if (error.is_set()) {
    913       LOG(ERROR) << "Failed to add match rule for " << service_name
    914                  << ". Got " << error.name() << ": " << error.message();
    915       return;
    916     }
    917 
    918     service_owner_changed_listener_map_[service_name].push_back(callback);
    919     return;
    920   }
    921 
    922   // Check if the callback has already been added.
    923   std::vector<GetServiceOwnerCallback>& callbacks = it->second;
    924   for (size_t i = 0; i < callbacks.size(); ++i) {
    925     if (callbacks[i].Equals(callback))
    926       return;
    927   }
    928   callbacks.push_back(callback);
    929 }
    930 
    931 void Bus::UnlistenForServiceOwnerChange(
    932     const std::string& service_name,
    933     const GetServiceOwnerCallback& callback) {
    934   AssertOnOriginThread();
    935   DCHECK(!service_name.empty());
    936   DCHECK(!callback.is_null());
    937 
    938   GetDBusTaskRunner()->PostTask(
    939       FROM_HERE,
    940       base::Bind(&Bus::UnlistenForServiceOwnerChangeInternal,
    941                  this, service_name, callback));
    942 }
    943 
    944 void Bus::UnlistenForServiceOwnerChangeInternal(
    945     const std::string& service_name,
    946     const GetServiceOwnerCallback& callback) {
    947   AssertOnDBusThread();
    948   DCHECK(!service_name.empty());
    949   DCHECK(!callback.is_null());
    950 
    951   ServiceOwnerChangedListenerMap::iterator it =
    952       service_owner_changed_listener_map_.find(service_name);
    953   if (it == service_owner_changed_listener_map_.end())
    954     return;
    955 
    956   std::vector<GetServiceOwnerCallback>& callbacks = it->second;
    957   for (size_t i = 0; i < callbacks.size(); ++i) {
    958     if (callbacks[i].Equals(callback)) {
    959       callbacks.erase(callbacks.begin() + i);
    960       break;  // There can be only one.
    961     }
    962   }
    963   if (!callbacks.empty())
    964     return;
    965 
    966   // Last callback for |service_name| has been removed, remove match rule.
    967   const std::string name_owner_changed_match_rule =
    968       base::StringPrintf(kServiceNameOwnerChangeMatchRule,
    969                          service_name.c_str());
    970   ScopedDBusError error;
    971   RemoveMatch(name_owner_changed_match_rule, error.get());
    972   // And remove |service_owner_changed_listener_map_| entry.
    973   service_owner_changed_listener_map_.erase(it);
    974 
    975   if (service_owner_changed_listener_map_.empty())
    976     RemoveFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
    977 }
    978 
    979 std::string Bus::GetConnectionName() {
    980   if (!connection_)
    981     return "";
    982   return dbus_bus_get_unique_name(connection_);
    983 }
    984 
    985 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) {
    986   AssertOnDBusThread();
    987 
    988   // watch will be deleted when raw_watch is removed in OnRemoveWatch().
    989   Watch* watch = new Watch(raw_watch);
    990   if (watch->IsReadyToBeWatched()) {
    991     watch->StartWatching();
    992   }
    993   ++num_pending_watches_;
    994   return true;
    995 }
    996 
    997 void Bus::OnRemoveWatch(DBusWatch* raw_watch) {
    998   AssertOnDBusThread();
    999 
   1000   Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
   1001   delete watch;
   1002   --num_pending_watches_;
   1003 }
   1004 
   1005 void Bus::OnToggleWatch(DBusWatch* raw_watch) {
   1006   AssertOnDBusThread();
   1007 
   1008   Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
   1009   if (watch->IsReadyToBeWatched())
   1010     watch->StartWatching();
   1011   else
   1012     watch->StopWatching();
   1013 }
   1014 
   1015 dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) {
   1016   AssertOnDBusThread();
   1017 
   1018   // |timeout| will be deleted by OnRemoveTimeoutThunk().
   1019   Timeout* timeout = new Timeout(raw_timeout);
   1020   if (timeout->IsReadyToBeMonitored()) {
   1021     timeout->StartMonitoring(this);
   1022   }
   1023   ++num_pending_timeouts_;
   1024   return true;
   1025 }
   1026 
   1027 void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) {
   1028   AssertOnDBusThread();
   1029 
   1030   Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
   1031   delete timeout;
   1032   --num_pending_timeouts_;
   1033 }
   1034 
   1035 void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) {
   1036   AssertOnDBusThread();
   1037 
   1038   Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
   1039   if (timeout->IsReadyToBeMonitored()) {
   1040     timeout->StartMonitoring(this);
   1041   } else {
   1042     timeout->StopMonitoring();
   1043   }
   1044 }
   1045 
   1046 void Bus::OnDispatchStatusChanged(DBusConnection* connection,
   1047                                   DBusDispatchStatus status) {
   1048   DCHECK_EQ(connection, connection_);
   1049   AssertOnDBusThread();
   1050 
   1051   // We cannot call ProcessAllIncomingDataIfAny() here, as calling
   1052   // dbus_connection_dispatch() inside DBusDispatchStatusFunction is
   1053   // prohibited by the D-Bus library. Hence, we post a task here instead.
   1054   // See comments for dbus_connection_set_dispatch_status_function().
   1055   GetDBusTaskRunner()->PostTask(FROM_HERE,
   1056                                 base::Bind(&Bus::ProcessAllIncomingDataIfAny,
   1057                                            this));
   1058 }
   1059 
   1060 void Bus::OnServiceOwnerChanged(DBusMessage* message) {
   1061   DCHECK(message);
   1062   AssertOnDBusThread();
   1063 
   1064   // |message| will be unrefed on exit of the function. Increment the
   1065   // reference so we can use it in Signal::FromRawMessage() below.
   1066   dbus_message_ref(message);
   1067   std::unique_ptr<Signal> signal(Signal::FromRawMessage(message));
   1068 
   1069   // Confirm the validity of the NameOwnerChanged signal.
   1070   if (signal->GetMember() != kNameOwnerChangedSignal ||
   1071       signal->GetInterface() != DBUS_INTERFACE_DBUS ||
   1072       signal->GetSender() != DBUS_SERVICE_DBUS) {
   1073     return;
   1074   }
   1075 
   1076   MessageReader reader(signal.get());
   1077   std::string service_name;
   1078   std::string old_owner;
   1079   std::string new_owner;
   1080   if (!reader.PopString(&service_name) ||
   1081       !reader.PopString(&old_owner) ||
   1082       !reader.PopString(&new_owner)) {
   1083     return;
   1084   }
   1085 
   1086   ServiceOwnerChangedListenerMap::const_iterator it =
   1087       service_owner_changed_listener_map_.find(service_name);
   1088   if (it == service_owner_changed_listener_map_.end())
   1089     return;
   1090 
   1091   const std::vector<GetServiceOwnerCallback>& callbacks = it->second;
   1092   for (size_t i = 0; i < callbacks.size(); ++i) {
   1093     GetOriginTaskRunner()->PostTask(FROM_HERE,
   1094                                     base::Bind(callbacks[i], new_owner));
   1095   }
   1096 }
   1097 
   1098 // static
   1099 dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) {
   1100   Bus* self = static_cast<Bus*>(data);
   1101   return self->OnAddWatch(raw_watch);
   1102 }
   1103 
   1104 // static
   1105 void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) {
   1106   Bus* self = static_cast<Bus*>(data);
   1107   self->OnRemoveWatch(raw_watch);
   1108 }
   1109 
   1110 // static
   1111 void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) {
   1112   Bus* self = static_cast<Bus*>(data);
   1113   self->OnToggleWatch(raw_watch);
   1114 }
   1115 
   1116 // static
   1117 dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
   1118   Bus* self = static_cast<Bus*>(data);
   1119   return self->OnAddTimeout(raw_timeout);
   1120 }
   1121 
   1122 // static
   1123 void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
   1124   Bus* self = static_cast<Bus*>(data);
   1125   self->OnRemoveTimeout(raw_timeout);
   1126 }
   1127 
   1128 // static
   1129 void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
   1130   Bus* self = static_cast<Bus*>(data);
   1131   self->OnToggleTimeout(raw_timeout);
   1132 }
   1133 
   1134 // static
   1135 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection,
   1136                                        DBusDispatchStatus status,
   1137                                        void* data) {
   1138   Bus* self = static_cast<Bus*>(data);
   1139   self->OnDispatchStatusChanged(connection, status);
   1140 }
   1141 
   1142 // static
   1143 DBusHandlerResult Bus::OnConnectionDisconnectedFilter(
   1144     DBusConnection* connection,
   1145     DBusMessage* message,
   1146     void* data) {
   1147   if (dbus_message_is_signal(message,
   1148                              DBUS_INTERFACE_LOCAL,
   1149                              kDisconnectedSignal)) {
   1150     // Abort when the connection is lost.
   1151     LOG(FATAL) << "D-Bus connection was disconnected. Aborting.";
   1152   }
   1153   return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
   1154 }
   1155 
   1156 // static
   1157 DBusHandlerResult Bus::OnServiceOwnerChangedFilter(
   1158     DBusConnection* connection,
   1159     DBusMessage* message,
   1160     void* data) {
   1161   if (dbus_message_is_signal(message,
   1162                              DBUS_INTERFACE_DBUS,
   1163                              kNameOwnerChangedSignal)) {
   1164     Bus* self = static_cast<Bus*>(data);
   1165     self->OnServiceOwnerChanged(message);
   1166   }
   1167   // Always return unhandled to let others, e.g. ObjectProxies, handle the same
   1168   // signal.
   1169   return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
   1170 }
   1171 
   1172 }  // namespace dbus
   1173