Home | History | Annotate | Download | only in dbus
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "dbus/bus.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/logging.h"
      9 #include "base/message_loop/message_loop.h"
     10 #include "base/message_loop/message_loop_proxy.h"
     11 #include "base/stl_util.h"
     12 #include "base/strings/stringprintf.h"
     13 #include "base/threading/thread.h"
     14 #include "base/threading/thread_restrictions.h"
     15 #include "base/time/time.h"
     16 #include "dbus/exported_object.h"
     17 #include "dbus/message.h"
     18 #include "dbus/object_manager.h"
     19 #include "dbus/object_path.h"
     20 #include "dbus/object_proxy.h"
     21 #include "dbus/scoped_dbus_error.h"
     22 
     23 namespace dbus {
     24 
     25 namespace {
     26 
     27 const char kDisconnectedSignal[] = "Disconnected";
     28 const char kDisconnectedMatchRule[] =
     29     "type='signal', path='/org/freedesktop/DBus/Local',"
     30     "interface='org.freedesktop.DBus.Local', member='Disconnected'";
     31 
     32 // The NameOwnerChanged member in org.freedesktop.DBus
     33 const char kNameOwnerChangedSignal[] = "NameOwnerChanged";
     34 
     35 // The match rule used to filter for changes to a given service name owner.
     36 const char kServiceNameOwnerChangeMatchRule[] =
     37     "type='signal',interface='org.freedesktop.DBus',"
     38     "member='NameOwnerChanged',path='/org/freedesktop/DBus',"
     39     "sender='org.freedesktop.DBus',arg0='%s'";
     40 
     41 // The class is used for watching the file descriptor used for D-Bus
     42 // communication.
     43 class Watch : public base::MessagePumpLibevent::Watcher {
     44  public:
     45   explicit Watch(DBusWatch* watch)
     46       : raw_watch_(watch) {
     47     dbus_watch_set_data(raw_watch_, this, NULL);
     48   }
     49 
     50   virtual ~Watch() {
     51     dbus_watch_set_data(raw_watch_, NULL, NULL);
     52   }
     53 
     54   // Returns true if the underlying file descriptor is ready to be watched.
     55   bool IsReadyToBeWatched() {
     56     return dbus_watch_get_enabled(raw_watch_);
     57   }
     58 
     59   // Starts watching the underlying file descriptor.
     60   void StartWatching() {
     61     const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_);
     62     const int flags = dbus_watch_get_flags(raw_watch_);
     63 
     64     base::MessageLoopForIO::Mode mode = base::MessageLoopForIO::WATCH_READ;
     65     if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE))
     66       mode = base::MessageLoopForIO::WATCH_READ_WRITE;
     67     else if (flags & DBUS_WATCH_READABLE)
     68       mode = base::MessageLoopForIO::WATCH_READ;
     69     else if (flags & DBUS_WATCH_WRITABLE)
     70       mode = base::MessageLoopForIO::WATCH_WRITE;
     71     else
     72       NOTREACHED();
     73 
     74     const bool persistent = true;  // Watch persistently.
     75     const bool success = base::MessageLoopForIO::current()->WatchFileDescriptor(
     76         file_descriptor, persistent, mode, &file_descriptor_watcher_, this);
     77     CHECK(success) << "Unable to allocate memory";
     78   }
     79 
     80   // Stops watching the underlying file descriptor.
     81   void StopWatching() {
     82     file_descriptor_watcher_.StopWatchingFileDescriptor();
     83   }
     84 
     85  private:
     86   // Implement MessagePumpLibevent::Watcher.
     87   virtual void OnFileCanReadWithoutBlocking(int file_descriptor) OVERRIDE {
     88     const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE);
     89     CHECK(success) << "Unable to allocate memory";
     90   }
     91 
     92   // Implement MessagePumpLibevent::Watcher.
     93   virtual void OnFileCanWriteWithoutBlocking(int file_descriptor) OVERRIDE {
     94     const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE);
     95     CHECK(success) << "Unable to allocate memory";
     96   }
     97 
     98   DBusWatch* raw_watch_;
     99   base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_;
    100 };
    101 
    102 // The class is used for monitoring the timeout used for D-Bus method
    103 // calls.
    104 //
    105 // Unlike Watch, Timeout is a ref counted object, to ensure that |this| of
    106 // the object is is alive when HandleTimeout() is called. It's unlikely
    107 // but it may be possible that HandleTimeout() is called after
    108 // Bus::OnRemoveTimeout(). That's why we don't simply delete the object in
    109 // Bus::OnRemoveTimeout().
    110 class Timeout : public base::RefCountedThreadSafe<Timeout> {
    111  public:
    112   explicit Timeout(DBusTimeout* timeout)
    113       : raw_timeout_(timeout),
    114         monitoring_is_active_(false),
    115         is_completed(false) {
    116     dbus_timeout_set_data(raw_timeout_, this, NULL);
    117     AddRef();  // Balanced on Complete().
    118   }
    119 
    120   // Returns true if the timeout is ready to be monitored.
    121   bool IsReadyToBeMonitored() {
    122     return dbus_timeout_get_enabled(raw_timeout_);
    123   }
    124 
    125   // Starts monitoring the timeout.
    126   void StartMonitoring(Bus* bus) {
    127     bus->GetDBusTaskRunner()->PostDelayedTask(
    128         FROM_HERE,
    129         base::Bind(&Timeout::HandleTimeout, this),
    130         GetInterval());
    131     monitoring_is_active_ = true;
    132   }
    133 
    134   // Stops monitoring the timeout.
    135   void StopMonitoring() {
    136     // We cannot take back the delayed task we posted in
    137     // StartMonitoring(), so we just mark the monitoring is inactive now.
    138     monitoring_is_active_ = false;
    139   }
    140 
    141   // Returns the interval.
    142   base::TimeDelta GetInterval() {
    143     return base::TimeDelta::FromMilliseconds(
    144         dbus_timeout_get_interval(raw_timeout_));
    145   }
    146 
    147   // Cleans up the raw_timeout and marks that timeout is completed.
    148   // See the class comment above for why we are doing this.
    149   void Complete() {
    150     dbus_timeout_set_data(raw_timeout_, NULL, NULL);
    151     is_completed = true;
    152     Release();
    153   }
    154 
    155  private:
    156   friend class base::RefCountedThreadSafe<Timeout>;
    157   ~Timeout() {
    158   }
    159 
    160   // Handles the timeout.
    161   void HandleTimeout() {
    162     // If the timeout is marked completed, we should do nothing. This can
    163     // occur if this function is called after Bus::OnRemoveTimeout().
    164     if (is_completed)
    165       return;
    166     // Skip if monitoring is canceled.
    167     if (!monitoring_is_active_)
    168       return;
    169 
    170     const bool success = dbus_timeout_handle(raw_timeout_);
    171     CHECK(success) << "Unable to allocate memory";
    172   }
    173 
    174   DBusTimeout* raw_timeout_;
    175   bool monitoring_is_active_;
    176   bool is_completed;
    177 };
    178 
    179 }  // namespace
    180 
    181 Bus::Options::Options()
    182   : bus_type(SESSION),
    183     connection_type(PRIVATE) {
    184 }
    185 
    186 Bus::Options::~Options() {
    187 }
    188 
    189 Bus::Bus(const Options& options)
    190     : bus_type_(options.bus_type),
    191       connection_type_(options.connection_type),
    192       dbus_task_runner_(options.dbus_task_runner),
    193       on_shutdown_(false /* manual_reset */, false /* initially_signaled */),
    194       connection_(NULL),
    195       origin_thread_id_(base::PlatformThread::CurrentId()),
    196       async_operations_set_up_(false),
    197       shutdown_completed_(false),
    198       num_pending_watches_(0),
    199       num_pending_timeouts_(0),
    200       address_(options.address),
    201       on_disconnected_closure_(options.disconnected_callback) {
    202   // This is safe to call multiple times.
    203   dbus_threads_init_default();
    204   // The origin message loop is unnecessary if the client uses synchronous
    205   // functions only.
    206   if (base::MessageLoop::current())
    207     origin_task_runner_ = base::MessageLoop::current()->message_loop_proxy();
    208 }
    209 
    210 Bus::~Bus() {
    211   DCHECK(!connection_);
    212   DCHECK(owned_service_names_.empty());
    213   DCHECK(match_rules_added_.empty());
    214   DCHECK(filter_functions_added_.empty());
    215   DCHECK(registered_object_paths_.empty());
    216   DCHECK_EQ(0, num_pending_watches_);
    217   // TODO(satorux): This check fails occasionally in browser_tests for tests
    218   // that run very quickly. Perhaps something does not have time to clean up.
    219   // Despite the check failing, the tests seem to run fine. crosbug.com/23416
    220   // DCHECK_EQ(0, num_pending_timeouts_);
    221 }
    222 
    223 ObjectProxy* Bus::GetObjectProxy(const std::string& service_name,
    224                                  const ObjectPath& object_path) {
    225   return GetObjectProxyWithOptions(service_name, object_path,
    226                                    ObjectProxy::DEFAULT_OPTIONS);
    227 }
    228 
    229 ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name,
    230                                             const ObjectPath& object_path,
    231                                             int options) {
    232   AssertOnOriginThread();
    233 
    234   // Check if we already have the requested object proxy.
    235   const ObjectProxyTable::key_type key(service_name + object_path.value(),
    236                                        options);
    237   ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
    238   if (iter != object_proxy_table_.end()) {
    239     return iter->second.get();
    240   }
    241 
    242   scoped_refptr<ObjectProxy> object_proxy =
    243       new ObjectProxy(this, service_name, object_path, options);
    244   object_proxy_table_[key] = object_proxy;
    245 
    246   return object_proxy.get();
    247 }
    248 
    249 bool Bus::RemoveObjectProxy(const std::string& service_name,
    250                             const ObjectPath& object_path,
    251                             const base::Closure& callback) {
    252   return RemoveObjectProxyWithOptions(service_name, object_path,
    253                                       ObjectProxy::DEFAULT_OPTIONS,
    254                                       callback);
    255 }
    256 
    257 bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name,
    258                                        const ObjectPath& object_path,
    259                                        int options,
    260                                        const base::Closure& callback) {
    261   AssertOnOriginThread();
    262 
    263   // Check if we have the requested object proxy.
    264   const ObjectProxyTable::key_type key(service_name + object_path.value(),
    265                                        options);
    266   ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
    267   if (iter != object_proxy_table_.end()) {
    268     scoped_refptr<ObjectProxy> object_proxy = iter->second;
    269     object_proxy_table_.erase(iter);
    270     // Object is present. Remove it now and Detach on the DBus thread.
    271     GetDBusTaskRunner()->PostTask(
    272         FROM_HERE,
    273         base::Bind(&Bus::RemoveObjectProxyInternal,
    274                    this, object_proxy, callback));
    275     return true;
    276   }
    277   return false;
    278 }
    279 
    280 void Bus::RemoveObjectProxyInternal(scoped_refptr<ObjectProxy> object_proxy,
    281                                     const base::Closure& callback) {
    282   AssertOnDBusThread();
    283 
    284   object_proxy.get()->Detach();
    285 
    286   GetOriginTaskRunner()->PostTask(FROM_HERE, callback);
    287 }
    288 
    289 ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) {
    290   AssertOnOriginThread();
    291 
    292   // Check if we already have the requested exported object.
    293   ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
    294   if (iter != exported_object_table_.end()) {
    295     return iter->second.get();
    296   }
    297 
    298   scoped_refptr<ExportedObject> exported_object =
    299       new ExportedObject(this, object_path);
    300   exported_object_table_[object_path] = exported_object;
    301 
    302   return exported_object.get();
    303 }
    304 
    305 void Bus::UnregisterExportedObject(const ObjectPath& object_path) {
    306   AssertOnOriginThread();
    307 
    308   // Remove the registered object from the table first, to allow a new
    309   // GetExportedObject() call to return a new object, rather than this one.
    310   ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
    311   if (iter == exported_object_table_.end())
    312     return;
    313 
    314   scoped_refptr<ExportedObject> exported_object = iter->second;
    315   exported_object_table_.erase(iter);
    316 
    317   // Post the task to perform the final unregistration to the D-Bus thread.
    318   // Since the registration also happens on the D-Bus thread in
    319   // TryRegisterObjectPath(), and the task runner we post to is a
    320   // SequencedTaskRunner, there is a guarantee that this will happen before any
    321   // future registration call.
    322   GetDBusTaskRunner()->PostTask(
    323       FROM_HERE,
    324       base::Bind(&Bus::UnregisterExportedObjectInternal,
    325                  this, exported_object));
    326 }
    327 
    328 void Bus::UnregisterExportedObjectInternal(
    329     scoped_refptr<ExportedObject> exported_object) {
    330   AssertOnDBusThread();
    331 
    332   exported_object->Unregister();
    333 }
    334 
    335 ObjectManager* Bus::GetObjectManager(const std::string& service_name,
    336                                      const ObjectPath& object_path) {
    337   AssertOnOriginThread();
    338 
    339   // Check if we already have the requested object manager.
    340   const ObjectManagerTable::key_type key(service_name + object_path.value());
    341   ObjectManagerTable::iterator iter = object_manager_table_.find(key);
    342   if (iter != object_manager_table_.end()) {
    343     return iter->second.get();
    344   }
    345 
    346   scoped_refptr<ObjectManager> object_manager =
    347       new ObjectManager(this, service_name, object_path);
    348   object_manager_table_[key] = object_manager;
    349 
    350   return object_manager.get();
    351 }
    352 
    353 bool Bus::RemoveObjectManager(const std::string& service_name,
    354                               const ObjectPath& object_path,
    355                               const base::Closure& callback) {
    356   AssertOnOriginThread();
    357   DCHECK(!callback.is_null());
    358 
    359   const ObjectManagerTable::key_type key(service_name + object_path.value());
    360   ObjectManagerTable::iterator iter = object_manager_table_.find(key);
    361   if (iter == object_manager_table_.end())
    362     return false;
    363 
    364   // ObjectManager is present. Remove it now and CleanUp on the DBus thread.
    365   scoped_refptr<ObjectManager> object_manager = iter->second;
    366   object_manager_table_.erase(iter);
    367 
    368   GetDBusTaskRunner()->PostTask(
    369       FROM_HERE,
    370       base::Bind(&Bus::RemoveObjectManagerInternal,
    371                  this, object_manager, callback));
    372 
    373   return true;
    374 }
    375 
    376 void Bus::RemoveObjectManagerInternal(
    377       scoped_refptr<dbus::ObjectManager> object_manager,
    378       const base::Closure& callback) {
    379   AssertOnDBusThread();
    380   DCHECK(object_manager.get());
    381 
    382   object_manager->CleanUp();
    383 
    384   // The ObjectManager has to be deleted on the origin thread since it was
    385   // created there.
    386   GetOriginTaskRunner()->PostTask(
    387       FROM_HERE,
    388       base::Bind(&Bus::RemoveObjectManagerInternalHelper,
    389                  this, object_manager, callback));
    390 }
    391 
    392 void Bus::RemoveObjectManagerInternalHelper(
    393       scoped_refptr<dbus::ObjectManager> object_manager,
    394       const base::Closure& callback) {
    395   AssertOnOriginThread();
    396   DCHECK(object_manager.get());
    397 
    398   // Release the object manager and run the callback.
    399   object_manager = NULL;
    400   callback.Run();
    401 }
    402 
    403 void Bus::GetManagedObjects() {
    404   for (ObjectManagerTable::iterator iter = object_manager_table_.begin();
    405        iter != object_manager_table_.end(); ++iter) {
    406     iter->second->GetManagedObjects();
    407   }
    408 }
    409 
    410 bool Bus::Connect() {
    411   // dbus_bus_get_private() and dbus_bus_get() are blocking calls.
    412   AssertOnDBusThread();
    413 
    414   // Check if it's already initialized.
    415   if (connection_)
    416     return true;
    417 
    418   ScopedDBusError error;
    419   if (bus_type_ == CUSTOM_ADDRESS) {
    420     if (connection_type_ == PRIVATE) {
    421       connection_ = dbus_connection_open_private(address_.c_str(), error.get());
    422     } else {
    423       connection_ = dbus_connection_open(address_.c_str(), error.get());
    424     }
    425   } else {
    426     const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_);
    427     if (connection_type_ == PRIVATE) {
    428       connection_ = dbus_bus_get_private(dbus_bus_type, error.get());
    429     } else {
    430       connection_ = dbus_bus_get(dbus_bus_type, error.get());
    431     }
    432   }
    433   if (!connection_) {
    434     LOG(ERROR) << "Failed to connect to the bus: "
    435                << (error.is_set() ? error.message() : "");
    436     return false;
    437   }
    438 
    439   if (bus_type_ == CUSTOM_ADDRESS) {
    440     // We should call dbus_bus_register here, otherwise unique name can not be
    441     // acquired. According to dbus specification, it is responsible to call
    442     // org.freedesktop.DBus.Hello method at the beging of bus connection to
    443     // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is
    444     // called internally.
    445     if (!dbus_bus_register(connection_, error.get())) {
    446       LOG(ERROR) << "Failed to register the bus component: "
    447                  << (error.is_set() ? error.message() : "");
    448       return false;
    449     }
    450   }
    451   // We shouldn't exit on the disconnected signal.
    452   dbus_connection_set_exit_on_disconnect(connection_, false);
    453 
    454   // Watch Disconnected signal.
    455   AddFilterFunction(Bus::OnConnectionDisconnectedFilter, this);
    456   AddMatch(kDisconnectedMatchRule, error.get());
    457 
    458   return true;
    459 }
    460 
    461 void Bus::ClosePrivateConnection() {
    462   // dbus_connection_close is blocking call.
    463   AssertOnDBusThread();
    464   DCHECK_EQ(PRIVATE, connection_type_)
    465       << "non-private connection should not be closed";
    466   dbus_connection_close(connection_);
    467 }
    468 
    469 void Bus::ShutdownAndBlock() {
    470   AssertOnDBusThread();
    471 
    472   if (shutdown_completed_)
    473     return;  // Already shutdowned, just return.
    474 
    475   // Unregister the exported objects.
    476   for (ExportedObjectTable::iterator iter = exported_object_table_.begin();
    477        iter != exported_object_table_.end(); ++iter) {
    478     iter->second->Unregister();
    479   }
    480 
    481   // Release all service names.
    482   for (std::set<std::string>::iterator iter = owned_service_names_.begin();
    483        iter != owned_service_names_.end();) {
    484     // This is a bit tricky but we should increment the iter here as
    485     // ReleaseOwnership() may remove |service_name| from the set.
    486     const std::string& service_name = *iter++;
    487     ReleaseOwnership(service_name);
    488   }
    489   if (!owned_service_names_.empty()) {
    490     LOG(ERROR) << "Failed to release all service names. # of services left: "
    491                << owned_service_names_.size();
    492   }
    493 
    494   // Detach from the remote objects.
    495   for (ObjectProxyTable::iterator iter = object_proxy_table_.begin();
    496        iter != object_proxy_table_.end(); ++iter) {
    497     iter->second->Detach();
    498   }
    499 
    500   // Clean up the object managers.
    501   for (ObjectManagerTable::iterator iter = object_manager_table_.begin();
    502        iter != object_manager_table_.end(); ++iter) {
    503     iter->second->CleanUp();
    504   }
    505 
    506   // Release object proxies and exported objects here. We should do this
    507   // here rather than in the destructor to avoid memory leaks due to
    508   // cyclic references.
    509   object_proxy_table_.clear();
    510   exported_object_table_.clear();
    511 
    512   // Private connection should be closed.
    513   if (connection_) {
    514     // Remove Disconnected watcher.
    515     ScopedDBusError error;
    516     RemoveFilterFunction(Bus::OnConnectionDisconnectedFilter, this);
    517     RemoveMatch(kDisconnectedMatchRule, error.get());
    518 
    519     if (connection_type_ == PRIVATE)
    520       ClosePrivateConnection();
    521     // dbus_connection_close() won't unref.
    522     dbus_connection_unref(connection_);
    523   }
    524 
    525   connection_ = NULL;
    526   shutdown_completed_ = true;
    527 }
    528 
    529 void Bus::ShutdownOnDBusThreadAndBlock() {
    530   AssertOnOriginThread();
    531   DCHECK(dbus_task_runner_.get());
    532 
    533   GetDBusTaskRunner()->PostTask(
    534       FROM_HERE,
    535       base::Bind(&Bus::ShutdownOnDBusThreadAndBlockInternal, this));
    536 
    537   // http://crbug.com/125222
    538   base::ThreadRestrictions::ScopedAllowWait allow_wait;
    539 
    540   // Wait until the shutdown is complete on the D-Bus thread.
    541   // The shutdown should not hang, but set timeout just in case.
    542   const int kTimeoutSecs = 3;
    543   const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs));
    544   const bool signaled = on_shutdown_.TimedWait(timeout);
    545   LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus";
    546 }
    547 
    548 void Bus::RequestOwnership(const std::string& service_name,
    549                            ServiceOwnershipOptions options,
    550                            OnOwnershipCallback on_ownership_callback) {
    551   AssertOnOriginThread();
    552 
    553   GetDBusTaskRunner()->PostTask(
    554       FROM_HERE,
    555       base::Bind(&Bus::RequestOwnershipInternal,
    556                  this, service_name, options, on_ownership_callback));
    557 }
    558 
    559 void Bus::RequestOwnershipInternal(const std::string& service_name,
    560                                    ServiceOwnershipOptions options,
    561                                    OnOwnershipCallback on_ownership_callback) {
    562   AssertOnDBusThread();
    563 
    564   bool success = Connect();
    565   if (success)
    566     success = RequestOwnershipAndBlock(service_name, options);
    567 
    568   GetOriginTaskRunner()->PostTask(FROM_HERE,
    569                                   base::Bind(on_ownership_callback,
    570                                              service_name,
    571                                              success));
    572 }
    573 
    574 bool Bus::RequestOwnershipAndBlock(const std::string& service_name,
    575                                    ServiceOwnershipOptions options) {
    576   DCHECK(connection_);
    577   // dbus_bus_request_name() is a blocking call.
    578   AssertOnDBusThread();
    579 
    580   // Check if we already own the service name.
    581   if (owned_service_names_.find(service_name) != owned_service_names_.end()) {
    582     return true;
    583   }
    584 
    585   ScopedDBusError error;
    586   const int result = dbus_bus_request_name(connection_,
    587                                            service_name.c_str(),
    588                                            options,
    589                                            error.get());
    590   if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
    591     LOG(ERROR) << "Failed to get the ownership of " << service_name << ": "
    592                << (error.is_set() ? error.message() : "");
    593     return false;
    594   }
    595   owned_service_names_.insert(service_name);
    596   return true;
    597 }
    598 
    599 bool Bus::ReleaseOwnership(const std::string& service_name) {
    600   DCHECK(connection_);
    601   // dbus_bus_request_name() is a blocking call.
    602   AssertOnDBusThread();
    603 
    604   // Check if we already own the service name.
    605   std::set<std::string>::iterator found =
    606       owned_service_names_.find(service_name);
    607   if (found == owned_service_names_.end()) {
    608     LOG(ERROR) << service_name << " is not owned by the bus";
    609     return false;
    610   }
    611 
    612   ScopedDBusError error;
    613   const int result = dbus_bus_release_name(connection_, service_name.c_str(),
    614                                            error.get());
    615   if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) {
    616     owned_service_names_.erase(found);
    617     return true;
    618   } else {
    619     LOG(ERROR) << "Failed to release the ownership of " << service_name << ": "
    620                << (error.is_set() ? error.message() : "")
    621                << ", result code: " << result;
    622     return false;
    623   }
    624 }
    625 
    626 bool Bus::SetUpAsyncOperations() {
    627   DCHECK(connection_);
    628   AssertOnDBusThread();
    629 
    630   if (async_operations_set_up_)
    631     return true;
    632 
    633   // Process all the incoming data if any, so that OnDispatchStatus() will
    634   // be called when the incoming data is ready.
    635   ProcessAllIncomingDataIfAny();
    636 
    637   bool success = dbus_connection_set_watch_functions(connection_,
    638                                                      &Bus::OnAddWatchThunk,
    639                                                      &Bus::OnRemoveWatchThunk,
    640                                                      &Bus::OnToggleWatchThunk,
    641                                                      this,
    642                                                      NULL);
    643   CHECK(success) << "Unable to allocate memory";
    644 
    645   success = dbus_connection_set_timeout_functions(connection_,
    646                                                   &Bus::OnAddTimeoutThunk,
    647                                                   &Bus::OnRemoveTimeoutThunk,
    648                                                   &Bus::OnToggleTimeoutThunk,
    649                                                   this,
    650                                                   NULL);
    651   CHECK(success) << "Unable to allocate memory";
    652 
    653   dbus_connection_set_dispatch_status_function(
    654       connection_,
    655       &Bus::OnDispatchStatusChangedThunk,
    656       this,
    657       NULL);
    658 
    659   async_operations_set_up_ = true;
    660 
    661   return true;
    662 }
    663 
    664 DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request,
    665                                         int timeout_ms,
    666                                         DBusError* error) {
    667   DCHECK(connection_);
    668   AssertOnDBusThread();
    669 
    670   return dbus_connection_send_with_reply_and_block(
    671       connection_, request, timeout_ms, error);
    672 }
    673 
    674 void Bus::SendWithReply(DBusMessage* request,
    675                         DBusPendingCall** pending_call,
    676                         int timeout_ms) {
    677   DCHECK(connection_);
    678   AssertOnDBusThread();
    679 
    680   const bool success = dbus_connection_send_with_reply(
    681       connection_, request, pending_call, timeout_ms);
    682   CHECK(success) << "Unable to allocate memory";
    683 }
    684 
    685 void Bus::Send(DBusMessage* request, uint32* serial) {
    686   DCHECK(connection_);
    687   AssertOnDBusThread();
    688 
    689   const bool success = dbus_connection_send(connection_, request, serial);
    690   CHECK(success) << "Unable to allocate memory";
    691 }
    692 
    693 bool Bus::AddFilterFunction(DBusHandleMessageFunction filter_function,
    694                             void* user_data) {
    695   DCHECK(connection_);
    696   AssertOnDBusThread();
    697 
    698   std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
    699       std::make_pair(filter_function, user_data);
    700   if (filter_functions_added_.find(filter_data_pair) !=
    701       filter_functions_added_.end()) {
    702     VLOG(1) << "Filter function already exists: " << filter_function
    703             << " with associated data: " << user_data;
    704     return false;
    705   }
    706 
    707   const bool success = dbus_connection_add_filter(
    708       connection_, filter_function, user_data, NULL);
    709   CHECK(success) << "Unable to allocate memory";
    710   filter_functions_added_.insert(filter_data_pair);
    711   return true;
    712 }
    713 
    714 bool Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function,
    715                                void* user_data) {
    716   DCHECK(connection_);
    717   AssertOnDBusThread();
    718 
    719   std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
    720       std::make_pair(filter_function, user_data);
    721   if (filter_functions_added_.find(filter_data_pair) ==
    722       filter_functions_added_.end()) {
    723     VLOG(1) << "Requested to remove an unknown filter function: "
    724             << filter_function
    725             << " with associated data: " << user_data;
    726     return false;
    727   }
    728 
    729   dbus_connection_remove_filter(connection_, filter_function, user_data);
    730   filter_functions_added_.erase(filter_data_pair);
    731   return true;
    732 }
    733 
    734 void Bus::AddMatch(const std::string& match_rule, DBusError* error) {
    735   DCHECK(connection_);
    736   AssertOnDBusThread();
    737 
    738   std::map<std::string, int>::iterator iter =
    739       match_rules_added_.find(match_rule);
    740   if (iter != match_rules_added_.end()) {
    741     // The already existing rule's counter is incremented.
    742     iter->second++;
    743 
    744     VLOG(1) << "Match rule already exists: " << match_rule;
    745     return;
    746   }
    747 
    748   dbus_bus_add_match(connection_, match_rule.c_str(), error);
    749   match_rules_added_[match_rule] = 1;
    750 }
    751 
    752 bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) {
    753   DCHECK(connection_);
    754   AssertOnDBusThread();
    755 
    756   std::map<std::string, int>::iterator iter =
    757       match_rules_added_.find(match_rule);
    758   if (iter == match_rules_added_.end()) {
    759     LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule;
    760     return false;
    761   }
    762 
    763   // The rule's counter is decremented and the rule is deleted when reachs 0.
    764   iter->second--;
    765   if (iter->second == 0) {
    766     dbus_bus_remove_match(connection_, match_rule.c_str(), error);
    767     match_rules_added_.erase(match_rule);
    768   }
    769   return true;
    770 }
    771 
    772 bool Bus::TryRegisterObjectPath(const ObjectPath& object_path,
    773                                 const DBusObjectPathVTable* vtable,
    774                                 void* user_data,
    775                                 DBusError* error) {
    776   DCHECK(connection_);
    777   AssertOnDBusThread();
    778 
    779   if (registered_object_paths_.find(object_path) !=
    780       registered_object_paths_.end()) {
    781     LOG(ERROR) << "Object path already registered: " << object_path.value();
    782     return false;
    783   }
    784 
    785   const bool success = dbus_connection_try_register_object_path(
    786       connection_,
    787       object_path.value().c_str(),
    788       vtable,
    789       user_data,
    790       error);
    791   if (success)
    792     registered_object_paths_.insert(object_path);
    793   return success;
    794 }
    795 
    796 void Bus::UnregisterObjectPath(const ObjectPath& object_path) {
    797   DCHECK(connection_);
    798   AssertOnDBusThread();
    799 
    800   if (registered_object_paths_.find(object_path) ==
    801       registered_object_paths_.end()) {
    802     LOG(ERROR) << "Requested to unregister an unknown object path: "
    803                << object_path.value();
    804     return;
    805   }
    806 
    807   const bool success = dbus_connection_unregister_object_path(
    808       connection_,
    809       object_path.value().c_str());
    810   CHECK(success) << "Unable to allocate memory";
    811   registered_object_paths_.erase(object_path);
    812 }
    813 
    814 void Bus::ShutdownOnDBusThreadAndBlockInternal() {
    815   AssertOnDBusThread();
    816 
    817   ShutdownAndBlock();
    818   on_shutdown_.Signal();
    819 }
    820 
    821 void Bus::ProcessAllIncomingDataIfAny() {
    822   AssertOnDBusThread();
    823 
    824   // As mentioned at the class comment in .h file, connection_ can be NULL.
    825   if (!connection_)
    826     return;
    827 
    828   // It is safe and necessary to call dbus_connection_get_dispatch_status even
    829   // if the connection is lost. Otherwise we will miss "Disconnected" signal.
    830   // (crbug.com/174431)
    831   if (dbus_connection_get_dispatch_status(connection_) ==
    832       DBUS_DISPATCH_DATA_REMAINS) {
    833     while (dbus_connection_dispatch(connection_) ==
    834            DBUS_DISPATCH_DATA_REMAINS) {
    835     }
    836   }
    837 }
    838 
    839 base::TaskRunner* Bus::GetDBusTaskRunner() {
    840   if (dbus_task_runner_.get())
    841     return dbus_task_runner_.get();
    842   else
    843     return GetOriginTaskRunner();
    844 }
    845 
    846 base::TaskRunner* Bus::GetOriginTaskRunner() {
    847   DCHECK(origin_task_runner_.get());
    848   return origin_task_runner_.get();
    849 }
    850 
    851 bool Bus::HasDBusThread() {
    852   return dbus_task_runner_.get() != NULL;
    853 }
    854 
    855 void Bus::AssertOnOriginThread() {
    856   DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId());
    857 }
    858 
    859 void Bus::AssertOnDBusThread() {
    860   base::ThreadRestrictions::AssertIOAllowed();
    861 
    862   if (dbus_task_runner_.get()) {
    863     DCHECK(dbus_task_runner_->RunsTasksOnCurrentThread());
    864   } else {
    865     AssertOnOriginThread();
    866   }
    867 }
    868 
    869 std::string Bus::GetServiceOwnerAndBlock(const std::string& service_name,
    870                                          GetServiceOwnerOption options) {
    871   AssertOnDBusThread();
    872 
    873   MethodCall get_name_owner_call("org.freedesktop.DBus", "GetNameOwner");
    874   MessageWriter writer(&get_name_owner_call);
    875   writer.AppendString(service_name);
    876   VLOG(1) << "Method call: " << get_name_owner_call.ToString();
    877 
    878   const ObjectPath obj_path("/org/freedesktop/DBus");
    879   if (!get_name_owner_call.SetDestination("org.freedesktop.DBus") ||
    880       !get_name_owner_call.SetPath(obj_path)) {
    881     if (options == REPORT_ERRORS)
    882       LOG(ERROR) << "Failed to get name owner.";
    883     return "";
    884   }
    885 
    886   ScopedDBusError error;
    887   DBusMessage* response_message =
    888       SendWithReplyAndBlock(get_name_owner_call.raw_message(),
    889                             ObjectProxy::TIMEOUT_USE_DEFAULT,
    890                             error.get());
    891   if (!response_message) {
    892     if (options == REPORT_ERRORS) {
    893       LOG(ERROR) << "Failed to get name owner. Got " << error.name() << ": "
    894                  << error.message();
    895     }
    896     return "";
    897   }
    898 
    899   scoped_ptr<Response> response(Response::FromRawMessage(response_message));
    900   MessageReader reader(response.get());
    901 
    902   std::string service_owner;
    903   if (!reader.PopString(&service_owner))
    904     service_owner.clear();
    905   return service_owner;
    906 }
    907 
    908 void Bus::GetServiceOwner(const std::string& service_name,
    909                           const GetServiceOwnerCallback& callback) {
    910   AssertOnOriginThread();
    911 
    912   GetDBusTaskRunner()->PostTask(
    913       FROM_HERE,
    914       base::Bind(&Bus::GetServiceOwnerInternal, this, service_name, callback));
    915 }
    916 
    917 void Bus::GetServiceOwnerInternal(const std::string& service_name,
    918                                   const GetServiceOwnerCallback& callback) {
    919   AssertOnDBusThread();
    920 
    921   std::string service_owner;
    922   if (Connect())
    923     service_owner = GetServiceOwnerAndBlock(service_name, SUPPRESS_ERRORS);
    924   GetOriginTaskRunner()->PostTask(FROM_HERE,
    925                                   base::Bind(callback, service_owner));
    926 }
    927 
    928 void Bus::ListenForServiceOwnerChange(
    929     const std::string& service_name,
    930     const GetServiceOwnerCallback& callback) {
    931   AssertOnOriginThread();
    932   DCHECK(!service_name.empty());
    933   DCHECK(!callback.is_null());
    934 
    935   GetDBusTaskRunner()->PostTask(
    936       FROM_HERE,
    937       base::Bind(&Bus::ListenForServiceOwnerChangeInternal,
    938                  this, service_name, callback));
    939 }
    940 
    941 void Bus::ListenForServiceOwnerChangeInternal(
    942     const std::string& service_name,
    943     const GetServiceOwnerCallback& callback) {
    944   AssertOnDBusThread();
    945   DCHECK(!service_name.empty());
    946   DCHECK(!callback.is_null());
    947 
    948   if (!Connect() || !SetUpAsyncOperations())
    949     return;
    950 
    951   if (service_owner_changed_listener_map_.empty()) {
    952     bool filter_added =
    953         AddFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
    954     DCHECK(filter_added);
    955   }
    956 
    957   ServiceOwnerChangedListenerMap::iterator it =
    958       service_owner_changed_listener_map_.find(service_name);
    959   if (it == service_owner_changed_listener_map_.end()) {
    960     // Add a match rule for the new service name.
    961     const std::string name_owner_changed_match_rule =
    962         base::StringPrintf(kServiceNameOwnerChangeMatchRule,
    963                            service_name.c_str());
    964     ScopedDBusError error;
    965     AddMatch(name_owner_changed_match_rule, error.get());
    966     if (error.is_set()) {
    967       LOG(ERROR) << "Failed to add match rule for " << service_name
    968                  << ". Got " << error.name() << ": " << error.message();
    969       return;
    970     }
    971 
    972     service_owner_changed_listener_map_[service_name].push_back(callback);
    973     return;
    974   }
    975 
    976   // Check if the callback has already been added.
    977   std::vector<GetServiceOwnerCallback>& callbacks = it->second;
    978   for (size_t i = 0; i < callbacks.size(); ++i) {
    979     if (callbacks[i].Equals(callback))
    980       return;
    981   }
    982   callbacks.push_back(callback);
    983 }
    984 
    985 void Bus::UnlistenForServiceOwnerChange(
    986     const std::string& service_name,
    987     const GetServiceOwnerCallback& callback) {
    988   AssertOnOriginThread();
    989   DCHECK(!service_name.empty());
    990   DCHECK(!callback.is_null());
    991 
    992   GetDBusTaskRunner()->PostTask(
    993       FROM_HERE,
    994       base::Bind(&Bus::UnlistenForServiceOwnerChangeInternal,
    995                  this, service_name, callback));
    996 }
    997 
    998 void Bus::UnlistenForServiceOwnerChangeInternal(
    999     const std::string& service_name,
   1000     const GetServiceOwnerCallback& callback) {
   1001   AssertOnDBusThread();
   1002   DCHECK(!service_name.empty());
   1003   DCHECK(!callback.is_null());
   1004 
   1005   ServiceOwnerChangedListenerMap::iterator it =
   1006       service_owner_changed_listener_map_.find(service_name);
   1007   if (it == service_owner_changed_listener_map_.end())
   1008     return;
   1009 
   1010   std::vector<GetServiceOwnerCallback>& callbacks = it->second;
   1011   for (size_t i = 0; i < callbacks.size(); ++i) {
   1012     if (callbacks[i].Equals(callback)) {
   1013       callbacks.erase(callbacks.begin() + i);
   1014       break;  // There can be only one.
   1015     }
   1016   }
   1017   if (!callbacks.empty())
   1018     return;
   1019 
   1020   // Last callback for |service_name| has been removed, remove match rule.
   1021   const std::string name_owner_changed_match_rule =
   1022       base::StringPrintf(kServiceNameOwnerChangeMatchRule,
   1023                          service_name.c_str());
   1024   ScopedDBusError error;
   1025   RemoveMatch(name_owner_changed_match_rule, error.get());
   1026   // And remove |service_owner_changed_listener_map_| entry.
   1027   service_owner_changed_listener_map_.erase(it);
   1028 
   1029   if (service_owner_changed_listener_map_.empty()) {
   1030     bool filter_removed =
   1031         RemoveFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
   1032     DCHECK(filter_removed);
   1033   }
   1034 }
   1035 
   1036 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) {
   1037   AssertOnDBusThread();
   1038 
   1039   // watch will be deleted when raw_watch is removed in OnRemoveWatch().
   1040   Watch* watch = new Watch(raw_watch);
   1041   if (watch->IsReadyToBeWatched()) {
   1042     watch->StartWatching();
   1043   }
   1044   ++num_pending_watches_;
   1045   return true;
   1046 }
   1047 
   1048 void Bus::OnRemoveWatch(DBusWatch* raw_watch) {
   1049   AssertOnDBusThread();
   1050 
   1051   Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
   1052   delete watch;
   1053   --num_pending_watches_;
   1054 }
   1055 
   1056 void Bus::OnToggleWatch(DBusWatch* raw_watch) {
   1057   AssertOnDBusThread();
   1058 
   1059   Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
   1060   if (watch->IsReadyToBeWatched()) {
   1061     watch->StartWatching();
   1062   } else {
   1063     // It's safe to call this if StartWatching() wasn't called, per
   1064     // message_pump_libevent.h.
   1065     watch->StopWatching();
   1066   }
   1067 }
   1068 
   1069 dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) {
   1070   AssertOnDBusThread();
   1071 
   1072   // timeout will be deleted when raw_timeout is removed in
   1073   // OnRemoveTimeoutThunk().
   1074   Timeout* timeout = new Timeout(raw_timeout);
   1075   if (timeout->IsReadyToBeMonitored()) {
   1076     timeout->StartMonitoring(this);
   1077   }
   1078   ++num_pending_timeouts_;
   1079   return true;
   1080 }
   1081 
   1082 void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) {
   1083   AssertOnDBusThread();
   1084 
   1085   Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
   1086   timeout->Complete();
   1087   --num_pending_timeouts_;
   1088 }
   1089 
   1090 void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) {
   1091   AssertOnDBusThread();
   1092 
   1093   Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
   1094   if (timeout->IsReadyToBeMonitored()) {
   1095     timeout->StartMonitoring(this);
   1096   } else {
   1097     timeout->StopMonitoring();
   1098   }
   1099 }
   1100 
   1101 void Bus::OnDispatchStatusChanged(DBusConnection* connection,
   1102                                   DBusDispatchStatus status) {
   1103   DCHECK_EQ(connection, connection_);
   1104   AssertOnDBusThread();
   1105 
   1106   // We cannot call ProcessAllIncomingDataIfAny() here, as calling
   1107   // dbus_connection_dispatch() inside DBusDispatchStatusFunction is
   1108   // prohibited by the D-Bus library. Hence, we post a task here instead.
   1109   // See comments for dbus_connection_set_dispatch_status_function().
   1110   GetDBusTaskRunner()->PostTask(FROM_HERE,
   1111                                 base::Bind(&Bus::ProcessAllIncomingDataIfAny,
   1112                                            this));
   1113 }
   1114 
   1115 void Bus::OnConnectionDisconnected(DBusConnection* connection) {
   1116   AssertOnDBusThread();
   1117 
   1118   if (!on_disconnected_closure_.is_null())
   1119     GetOriginTaskRunner()->PostTask(FROM_HERE, on_disconnected_closure_);
   1120 
   1121   if (!connection)
   1122     return;
   1123   DCHECK(!dbus_connection_get_is_connected(connection));
   1124 
   1125   ShutdownAndBlock();
   1126 }
   1127 
   1128 void Bus::OnServiceOwnerChanged(DBusMessage* message) {
   1129   DCHECK(message);
   1130   AssertOnDBusThread();
   1131 
   1132   // |message| will be unrefed on exit of the function. Increment the
   1133   // reference so we can use it in Signal::FromRawMessage() below.
   1134   dbus_message_ref(message);
   1135   scoped_ptr<Signal> signal(Signal::FromRawMessage(message));
   1136 
   1137   // Confirm the validity of the NameOwnerChanged signal.
   1138   if (signal->GetMember() != kNameOwnerChangedSignal ||
   1139       signal->GetInterface() != DBUS_INTERFACE_DBUS ||
   1140       signal->GetSender() != DBUS_SERVICE_DBUS) {
   1141     return;
   1142   }
   1143 
   1144   MessageReader reader(signal.get());
   1145   std::string service_name;
   1146   std::string old_owner;
   1147   std::string new_owner;
   1148   if (!reader.PopString(&service_name) ||
   1149       !reader.PopString(&old_owner) ||
   1150       !reader.PopString(&new_owner)) {
   1151     return;
   1152   }
   1153 
   1154   ServiceOwnerChangedListenerMap::const_iterator it =
   1155       service_owner_changed_listener_map_.find(service_name);
   1156   if (it == service_owner_changed_listener_map_.end())
   1157     return;
   1158 
   1159   const std::vector<GetServiceOwnerCallback>& callbacks = it->second;
   1160   for (size_t i = 0; i < callbacks.size(); ++i) {
   1161     GetOriginTaskRunner()->PostTask(FROM_HERE,
   1162                                     base::Bind(callbacks[i], new_owner));
   1163   }
   1164 }
   1165 
   1166 // static
   1167 dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) {
   1168   Bus* self = static_cast<Bus*>(data);
   1169   return self->OnAddWatch(raw_watch);
   1170 }
   1171 
   1172 // static
   1173 void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) {
   1174   Bus* self = static_cast<Bus*>(data);
   1175   self->OnRemoveWatch(raw_watch);
   1176 }
   1177 
   1178 // static
   1179 void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) {
   1180   Bus* self = static_cast<Bus*>(data);
   1181   self->OnToggleWatch(raw_watch);
   1182 }
   1183 
   1184 // static
   1185 dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
   1186   Bus* self = static_cast<Bus*>(data);
   1187   return self->OnAddTimeout(raw_timeout);
   1188 }
   1189 
   1190 // static
   1191 void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
   1192   Bus* self = static_cast<Bus*>(data);
   1193   self->OnRemoveTimeout(raw_timeout);
   1194 }
   1195 
   1196 // static
   1197 void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
   1198   Bus* self = static_cast<Bus*>(data);
   1199   self->OnToggleTimeout(raw_timeout);
   1200 }
   1201 
   1202 // static
   1203 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection,
   1204                                        DBusDispatchStatus status,
   1205                                        void* data) {
   1206   Bus* self = static_cast<Bus*>(data);
   1207   self->OnDispatchStatusChanged(connection, status);
   1208 }
   1209 
   1210 // static
   1211 DBusHandlerResult Bus::OnConnectionDisconnectedFilter(
   1212     DBusConnection* connection,
   1213     DBusMessage* message,
   1214     void* data) {
   1215   if (dbus_message_is_signal(message,
   1216                              DBUS_INTERFACE_LOCAL,
   1217                              kDisconnectedSignal)) {
   1218     Bus* self = static_cast<Bus*>(data);
   1219     self->OnConnectionDisconnected(connection);
   1220     return DBUS_HANDLER_RESULT_HANDLED;
   1221   }
   1222   return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
   1223 }
   1224 
   1225 // static
   1226 DBusHandlerResult Bus::OnServiceOwnerChangedFilter(
   1227     DBusConnection* connection,
   1228     DBusMessage* message,
   1229     void* data) {
   1230   if (dbus_message_is_signal(message,
   1231                              DBUS_INTERFACE_DBUS,
   1232                              kNameOwnerChangedSignal)) {
   1233     Bus* self = static_cast<Bus*>(data);
   1234     self->OnServiceOwnerChanged(message);
   1235   }
   1236   // Always return unhandled to let others, e.g. ObjectProxies, handle the same
   1237   // signal.
   1238   return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
   1239 }
   1240 
   1241 }  // namespace dbus
   1242