Home | History | Annotate | Download | only in dbus
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "dbus/bus.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/logging.h"
      9 #include "base/message_loop/message_loop.h"
     10 #include "base/message_loop/message_loop_proxy.h"
     11 #include "base/stl_util.h"
     12 #include "base/strings/stringprintf.h"
     13 #include "base/threading/thread.h"
     14 #include "base/threading/thread_restrictions.h"
     15 #include "base/time/time.h"
     16 #include "dbus/exported_object.h"
     17 #include "dbus/message.h"
     18 #include "dbus/object_manager.h"
     19 #include "dbus/object_path.h"
     20 #include "dbus/object_proxy.h"
     21 #include "dbus/scoped_dbus_error.h"
     22 
     23 namespace dbus {
     24 
     25 namespace {
     26 
     27 const char kDisconnectedSignal[] = "Disconnected";
     28 const char kDisconnectedMatchRule[] =
     29     "type='signal', path='/org/freedesktop/DBus/Local',"
     30     "interface='org.freedesktop.DBus.Local', member='Disconnected'";
     31 
     32 // The NameOwnerChanged member in org.freedesktop.DBus
     33 const char kNameOwnerChangedSignal[] = "NameOwnerChanged";
     34 
     35 // The match rule used to filter for changes to a given service name owner.
     36 const char kServiceNameOwnerChangeMatchRule[] =
     37     "type='signal',interface='org.freedesktop.DBus',"
     38     "member='NameOwnerChanged',path='/org/freedesktop/DBus',"
     39     "sender='org.freedesktop.DBus',arg0='%s'";
     40 
     41 // The class is used for watching the file descriptor used for D-Bus
     42 // communication.
     43 class Watch : public base::MessagePumpLibevent::Watcher {
     44  public:
     45   explicit Watch(DBusWatch* watch)
     46       : raw_watch_(watch) {
     47     dbus_watch_set_data(raw_watch_, this, NULL);
     48   }
     49 
     50   virtual ~Watch() {
     51     dbus_watch_set_data(raw_watch_, NULL, NULL);
     52   }
     53 
     54   // Returns true if the underlying file descriptor is ready to be watched.
     55   bool IsReadyToBeWatched() {
     56     return dbus_watch_get_enabled(raw_watch_);
     57   }
     58 
     59   // Starts watching the underlying file descriptor.
     60   void StartWatching() {
     61     const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_);
     62     const int flags = dbus_watch_get_flags(raw_watch_);
     63 
     64     base::MessageLoopForIO::Mode mode = base::MessageLoopForIO::WATCH_READ;
     65     if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE))
     66       mode = base::MessageLoopForIO::WATCH_READ_WRITE;
     67     else if (flags & DBUS_WATCH_READABLE)
     68       mode = base::MessageLoopForIO::WATCH_READ;
     69     else if (flags & DBUS_WATCH_WRITABLE)
     70       mode = base::MessageLoopForIO::WATCH_WRITE;
     71     else
     72       NOTREACHED();
     73 
     74     const bool persistent = true;  // Watch persistently.
     75     const bool success = base::MessageLoopForIO::current()->WatchFileDescriptor(
     76         file_descriptor, persistent, mode, &file_descriptor_watcher_, this);
     77     CHECK(success) << "Unable to allocate memory";
     78   }
     79 
     80   // Stops watching the underlying file descriptor.
     81   void StopWatching() {
     82     file_descriptor_watcher_.StopWatchingFileDescriptor();
     83   }
     84 
     85  private:
     86   // Implement MessagePumpLibevent::Watcher.
     87   virtual void OnFileCanReadWithoutBlocking(int file_descriptor) OVERRIDE {
     88     const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE);
     89     CHECK(success) << "Unable to allocate memory";
     90   }
     91 
     92   // Implement MessagePumpLibevent::Watcher.
     93   virtual void OnFileCanWriteWithoutBlocking(int file_descriptor) OVERRIDE {
     94     const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE);
     95     CHECK(success) << "Unable to allocate memory";
     96   }
     97 
     98   DBusWatch* raw_watch_;
     99   base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_;
    100 };
    101 
    102 // The class is used for monitoring the timeout used for D-Bus method
    103 // calls.
    104 //
    105 // Unlike Watch, Timeout is a ref counted object, to ensure that |this| of
    106 // the object is is alive when HandleTimeout() is called. It's unlikely
    107 // but it may be possible that HandleTimeout() is called after
    108 // Bus::OnRemoveTimeout(). That's why we don't simply delete the object in
    109 // Bus::OnRemoveTimeout().
    110 class Timeout : public base::RefCountedThreadSafe<Timeout> {
    111  public:
    112   explicit Timeout(DBusTimeout* timeout)
    113       : raw_timeout_(timeout),
    114         monitoring_is_active_(false),
    115         is_completed(false) {
    116     dbus_timeout_set_data(raw_timeout_, this, NULL);
    117     AddRef();  // Balanced on Complete().
    118   }
    119 
    120   // Returns true if the timeout is ready to be monitored.
    121   bool IsReadyToBeMonitored() {
    122     return dbus_timeout_get_enabled(raw_timeout_);
    123   }
    124 
    125   // Starts monitoring the timeout.
    126   void StartMonitoring(Bus* bus) {
    127     bus->PostDelayedTaskToDBusThread(FROM_HERE,
    128                                      base::Bind(&Timeout::HandleTimeout,
    129                                                 this),
    130                                      GetInterval());
    131     monitoring_is_active_ = true;
    132   }
    133 
    134   // Stops monitoring the timeout.
    135   void StopMonitoring() {
    136     // We cannot take back the delayed task we posted in
    137     // StartMonitoring(), so we just mark the monitoring is inactive now.
    138     monitoring_is_active_ = false;
    139   }
    140 
    141   // Returns the interval.
    142   base::TimeDelta GetInterval() {
    143     return base::TimeDelta::FromMilliseconds(
    144         dbus_timeout_get_interval(raw_timeout_));
    145   }
    146 
    147   // Cleans up the raw_timeout and marks that timeout is completed.
    148   // See the class comment above for why we are doing this.
    149   void Complete() {
    150     dbus_timeout_set_data(raw_timeout_, NULL, NULL);
    151     is_completed = true;
    152     Release();
    153   }
    154 
    155  private:
    156   friend class base::RefCountedThreadSafe<Timeout>;
    157   ~Timeout() {
    158   }
    159 
    160   // Handles the timeout.
    161   void HandleTimeout() {
    162     // If the timeout is marked completed, we should do nothing. This can
    163     // occur if this function is called after Bus::OnRemoveTimeout().
    164     if (is_completed)
    165       return;
    166     // Skip if monitoring is canceled.
    167     if (!monitoring_is_active_)
    168       return;
    169 
    170     const bool success = dbus_timeout_handle(raw_timeout_);
    171     CHECK(success) << "Unable to allocate memory";
    172   }
    173 
    174   DBusTimeout* raw_timeout_;
    175   bool monitoring_is_active_;
    176   bool is_completed;
    177 };
    178 
    179 }  // namespace
    180 
    181 Bus::Options::Options()
    182   : bus_type(SESSION),
    183     connection_type(PRIVATE) {
    184 }
    185 
    186 Bus::Options::~Options() {
    187 }
    188 
    189 Bus::Bus(const Options& options)
    190     : bus_type_(options.bus_type),
    191       connection_type_(options.connection_type),
    192       dbus_task_runner_(options.dbus_task_runner),
    193       on_shutdown_(false /* manual_reset */, false /* initially_signaled */),
    194       connection_(NULL),
    195       origin_thread_id_(base::PlatformThread::CurrentId()),
    196       async_operations_set_up_(false),
    197       shutdown_completed_(false),
    198       num_pending_watches_(0),
    199       num_pending_timeouts_(0),
    200       address_(options.address),
    201       on_disconnected_closure_(options.disconnected_callback) {
    202   // This is safe to call multiple times.
    203   dbus_threads_init_default();
    204   // The origin message loop is unnecessary if the client uses synchronous
    205   // functions only.
    206   if (base::MessageLoop::current())
    207     origin_task_runner_ = base::MessageLoop::current()->message_loop_proxy();
    208 }
    209 
    210 Bus::~Bus() {
    211   DCHECK(!connection_);
    212   DCHECK(owned_service_names_.empty());
    213   DCHECK(match_rules_added_.empty());
    214   DCHECK(filter_functions_added_.empty());
    215   DCHECK(registered_object_paths_.empty());
    216   DCHECK_EQ(0, num_pending_watches_);
    217   // TODO(satorux): This check fails occasionally in browser_tests for tests
    218   // that run very quickly. Perhaps something does not have time to clean up.
    219   // Despite the check failing, the tests seem to run fine. crosbug.com/23416
    220   // DCHECK_EQ(0, num_pending_timeouts_);
    221 }
    222 
    223 ObjectProxy* Bus::GetObjectProxy(const std::string& service_name,
    224                                  const ObjectPath& object_path) {
    225   return GetObjectProxyWithOptions(service_name, object_path,
    226                                    ObjectProxy::DEFAULT_OPTIONS);
    227 }
    228 
    229 ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name,
    230                                             const ObjectPath& object_path,
    231                                             int options) {
    232   AssertOnOriginThread();
    233 
    234   // Check if we already have the requested object proxy.
    235   const ObjectProxyTable::key_type key(service_name + object_path.value(),
    236                                        options);
    237   ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
    238   if (iter != object_proxy_table_.end()) {
    239     return iter->second.get();
    240   }
    241 
    242   scoped_refptr<ObjectProxy> object_proxy =
    243       new ObjectProxy(this, service_name, object_path, options);
    244   object_proxy_table_[key] = object_proxy;
    245 
    246   return object_proxy.get();
    247 }
    248 
    249 bool Bus::RemoveObjectProxy(const std::string& service_name,
    250                             const ObjectPath& object_path,
    251                             const base::Closure& callback) {
    252   return RemoveObjectProxyWithOptions(service_name, object_path,
    253                                       ObjectProxy::DEFAULT_OPTIONS,
    254                                       callback);
    255 }
    256 
    257 bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name,
    258                                        const ObjectPath& object_path,
    259                                        int options,
    260                                        const base::Closure& callback) {
    261   AssertOnOriginThread();
    262 
    263   // Check if we have the requested object proxy.
    264   const ObjectProxyTable::key_type key(service_name + object_path.value(),
    265                                        options);
    266   ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
    267   if (iter != object_proxy_table_.end()) {
    268     // Object is present. Remove it now and Detach in the DBus thread.
    269     PostTaskToDBusThread(FROM_HERE, base::Bind(
    270         &Bus::RemoveObjectProxyInternal,
    271         this, iter->second, callback));
    272 
    273     object_proxy_table_.erase(iter);
    274     return true;
    275   }
    276   return false;
    277 }
    278 
    279 void Bus::RemoveObjectProxyInternal(scoped_refptr<ObjectProxy> object_proxy,
    280                                     const base::Closure& callback) {
    281   AssertOnDBusThread();
    282 
    283   object_proxy.get()->Detach();
    284 
    285   PostTaskToOriginThread(FROM_HERE, callback);
    286 }
    287 
    288 ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) {
    289   AssertOnOriginThread();
    290 
    291   // Check if we already have the requested exported object.
    292   ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
    293   if (iter != exported_object_table_.end()) {
    294     return iter->second.get();
    295   }
    296 
    297   scoped_refptr<ExportedObject> exported_object =
    298       new ExportedObject(this, object_path);
    299   exported_object_table_[object_path] = exported_object;
    300 
    301   return exported_object.get();
    302 }
    303 
    304 void Bus::UnregisterExportedObject(const ObjectPath& object_path) {
    305   AssertOnOriginThread();
    306 
    307   // Remove the registered object from the table first, to allow a new
    308   // GetExportedObject() call to return a new object, rather than this one.
    309   ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
    310   if (iter == exported_object_table_.end())
    311     return;
    312 
    313   scoped_refptr<ExportedObject> exported_object = iter->second;
    314   exported_object_table_.erase(iter);
    315 
    316   // Post the task to perform the final unregistration to the D-Bus thread.
    317   // Since the registration also happens on the D-Bus thread in
    318   // TryRegisterObjectPath(), and the task runner we post to is a
    319   // SequencedTaskRunner, there is a guarantee that this will happen before any
    320   // future registration call.
    321   PostTaskToDBusThread(FROM_HERE,
    322                        base::Bind(&Bus::UnregisterExportedObjectInternal,
    323                                   this, exported_object));
    324 }
    325 
    326 void Bus::UnregisterExportedObjectInternal(
    327     scoped_refptr<ExportedObject> exported_object) {
    328   AssertOnDBusThread();
    329 
    330   exported_object->Unregister();
    331 }
    332 
    333 ObjectManager* Bus::GetObjectManager(const std::string& service_name,
    334                                      const ObjectPath& object_path) {
    335   AssertOnOriginThread();
    336 
    337   // Check if we already have the requested object manager.
    338   const ObjectManagerTable::key_type key(service_name + object_path.value());
    339   ObjectManagerTable::iterator iter = object_manager_table_.find(key);
    340   if (iter != object_manager_table_.end()) {
    341     return iter->second.get();
    342   }
    343 
    344   scoped_refptr<ObjectManager> object_manager =
    345       new ObjectManager(this, service_name, object_path);
    346   object_manager_table_[key] = object_manager;
    347 
    348   return object_manager.get();
    349 }
    350 
    351 void Bus::RemoveObjectManager(const std::string& service_name,
    352                               const ObjectPath& object_path) {
    353   AssertOnOriginThread();
    354 
    355   const ObjectManagerTable::key_type key(service_name + object_path.value());
    356   ObjectManagerTable::iterator iter = object_manager_table_.find(key);
    357   if (iter == object_manager_table_.end())
    358     return;
    359 
    360   scoped_refptr<ObjectManager> object_manager = iter->second;
    361   object_manager_table_.erase(iter);
    362 }
    363 
    364 void Bus::GetManagedObjects() {
    365   for (ObjectManagerTable::iterator iter = object_manager_table_.begin();
    366        iter != object_manager_table_.end(); ++iter) {
    367     iter->second->GetManagedObjects();
    368   }
    369 }
    370 
    371 bool Bus::Connect() {
    372   // dbus_bus_get_private() and dbus_bus_get() are blocking calls.
    373   AssertOnDBusThread();
    374 
    375   // Check if it's already initialized.
    376   if (connection_)
    377     return true;
    378 
    379   ScopedDBusError error;
    380   if (bus_type_ == CUSTOM_ADDRESS) {
    381     if (connection_type_ == PRIVATE) {
    382       connection_ = dbus_connection_open_private(address_.c_str(), error.get());
    383     } else {
    384       connection_ = dbus_connection_open(address_.c_str(), error.get());
    385     }
    386   } else {
    387     const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_);
    388     if (connection_type_ == PRIVATE) {
    389       connection_ = dbus_bus_get_private(dbus_bus_type, error.get());
    390     } else {
    391       connection_ = dbus_bus_get(dbus_bus_type, error.get());
    392     }
    393   }
    394   if (!connection_) {
    395     LOG(ERROR) << "Failed to connect to the bus: "
    396                << (error.is_set() ? error.message() : "");
    397     return false;
    398   }
    399 
    400   if (bus_type_ == CUSTOM_ADDRESS) {
    401     // We should call dbus_bus_register here, otherwise unique name can not be
    402     // acquired. According to dbus specification, it is responsible to call
    403     // org.freedesktop.DBus.Hello method at the beging of bus connection to
    404     // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is
    405     // called internally.
    406     if (!dbus_bus_register(connection_, error.get())) {
    407       LOG(ERROR) << "Failed to register the bus component: "
    408                  << (error.is_set() ? error.message() : "");
    409       return false;
    410     }
    411   }
    412   // We shouldn't exit on the disconnected signal.
    413   dbus_connection_set_exit_on_disconnect(connection_, false);
    414 
    415   // Watch Disconnected signal.
    416   AddFilterFunction(Bus::OnConnectionDisconnectedFilter, this);
    417   AddMatch(kDisconnectedMatchRule, error.get());
    418 
    419   return true;
    420 }
    421 
    422 void Bus::ClosePrivateConnection() {
    423   // dbus_connection_close is blocking call.
    424   AssertOnDBusThread();
    425   DCHECK_EQ(PRIVATE, connection_type_)
    426       << "non-private connection should not be closed";
    427   dbus_connection_close(connection_);
    428 }
    429 
    430 void Bus::ShutdownAndBlock() {
    431   AssertOnDBusThread();
    432 
    433   if (shutdown_completed_)
    434     return;  // Already shutdowned, just return.
    435 
    436   // Unregister the exported objects.
    437   for (ExportedObjectTable::iterator iter = exported_object_table_.begin();
    438        iter != exported_object_table_.end(); ++iter) {
    439     iter->second->Unregister();
    440   }
    441 
    442   // Release all service names.
    443   for (std::set<std::string>::iterator iter = owned_service_names_.begin();
    444        iter != owned_service_names_.end();) {
    445     // This is a bit tricky but we should increment the iter here as
    446     // ReleaseOwnership() may remove |service_name| from the set.
    447     const std::string& service_name = *iter++;
    448     ReleaseOwnership(service_name);
    449   }
    450   if (!owned_service_names_.empty()) {
    451     LOG(ERROR) << "Failed to release all service names. # of services left: "
    452                << owned_service_names_.size();
    453   }
    454 
    455   // Detach from the remote objects.
    456   for (ObjectProxyTable::iterator iter = object_proxy_table_.begin();
    457        iter != object_proxy_table_.end(); ++iter) {
    458     iter->second->Detach();
    459   }
    460 
    461   // Release object proxies and exported objects here. We should do this
    462   // here rather than in the destructor to avoid memory leaks due to
    463   // cyclic references.
    464   object_proxy_table_.clear();
    465   exported_object_table_.clear();
    466 
    467   // Private connection should be closed.
    468   if (connection_) {
    469     // Remove Disconnected watcher.
    470     ScopedDBusError error;
    471     RemoveFilterFunction(Bus::OnConnectionDisconnectedFilter, this);
    472     RemoveMatch(kDisconnectedMatchRule, error.get());
    473 
    474     if (connection_type_ == PRIVATE)
    475       ClosePrivateConnection();
    476     // dbus_connection_close() won't unref.
    477     dbus_connection_unref(connection_);
    478   }
    479 
    480   connection_ = NULL;
    481   shutdown_completed_ = true;
    482 }
    483 
    484 void Bus::ShutdownOnDBusThreadAndBlock() {
    485   AssertOnOriginThread();
    486   DCHECK(dbus_task_runner_.get());
    487 
    488   PostTaskToDBusThread(FROM_HERE, base::Bind(
    489       &Bus::ShutdownOnDBusThreadAndBlockInternal,
    490       this));
    491 
    492   // http://crbug.com/125222
    493   base::ThreadRestrictions::ScopedAllowWait allow_wait;
    494 
    495   // Wait until the shutdown is complete on the D-Bus thread.
    496   // The shutdown should not hang, but set timeout just in case.
    497   const int kTimeoutSecs = 3;
    498   const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs));
    499   const bool signaled = on_shutdown_.TimedWait(timeout);
    500   LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus";
    501 }
    502 
    503 void Bus::RequestOwnership(const std::string& service_name,
    504                            ServiceOwnershipOptions options,
    505                            OnOwnershipCallback on_ownership_callback) {
    506   AssertOnOriginThread();
    507 
    508   PostTaskToDBusThread(FROM_HERE, base::Bind(
    509       &Bus::RequestOwnershipInternal,
    510       this, service_name, options, on_ownership_callback));
    511 }
    512 
    513 void Bus::RequestOwnershipInternal(const std::string& service_name,
    514                                    ServiceOwnershipOptions options,
    515                                    OnOwnershipCallback on_ownership_callback) {
    516   AssertOnDBusThread();
    517 
    518   bool success = Connect();
    519   if (success)
    520     success = RequestOwnershipAndBlock(service_name, options);
    521 
    522   PostTaskToOriginThread(FROM_HERE,
    523                          base::Bind(on_ownership_callback,
    524                                     service_name,
    525                                     success));
    526 }
    527 
    528 bool Bus::RequestOwnershipAndBlock(const std::string& service_name,
    529                                    ServiceOwnershipOptions options) {
    530   DCHECK(connection_);
    531   // dbus_bus_request_name() is a blocking call.
    532   AssertOnDBusThread();
    533 
    534   // Check if we already own the service name.
    535   if (owned_service_names_.find(service_name) != owned_service_names_.end()) {
    536     return true;
    537   }
    538 
    539   ScopedDBusError error;
    540   const int result = dbus_bus_request_name(connection_,
    541                                            service_name.c_str(),
    542                                            options,
    543                                            error.get());
    544   if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
    545     LOG(ERROR) << "Failed to get the ownership of " << service_name << ": "
    546                << (error.is_set() ? error.message() : "");
    547     return false;
    548   }
    549   owned_service_names_.insert(service_name);
    550   return true;
    551 }
    552 
    553 bool Bus::ReleaseOwnership(const std::string& service_name) {
    554   DCHECK(connection_);
    555   // dbus_bus_request_name() is a blocking call.
    556   AssertOnDBusThread();
    557 
    558   // Check if we already own the service name.
    559   std::set<std::string>::iterator found =
    560       owned_service_names_.find(service_name);
    561   if (found == owned_service_names_.end()) {
    562     LOG(ERROR) << service_name << " is not owned by the bus";
    563     return false;
    564   }
    565 
    566   ScopedDBusError error;
    567   const int result = dbus_bus_release_name(connection_, service_name.c_str(),
    568                                            error.get());
    569   if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) {
    570     owned_service_names_.erase(found);
    571     return true;
    572   } else {
    573     LOG(ERROR) << "Failed to release the ownership of " << service_name << ": "
    574                << (error.is_set() ? error.message() : "")
    575                << ", result code: " << result;
    576     return false;
    577   }
    578 }
    579 
    580 bool Bus::SetUpAsyncOperations() {
    581   DCHECK(connection_);
    582   AssertOnDBusThread();
    583 
    584   if (async_operations_set_up_)
    585     return true;
    586 
    587   // Process all the incoming data if any, so that OnDispatchStatus() will
    588   // be called when the incoming data is ready.
    589   ProcessAllIncomingDataIfAny();
    590 
    591   bool success = dbus_connection_set_watch_functions(connection_,
    592                                                      &Bus::OnAddWatchThunk,
    593                                                      &Bus::OnRemoveWatchThunk,
    594                                                      &Bus::OnToggleWatchThunk,
    595                                                      this,
    596                                                      NULL);
    597   CHECK(success) << "Unable to allocate memory";
    598 
    599   success = dbus_connection_set_timeout_functions(connection_,
    600                                                   &Bus::OnAddTimeoutThunk,
    601                                                   &Bus::OnRemoveTimeoutThunk,
    602                                                   &Bus::OnToggleTimeoutThunk,
    603                                                   this,
    604                                                   NULL);
    605   CHECK(success) << "Unable to allocate memory";
    606 
    607   dbus_connection_set_dispatch_status_function(
    608       connection_,
    609       &Bus::OnDispatchStatusChangedThunk,
    610       this,
    611       NULL);
    612 
    613   async_operations_set_up_ = true;
    614 
    615   return true;
    616 }
    617 
    618 DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request,
    619                                         int timeout_ms,
    620                                         DBusError* error) {
    621   DCHECK(connection_);
    622   AssertOnDBusThread();
    623 
    624   return dbus_connection_send_with_reply_and_block(
    625       connection_, request, timeout_ms, error);
    626 }
    627 
    628 void Bus::SendWithReply(DBusMessage* request,
    629                         DBusPendingCall** pending_call,
    630                         int timeout_ms) {
    631   DCHECK(connection_);
    632   AssertOnDBusThread();
    633 
    634   const bool success = dbus_connection_send_with_reply(
    635       connection_, request, pending_call, timeout_ms);
    636   CHECK(success) << "Unable to allocate memory";
    637 }
    638 
    639 void Bus::Send(DBusMessage* request, uint32* serial) {
    640   DCHECK(connection_);
    641   AssertOnDBusThread();
    642 
    643   const bool success = dbus_connection_send(connection_, request, serial);
    644   CHECK(success) << "Unable to allocate memory";
    645 }
    646 
    647 bool Bus::AddFilterFunction(DBusHandleMessageFunction filter_function,
    648                             void* user_data) {
    649   DCHECK(connection_);
    650   AssertOnDBusThread();
    651 
    652   std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
    653       std::make_pair(filter_function, user_data);
    654   if (filter_functions_added_.find(filter_data_pair) !=
    655       filter_functions_added_.end()) {
    656     VLOG(1) << "Filter function already exists: " << filter_function
    657             << " with associated data: " << user_data;
    658     return false;
    659   }
    660 
    661   const bool success = dbus_connection_add_filter(
    662       connection_, filter_function, user_data, NULL);
    663   CHECK(success) << "Unable to allocate memory";
    664   filter_functions_added_.insert(filter_data_pair);
    665   return true;
    666 }
    667 
    668 bool Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function,
    669                                void* user_data) {
    670   DCHECK(connection_);
    671   AssertOnDBusThread();
    672 
    673   std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
    674       std::make_pair(filter_function, user_data);
    675   if (filter_functions_added_.find(filter_data_pair) ==
    676       filter_functions_added_.end()) {
    677     VLOG(1) << "Requested to remove an unknown filter function: "
    678             << filter_function
    679             << " with associated data: " << user_data;
    680     return false;
    681   }
    682 
    683   dbus_connection_remove_filter(connection_, filter_function, user_data);
    684   filter_functions_added_.erase(filter_data_pair);
    685   return true;
    686 }
    687 
    688 void Bus::AddMatch(const std::string& match_rule, DBusError* error) {
    689   DCHECK(connection_);
    690   AssertOnDBusThread();
    691 
    692   std::map<std::string, int>::iterator iter =
    693       match_rules_added_.find(match_rule);
    694   if (iter != match_rules_added_.end()) {
    695     // The already existing rule's counter is incremented.
    696     iter->second++;
    697 
    698     VLOG(1) << "Match rule already exists: " << match_rule;
    699     return;
    700   }
    701 
    702   dbus_bus_add_match(connection_, match_rule.c_str(), error);
    703   match_rules_added_[match_rule] = 1;
    704 }
    705 
    706 bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) {
    707   DCHECK(connection_);
    708   AssertOnDBusThread();
    709 
    710   std::map<std::string, int>::iterator iter =
    711       match_rules_added_.find(match_rule);
    712   if (iter == match_rules_added_.end()) {
    713     LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule;
    714     return false;
    715   }
    716 
    717   // The rule's counter is decremented and the rule is deleted when reachs 0.
    718   iter->second--;
    719   if (iter->second == 0) {
    720     dbus_bus_remove_match(connection_, match_rule.c_str(), error);
    721     match_rules_added_.erase(match_rule);
    722   }
    723   return true;
    724 }
    725 
    726 bool Bus::TryRegisterObjectPath(const ObjectPath& object_path,
    727                                 const DBusObjectPathVTable* vtable,
    728                                 void* user_data,
    729                                 DBusError* error) {
    730   DCHECK(connection_);
    731   AssertOnDBusThread();
    732 
    733   if (registered_object_paths_.find(object_path) !=
    734       registered_object_paths_.end()) {
    735     LOG(ERROR) << "Object path already registered: " << object_path.value();
    736     return false;
    737   }
    738 
    739   const bool success = dbus_connection_try_register_object_path(
    740       connection_,
    741       object_path.value().c_str(),
    742       vtable,
    743       user_data,
    744       error);
    745   if (success)
    746     registered_object_paths_.insert(object_path);
    747   return success;
    748 }
    749 
    750 void Bus::UnregisterObjectPath(const ObjectPath& object_path) {
    751   DCHECK(connection_);
    752   AssertOnDBusThread();
    753 
    754   if (registered_object_paths_.find(object_path) ==
    755       registered_object_paths_.end()) {
    756     LOG(ERROR) << "Requested to unregister an unknown object path: "
    757                << object_path.value();
    758     return;
    759   }
    760 
    761   const bool success = dbus_connection_unregister_object_path(
    762       connection_,
    763       object_path.value().c_str());
    764   CHECK(success) << "Unable to allocate memory";
    765   registered_object_paths_.erase(object_path);
    766 }
    767 
    768 void Bus::ShutdownOnDBusThreadAndBlockInternal() {
    769   AssertOnDBusThread();
    770 
    771   ShutdownAndBlock();
    772   on_shutdown_.Signal();
    773 }
    774 
    775 void Bus::ProcessAllIncomingDataIfAny() {
    776   AssertOnDBusThread();
    777 
    778   // As mentioned at the class comment in .h file, connection_ can be NULL.
    779   if (!connection_)
    780     return;
    781 
    782   // It is safe and necessary to call dbus_connection_get_dispatch_status even
    783   // if the connection is lost. Otherwise we will miss "Disconnected" signal.
    784   // (crbug.com/174431)
    785   if (dbus_connection_get_dispatch_status(connection_) ==
    786       DBUS_DISPATCH_DATA_REMAINS) {
    787     while (dbus_connection_dispatch(connection_) ==
    788            DBUS_DISPATCH_DATA_REMAINS) {
    789     }
    790   }
    791 }
    792 
    793 void Bus::PostTaskToDBusThreadAndReply(
    794     const tracked_objects::Location& from_here,
    795     const base::Closure& task,
    796     const base::Closure& reply) {
    797   AssertOnOriginThread();
    798 
    799   if (dbus_task_runner_.get()) {
    800     if (!dbus_task_runner_->PostTaskAndReply(from_here, task, reply)) {
    801       LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop";
    802     }
    803   } else {
    804     DCHECK(origin_task_runner_.get());
    805     if (!origin_task_runner_->PostTaskAndReply(from_here, task, reply)) {
    806       LOG(WARNING) << "Failed to post a task to the origin message loop";
    807     }
    808   }
    809 }
    810 
    811 void Bus::PostTaskToOriginThread(const tracked_objects::Location& from_here,
    812                                  const base::Closure& task) {
    813   DCHECK(origin_task_runner_.get());
    814   if (!origin_task_runner_->PostTask(from_here, task)) {
    815     LOG(WARNING) << "Failed to post a task to the origin message loop";
    816   }
    817 }
    818 
    819 void Bus::PostTaskToDBusThread(const tracked_objects::Location& from_here,
    820                                const base::Closure& task) {
    821   if (dbus_task_runner_.get()) {
    822     if (!dbus_task_runner_->PostTask(from_here, task)) {
    823       LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop";
    824     }
    825   } else {
    826     DCHECK(origin_task_runner_.get());
    827     if (!origin_task_runner_->PostTask(from_here, task)) {
    828       LOG(WARNING) << "Failed to post a task to the origin message loop";
    829     }
    830   }
    831 }
    832 
    833 void Bus::PostDelayedTaskToDBusThread(
    834     const tracked_objects::Location& from_here,
    835     const base::Closure& task,
    836     base::TimeDelta delay) {
    837   if (dbus_task_runner_.get()) {
    838     if (!dbus_task_runner_->PostDelayedTask(
    839             from_here, task, delay)) {
    840       LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop";
    841     }
    842   } else {
    843     DCHECK(origin_task_runner_.get());
    844     if (!origin_task_runner_->PostDelayedTask(from_here, task, delay)) {
    845       LOG(WARNING) << "Failed to post a task to the origin message loop";
    846     }
    847   }
    848 }
    849 
    850 bool Bus::HasDBusThread() {
    851   return dbus_task_runner_.get() != NULL;
    852 }
    853 
    854 void Bus::AssertOnOriginThread() {
    855   DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId());
    856 }
    857 
    858 void Bus::AssertOnDBusThread() {
    859   base::ThreadRestrictions::AssertIOAllowed();
    860 
    861   if (dbus_task_runner_.get()) {
    862     DCHECK(dbus_task_runner_->RunsTasksOnCurrentThread());
    863   } else {
    864     AssertOnOriginThread();
    865   }
    866 }
    867 
    868 std::string Bus::GetServiceOwnerAndBlock(const std::string& service_name,
    869                                          GetServiceOwnerOption options) {
    870   AssertOnDBusThread();
    871 
    872   MethodCall get_name_owner_call("org.freedesktop.DBus", "GetNameOwner");
    873   MessageWriter writer(&get_name_owner_call);
    874   writer.AppendString(service_name);
    875   VLOG(1) << "Method call: " << get_name_owner_call.ToString();
    876 
    877   const ObjectPath obj_path("/org/freedesktop/DBus");
    878   if (!get_name_owner_call.SetDestination("org.freedesktop.DBus") ||
    879       !get_name_owner_call.SetPath(obj_path)) {
    880     if (options == REPORT_ERRORS)
    881       LOG(ERROR) << "Failed to get name owner.";
    882     return "";
    883   }
    884 
    885   ScopedDBusError error;
    886   DBusMessage* response_message =
    887       SendWithReplyAndBlock(get_name_owner_call.raw_message(),
    888                             ObjectProxy::TIMEOUT_USE_DEFAULT,
    889                             error.get());
    890   if (!response_message) {
    891     if (options == REPORT_ERRORS) {
    892       LOG(ERROR) << "Failed to get name owner. Got " << error.name() << ": "
    893                  << error.message();
    894     }
    895     return "";
    896   }
    897 
    898   scoped_ptr<Response> response(Response::FromRawMessage(response_message));
    899   MessageReader reader(response.get());
    900 
    901   std::string service_owner;
    902   if (!reader.PopString(&service_owner))
    903     service_owner.clear();
    904   return service_owner;
    905 }
    906 
    907 void Bus::GetServiceOwner(const std::string& service_name,
    908                           const GetServiceOwnerCallback& callback) {
    909   AssertOnOriginThread();
    910 
    911   PostTaskToDBusThread(
    912       FROM_HERE,
    913       base::Bind(&Bus::GetServiceOwnerInternal, this, service_name, callback));
    914 }
    915 
    916 void Bus::GetServiceOwnerInternal(const std::string& service_name,
    917                                   const GetServiceOwnerCallback& callback) {
    918   AssertOnDBusThread();
    919 
    920   std::string service_owner;
    921   if (Connect())
    922     service_owner = GetServiceOwnerAndBlock(service_name, SUPPRESS_ERRORS);
    923   PostTaskToOriginThread(FROM_HERE, base::Bind(callback, service_owner));
    924 }
    925 
    926 void Bus::ListenForServiceOwnerChange(
    927     const std::string& service_name,
    928     const GetServiceOwnerCallback& callback) {
    929   AssertOnOriginThread();
    930   DCHECK(!service_name.empty());
    931   DCHECK(!callback.is_null());
    932 
    933   PostTaskToDBusThread(FROM_HERE,
    934                        base::Bind(&Bus::ListenForServiceOwnerChangeInternal,
    935                                   this, service_name, callback));
    936 }
    937 
    938 void Bus::ListenForServiceOwnerChangeInternal(
    939     const std::string& service_name,
    940     const GetServiceOwnerCallback& callback) {
    941   AssertOnDBusThread();
    942   DCHECK(!service_name.empty());
    943   DCHECK(!callback.is_null());
    944 
    945   if (!Connect() || !SetUpAsyncOperations())
    946     return;
    947 
    948   if (service_owner_changed_listener_map_.empty()) {
    949     bool filter_added =
    950         AddFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
    951     DCHECK(filter_added);
    952   }
    953 
    954   ServiceOwnerChangedListenerMap::iterator it =
    955       service_owner_changed_listener_map_.find(service_name);
    956   if (it == service_owner_changed_listener_map_.end()) {
    957     // Add a match rule for the new service name.
    958     const std::string name_owner_changed_match_rule =
    959         base::StringPrintf(kServiceNameOwnerChangeMatchRule,
    960                            service_name.c_str());
    961     ScopedDBusError error;
    962     AddMatch(name_owner_changed_match_rule, error.get());
    963     if (error.is_set()) {
    964       LOG(ERROR) << "Failed to add match rule for " << service_name
    965                  << ". Got " << error.name() << ": " << error.message();
    966       return;
    967     }
    968 
    969     service_owner_changed_listener_map_[service_name].push_back(callback);
    970     return;
    971   }
    972 
    973   // Check if the callback has already been added.
    974   std::vector<GetServiceOwnerCallback>& callbacks = it->second;
    975   for (size_t i = 0; i < callbacks.size(); ++i) {
    976     if (callbacks[i].Equals(callback))
    977       return;
    978   }
    979   callbacks.push_back(callback);
    980 }
    981 
    982 void Bus::UnlistenForServiceOwnerChange(
    983     const std::string& service_name,
    984     const GetServiceOwnerCallback& callback) {
    985   AssertOnOriginThread();
    986   DCHECK(!service_name.empty());
    987   DCHECK(!callback.is_null());
    988 
    989   PostTaskToDBusThread(FROM_HERE,
    990                        base::Bind(&Bus::UnlistenForServiceOwnerChangeInternal,
    991                                   this, service_name, callback));
    992 }
    993 
    994 void Bus::UnlistenForServiceOwnerChangeInternal(
    995     const std::string& service_name,
    996     const GetServiceOwnerCallback& callback) {
    997   AssertOnDBusThread();
    998   DCHECK(!service_name.empty());
    999   DCHECK(!callback.is_null());
   1000 
   1001   ServiceOwnerChangedListenerMap::iterator it =
   1002       service_owner_changed_listener_map_.find(service_name);
   1003   if (it == service_owner_changed_listener_map_.end())
   1004     return;
   1005 
   1006   std::vector<GetServiceOwnerCallback>& callbacks = it->second;
   1007   for (size_t i = 0; i < callbacks.size(); ++i) {
   1008     if (callbacks[i].Equals(callback)) {
   1009       callbacks.erase(callbacks.begin() + i);
   1010       break;  // There can be only one.
   1011     }
   1012   }
   1013   if (!callbacks.empty())
   1014     return;
   1015 
   1016   // Last callback for |service_name| has been removed, remove match rule.
   1017   const std::string name_owner_changed_match_rule =
   1018       base::StringPrintf(kServiceNameOwnerChangeMatchRule,
   1019                          service_name.c_str());
   1020   ScopedDBusError error;
   1021   RemoveMatch(name_owner_changed_match_rule, error.get());
   1022   // And remove |service_owner_changed_listener_map_| entry.
   1023   service_owner_changed_listener_map_.erase(it);
   1024 
   1025   if (service_owner_changed_listener_map_.empty()) {
   1026     bool filter_removed =
   1027         RemoveFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
   1028     DCHECK(filter_removed);
   1029   }
   1030 }
   1031 
   1032 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) {
   1033   AssertOnDBusThread();
   1034 
   1035   // watch will be deleted when raw_watch is removed in OnRemoveWatch().
   1036   Watch* watch = new Watch(raw_watch);
   1037   if (watch->IsReadyToBeWatched()) {
   1038     watch->StartWatching();
   1039   }
   1040   ++num_pending_watches_;
   1041   return true;
   1042 }
   1043 
   1044 void Bus::OnRemoveWatch(DBusWatch* raw_watch) {
   1045   AssertOnDBusThread();
   1046 
   1047   Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
   1048   delete watch;
   1049   --num_pending_watches_;
   1050 }
   1051 
   1052 void Bus::OnToggleWatch(DBusWatch* raw_watch) {
   1053   AssertOnDBusThread();
   1054 
   1055   Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
   1056   if (watch->IsReadyToBeWatched()) {
   1057     watch->StartWatching();
   1058   } else {
   1059     // It's safe to call this if StartWatching() wasn't called, per
   1060     // message_pump_libevent.h.
   1061     watch->StopWatching();
   1062   }
   1063 }
   1064 
   1065 dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) {
   1066   AssertOnDBusThread();
   1067 
   1068   // timeout will be deleted when raw_timeout is removed in
   1069   // OnRemoveTimeoutThunk().
   1070   Timeout* timeout = new Timeout(raw_timeout);
   1071   if (timeout->IsReadyToBeMonitored()) {
   1072     timeout->StartMonitoring(this);
   1073   }
   1074   ++num_pending_timeouts_;
   1075   return true;
   1076 }
   1077 
   1078 void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) {
   1079   AssertOnDBusThread();
   1080 
   1081   Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
   1082   timeout->Complete();
   1083   --num_pending_timeouts_;
   1084 }
   1085 
   1086 void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) {
   1087   AssertOnDBusThread();
   1088 
   1089   Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
   1090   if (timeout->IsReadyToBeMonitored()) {
   1091     timeout->StartMonitoring(this);
   1092   } else {
   1093     timeout->StopMonitoring();
   1094   }
   1095 }
   1096 
   1097 void Bus::OnDispatchStatusChanged(DBusConnection* connection,
   1098                                   DBusDispatchStatus status) {
   1099   DCHECK_EQ(connection, connection_);
   1100   AssertOnDBusThread();
   1101 
   1102   // We cannot call ProcessAllIncomingDataIfAny() here, as calling
   1103   // dbus_connection_dispatch() inside DBusDispatchStatusFunction is
   1104   // prohibited by the D-Bus library. Hence, we post a task here instead.
   1105   // See comments for dbus_connection_set_dispatch_status_function().
   1106   PostTaskToDBusThread(FROM_HERE,
   1107                        base::Bind(&Bus::ProcessAllIncomingDataIfAny,
   1108                                   this));
   1109 }
   1110 
   1111 void Bus::OnConnectionDisconnected(DBusConnection* connection) {
   1112   AssertOnDBusThread();
   1113 
   1114   if (!on_disconnected_closure_.is_null())
   1115     PostTaskToOriginThread(FROM_HERE, on_disconnected_closure_);
   1116 
   1117   if (!connection)
   1118     return;
   1119   DCHECK(!dbus_connection_get_is_connected(connection));
   1120 
   1121   ShutdownAndBlock();
   1122 }
   1123 
   1124 void Bus::OnServiceOwnerChanged(DBusMessage* message) {
   1125   DCHECK(message);
   1126   AssertOnDBusThread();
   1127 
   1128   // |message| will be unrefed on exit of the function. Increment the
   1129   // reference so we can use it in Signal::FromRawMessage() below.
   1130   dbus_message_ref(message);
   1131   scoped_ptr<Signal> signal(Signal::FromRawMessage(message));
   1132 
   1133   // Confirm the validity of the NameOwnerChanged signal.
   1134   if (signal->GetMember() != kNameOwnerChangedSignal ||
   1135       signal->GetInterface() != DBUS_INTERFACE_DBUS ||
   1136       signal->GetSender() != DBUS_SERVICE_DBUS) {
   1137     return;
   1138   }
   1139 
   1140   MessageReader reader(signal.get());
   1141   std::string service_name;
   1142   std::string old_owner;
   1143   std::string new_owner;
   1144   if (!reader.PopString(&service_name) ||
   1145       !reader.PopString(&old_owner) ||
   1146       !reader.PopString(&new_owner)) {
   1147     return;
   1148   }
   1149 
   1150   ServiceOwnerChangedListenerMap::const_iterator it =
   1151       service_owner_changed_listener_map_.find(service_name);
   1152   if (it == service_owner_changed_listener_map_.end())
   1153     return;
   1154 
   1155   const std::vector<GetServiceOwnerCallback>& callbacks = it->second;
   1156   for (size_t i = 0; i < callbacks.size(); ++i) {
   1157     PostTaskToOriginThread(FROM_HERE,
   1158                            base::Bind(callbacks[i], new_owner));
   1159   }
   1160 }
   1161 
   1162 // static
   1163 dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) {
   1164   Bus* self = static_cast<Bus*>(data);
   1165   return self->OnAddWatch(raw_watch);
   1166 }
   1167 
   1168 // static
   1169 void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) {
   1170   Bus* self = static_cast<Bus*>(data);
   1171   self->OnRemoveWatch(raw_watch);
   1172 }
   1173 
   1174 // static
   1175 void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) {
   1176   Bus* self = static_cast<Bus*>(data);
   1177   self->OnToggleWatch(raw_watch);
   1178 }
   1179 
   1180 // static
   1181 dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
   1182   Bus* self = static_cast<Bus*>(data);
   1183   return self->OnAddTimeout(raw_timeout);
   1184 }
   1185 
   1186 // static
   1187 void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
   1188   Bus* self = static_cast<Bus*>(data);
   1189   self->OnRemoveTimeout(raw_timeout);
   1190 }
   1191 
   1192 // static
   1193 void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
   1194   Bus* self = static_cast<Bus*>(data);
   1195   self->OnToggleTimeout(raw_timeout);
   1196 }
   1197 
   1198 // static
   1199 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection,
   1200                                        DBusDispatchStatus status,
   1201                                        void* data) {
   1202   Bus* self = static_cast<Bus*>(data);
   1203   self->OnDispatchStatusChanged(connection, status);
   1204 }
   1205 
   1206 // static
   1207 DBusHandlerResult Bus::OnConnectionDisconnectedFilter(
   1208     DBusConnection* connection,
   1209     DBusMessage* message,
   1210     void* data) {
   1211   if (dbus_message_is_signal(message,
   1212                              DBUS_INTERFACE_LOCAL,
   1213                              kDisconnectedSignal)) {
   1214     Bus* self = static_cast<Bus*>(data);
   1215     self->OnConnectionDisconnected(connection);
   1216     return DBUS_HANDLER_RESULT_HANDLED;
   1217   }
   1218   return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
   1219 }
   1220 
   1221 // static
   1222 DBusHandlerResult Bus::OnServiceOwnerChangedFilter(
   1223     DBusConnection* connection,
   1224     DBusMessage* message,
   1225     void* data) {
   1226   if (dbus_message_is_signal(message,
   1227                              DBUS_INTERFACE_DBUS,
   1228                              kNameOwnerChangedSignal)) {
   1229     Bus* self = static_cast<Bus*>(data);
   1230     self->OnServiceOwnerChanged(message);
   1231   }
   1232   // Always return unhandled to let others, e.g. ObjectProxies, handle the same
   1233   // signal.
   1234   return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
   1235 }
   1236 
   1237 }  // namespace dbus
   1238