1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "dbus/bus.h" 6 7 #include <stddef.h> 8 9 #include "base/bind.h" 10 #include "base/logging.h" 11 #include "base/message_loop/message_loop.h" 12 #include "base/stl_util.h" 13 #include "base/strings/stringprintf.h" 14 #include "base/threading/thread.h" 15 #include "base/threading/thread_restrictions.h" 16 #include "base/threading/thread_task_runner_handle.h" 17 #include "base/time/time.h" 18 #include "dbus/exported_object.h" 19 #include "dbus/message.h" 20 #include "dbus/object_manager.h" 21 #include "dbus/object_path.h" 22 #include "dbus/object_proxy.h" 23 #include "dbus/scoped_dbus_error.h" 24 25 namespace dbus { 26 27 namespace { 28 29 // The NameOwnerChanged member in org.freedesktop.DBus 30 const char kNameOwnerChangedSignal[] = "NameOwnerChanged"; 31 32 // The match rule used to filter for changes to a given service name owner. 33 const char kServiceNameOwnerChangeMatchRule[] = 34 "type='signal',interface='org.freedesktop.DBus'," 35 "member='NameOwnerChanged',path='/org/freedesktop/DBus'," 36 "sender='org.freedesktop.DBus',arg0='%s'"; 37 38 // The class is used for watching the file descriptor used for D-Bus 39 // communication. 40 class Watch : public base::MessagePumpLibevent::Watcher { 41 public: 42 explicit Watch(DBusWatch* watch) 43 : raw_watch_(watch) { 44 dbus_watch_set_data(raw_watch_, this, NULL); 45 } 46 47 ~Watch() override { dbus_watch_set_data(raw_watch_, NULL, NULL); } 48 49 // Returns true if the underlying file descriptor is ready to be watched. 50 bool IsReadyToBeWatched() { 51 return dbus_watch_get_enabled(raw_watch_); 52 } 53 54 // Starts watching the underlying file descriptor. 55 void StartWatching() { 56 const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_); 57 const int flags = dbus_watch_get_flags(raw_watch_); 58 59 base::MessageLoopForIO::Mode mode = base::MessageLoopForIO::WATCH_READ; 60 if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE)) 61 mode = base::MessageLoopForIO::WATCH_READ_WRITE; 62 else if (flags & DBUS_WATCH_READABLE) 63 mode = base::MessageLoopForIO::WATCH_READ; 64 else if (flags & DBUS_WATCH_WRITABLE) 65 mode = base::MessageLoopForIO::WATCH_WRITE; 66 else 67 NOTREACHED(); 68 69 const bool persistent = true; // Watch persistently. 70 const bool success = base::MessageLoopForIO::current()->WatchFileDescriptor( 71 file_descriptor, persistent, mode, &file_descriptor_watcher_, this); 72 CHECK(success) << "Unable to allocate memory"; 73 } 74 75 // Stops watching the underlying file descriptor. 76 void StopWatching() { 77 file_descriptor_watcher_.StopWatchingFileDescriptor(); 78 } 79 80 private: 81 // Implement MessagePumpLibevent::Watcher. 82 void OnFileCanReadWithoutBlocking(int /*file_descriptor*/) override { 83 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE); 84 CHECK(success) << "Unable to allocate memory"; 85 } 86 87 // Implement MessagePumpLibevent::Watcher. 88 void OnFileCanWriteWithoutBlocking(int /*file_descriptor*/) override { 89 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE); 90 CHECK(success) << "Unable to allocate memory"; 91 } 92 93 DBusWatch* raw_watch_; 94 base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_; 95 }; 96 97 // The class is used for monitoring the timeout used for D-Bus method 98 // calls. 99 // 100 // Unlike Watch, Timeout is a ref counted object, to ensure that |this| of 101 // the object is is alive when HandleTimeout() is called. It's unlikely 102 // but it may be possible that HandleTimeout() is called after 103 // Bus::OnRemoveTimeout(). That's why we don't simply delete the object in 104 // Bus::OnRemoveTimeout(). 105 class Timeout : public base::RefCountedThreadSafe<Timeout> { 106 public: 107 explicit Timeout(DBusTimeout* timeout) 108 : raw_timeout_(timeout), 109 monitoring_is_active_(false), 110 is_completed(false) { 111 dbus_timeout_set_data(raw_timeout_, this, NULL); 112 AddRef(); // Balanced on Complete(). 113 } 114 115 // Returns true if the timeout is ready to be monitored. 116 bool IsReadyToBeMonitored() { 117 return dbus_timeout_get_enabled(raw_timeout_); 118 } 119 120 // Starts monitoring the timeout. 121 void StartMonitoring(Bus* bus) { 122 bus->GetDBusTaskRunner()->PostDelayedTask( 123 FROM_HERE, 124 base::Bind(&Timeout::HandleTimeout, this), 125 GetInterval()); 126 monitoring_is_active_ = true; 127 } 128 129 // Stops monitoring the timeout. 130 void StopMonitoring() { 131 // We cannot take back the delayed task we posted in 132 // StartMonitoring(), so we just mark the monitoring is inactive now. 133 monitoring_is_active_ = false; 134 } 135 136 // Returns the interval. 137 base::TimeDelta GetInterval() { 138 return base::TimeDelta::FromMilliseconds( 139 dbus_timeout_get_interval(raw_timeout_)); 140 } 141 142 // Cleans up the raw_timeout and marks that timeout is completed. 143 // See the class comment above for why we are doing this. 144 void Complete() { 145 dbus_timeout_set_data(raw_timeout_, NULL, NULL); 146 is_completed = true; 147 Release(); 148 } 149 150 private: 151 friend class base::RefCountedThreadSafe<Timeout>; 152 ~Timeout() { 153 } 154 155 // Handles the timeout. 156 void HandleTimeout() { 157 // If the timeout is marked completed, we should do nothing. This can 158 // occur if this function is called after Bus::OnRemoveTimeout(). 159 if (is_completed) 160 return; 161 // Skip if monitoring is canceled. 162 if (!monitoring_is_active_) 163 return; 164 165 const bool success = dbus_timeout_handle(raw_timeout_); 166 CHECK(success) << "Unable to allocate memory"; 167 } 168 169 DBusTimeout* raw_timeout_; 170 bool monitoring_is_active_; 171 bool is_completed; 172 }; 173 174 } // namespace 175 176 Bus::Options::Options() 177 : bus_type(SESSION), 178 connection_type(PRIVATE) { 179 } 180 181 Bus::Options::~Options() { 182 } 183 184 Bus::Bus(const Options& options) 185 : bus_type_(options.bus_type), 186 connection_type_(options.connection_type), 187 dbus_task_runner_(options.dbus_task_runner), 188 on_shutdown_(base::WaitableEvent::ResetPolicy::AUTOMATIC, 189 base::WaitableEvent::InitialState::NOT_SIGNALED), 190 connection_(NULL), 191 origin_thread_id_(base::PlatformThread::CurrentId()), 192 async_operations_set_up_(false), 193 shutdown_completed_(false), 194 num_pending_watches_(0), 195 num_pending_timeouts_(0), 196 address_(options.address) { 197 // This is safe to call multiple times. 198 dbus_threads_init_default(); 199 // The origin message loop is unnecessary if the client uses synchronous 200 // functions only. 201 if (base::ThreadTaskRunnerHandle::IsSet()) 202 origin_task_runner_ = base::ThreadTaskRunnerHandle::Get(); 203 } 204 205 Bus::~Bus() { 206 DCHECK(!connection_); 207 DCHECK(owned_service_names_.empty()); 208 DCHECK(match_rules_added_.empty()); 209 DCHECK(filter_functions_added_.empty()); 210 DCHECK(registered_object_paths_.empty()); 211 DCHECK_EQ(0, num_pending_watches_); 212 // TODO(satorux): This check fails occasionally in browser_tests for tests 213 // that run very quickly. Perhaps something does not have time to clean up. 214 // Despite the check failing, the tests seem to run fine. crosbug.com/23416 215 // DCHECK_EQ(0, num_pending_timeouts_); 216 } 217 218 ObjectProxy* Bus::GetObjectProxy(const std::string& service_name, 219 const ObjectPath& object_path) { 220 return GetObjectProxyWithOptions(service_name, object_path, 221 ObjectProxy::DEFAULT_OPTIONS); 222 } 223 224 ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name, 225 const ObjectPath& object_path, 226 int options) { 227 AssertOnOriginThread(); 228 229 // Check if we already have the requested object proxy. 230 const ObjectProxyTable::key_type key(service_name + object_path.value(), 231 options); 232 ObjectProxyTable::iterator iter = object_proxy_table_.find(key); 233 if (iter != object_proxy_table_.end()) { 234 return iter->second.get(); 235 } 236 237 scoped_refptr<ObjectProxy> object_proxy = 238 new ObjectProxy(this, service_name, object_path, options); 239 object_proxy_table_[key] = object_proxy; 240 241 return object_proxy.get(); 242 } 243 244 bool Bus::RemoveObjectProxy(const std::string& service_name, 245 const ObjectPath& object_path, 246 const base::Closure& callback) { 247 return RemoveObjectProxyWithOptions(service_name, object_path, 248 ObjectProxy::DEFAULT_OPTIONS, 249 callback); 250 } 251 252 bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name, 253 const ObjectPath& object_path, 254 int options, 255 const base::Closure& callback) { 256 AssertOnOriginThread(); 257 258 // Check if we have the requested object proxy. 259 const ObjectProxyTable::key_type key(service_name + object_path.value(), 260 options); 261 ObjectProxyTable::iterator iter = object_proxy_table_.find(key); 262 if (iter != object_proxy_table_.end()) { 263 scoped_refptr<ObjectProxy> object_proxy = iter->second; 264 object_proxy_table_.erase(iter); 265 // Object is present. Remove it now and Detach on the DBus thread. 266 GetDBusTaskRunner()->PostTask( 267 FROM_HERE, 268 base::Bind(&Bus::RemoveObjectProxyInternal, 269 this, object_proxy, callback)); 270 return true; 271 } 272 return false; 273 } 274 275 void Bus::RemoveObjectProxyInternal(scoped_refptr<ObjectProxy> object_proxy, 276 const base::Closure& callback) { 277 AssertOnDBusThread(); 278 279 object_proxy.get()->Detach(); 280 281 GetOriginTaskRunner()->PostTask(FROM_HERE, callback); 282 } 283 284 ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) { 285 AssertOnOriginThread(); 286 287 // Check if we already have the requested exported object. 288 ExportedObjectTable::iterator iter = exported_object_table_.find(object_path); 289 if (iter != exported_object_table_.end()) { 290 return iter->second.get(); 291 } 292 293 scoped_refptr<ExportedObject> exported_object = 294 new ExportedObject(this, object_path); 295 exported_object_table_[object_path] = exported_object; 296 297 return exported_object.get(); 298 } 299 300 void Bus::UnregisterExportedObject(const ObjectPath& object_path) { 301 AssertOnOriginThread(); 302 303 // Remove the registered object from the table first, to allow a new 304 // GetExportedObject() call to return a new object, rather than this one. 305 ExportedObjectTable::iterator iter = exported_object_table_.find(object_path); 306 if (iter == exported_object_table_.end()) 307 return; 308 309 scoped_refptr<ExportedObject> exported_object = iter->second; 310 exported_object_table_.erase(iter); 311 312 // Post the task to perform the final unregistration to the D-Bus thread. 313 // Since the registration also happens on the D-Bus thread in 314 // TryRegisterObjectPath(), and the task runner we post to is a 315 // SequencedTaskRunner, there is a guarantee that this will happen before any 316 // future registration call. 317 GetDBusTaskRunner()->PostTask( 318 FROM_HERE, 319 base::Bind(&Bus::UnregisterExportedObjectInternal, 320 this, exported_object)); 321 } 322 323 void Bus::UnregisterExportedObjectInternal( 324 scoped_refptr<ExportedObject> exported_object) { 325 AssertOnDBusThread(); 326 327 exported_object->Unregister(); 328 } 329 330 ObjectManager* Bus::GetObjectManager(const std::string& service_name, 331 const ObjectPath& object_path) { 332 AssertOnOriginThread(); 333 334 // Check if we already have the requested object manager. 335 const ObjectManagerTable::key_type key(service_name + object_path.value()); 336 ObjectManagerTable::iterator iter = object_manager_table_.find(key); 337 if (iter != object_manager_table_.end()) { 338 return iter->second.get(); 339 } 340 341 scoped_refptr<ObjectManager> object_manager = 342 new ObjectManager(this, service_name, object_path); 343 object_manager_table_[key] = object_manager; 344 345 return object_manager.get(); 346 } 347 348 bool Bus::RemoveObjectManager(const std::string& service_name, 349 const ObjectPath& object_path, 350 const base::Closure& callback) { 351 AssertOnOriginThread(); 352 DCHECK(!callback.is_null()); 353 354 const ObjectManagerTable::key_type key(service_name + object_path.value()); 355 ObjectManagerTable::iterator iter = object_manager_table_.find(key); 356 if (iter == object_manager_table_.end()) 357 return false; 358 359 // ObjectManager is present. Remove it now and CleanUp on the DBus thread. 360 scoped_refptr<ObjectManager> object_manager = iter->second; 361 object_manager_table_.erase(iter); 362 363 GetDBusTaskRunner()->PostTask( 364 FROM_HERE, 365 base::Bind(&Bus::RemoveObjectManagerInternal, 366 this, object_manager, callback)); 367 368 return true; 369 } 370 371 void Bus::RemoveObjectManagerInternal( 372 scoped_refptr<dbus::ObjectManager> object_manager, 373 const base::Closure& callback) { 374 AssertOnDBusThread(); 375 DCHECK(object_manager.get()); 376 377 object_manager->CleanUp(); 378 379 // The ObjectManager has to be deleted on the origin thread since it was 380 // created there. 381 GetOriginTaskRunner()->PostTask( 382 FROM_HERE, 383 base::Bind(&Bus::RemoveObjectManagerInternalHelper, 384 this, object_manager, callback)); 385 } 386 387 void Bus::RemoveObjectManagerInternalHelper( 388 scoped_refptr<dbus::ObjectManager> object_manager, 389 const base::Closure& callback) { 390 AssertOnOriginThread(); 391 DCHECK(object_manager.get()); 392 393 // Release the object manager and run the callback. 394 object_manager = NULL; 395 callback.Run(); 396 } 397 398 void Bus::GetManagedObjects() { 399 for (ObjectManagerTable::iterator iter = object_manager_table_.begin(); 400 iter != object_manager_table_.end(); ++iter) { 401 iter->second->GetManagedObjects(); 402 } 403 } 404 405 bool Bus::Connect() { 406 // dbus_bus_get_private() and dbus_bus_get() are blocking calls. 407 AssertOnDBusThread(); 408 409 // Check if it's already initialized. 410 if (connection_) 411 return true; 412 413 ScopedDBusError error; 414 if (bus_type_ == CUSTOM_ADDRESS) { 415 if (connection_type_ == PRIVATE) { 416 connection_ = dbus_connection_open_private(address_.c_str(), error.get()); 417 } else { 418 connection_ = dbus_connection_open(address_.c_str(), error.get()); 419 } 420 } else { 421 const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_); 422 if (connection_type_ == PRIVATE) { 423 connection_ = dbus_bus_get_private(dbus_bus_type, error.get()); 424 } else { 425 connection_ = dbus_bus_get(dbus_bus_type, error.get()); 426 } 427 } 428 if (!connection_) { 429 LOG(ERROR) << "Failed to connect to the bus: " 430 << (error.is_set() ? error.message() : ""); 431 return false; 432 } 433 434 if (bus_type_ == CUSTOM_ADDRESS) { 435 // We should call dbus_bus_register here, otherwise unique name can not be 436 // acquired. According to dbus specification, it is responsible to call 437 // org.freedesktop.DBus.Hello method at the beging of bus connection to 438 // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is 439 // called internally. 440 if (!dbus_bus_register(connection_, error.get())) { 441 LOG(ERROR) << "Failed to register the bus component: " 442 << (error.is_set() ? error.message() : ""); 443 return false; 444 } 445 } 446 447 return true; 448 } 449 450 void Bus::ClosePrivateConnection() { 451 // dbus_connection_close is blocking call. 452 AssertOnDBusThread(); 453 DCHECK_EQ(PRIVATE, connection_type_) 454 << "non-private connection should not be closed"; 455 dbus_connection_close(connection_); 456 } 457 458 void Bus::ShutdownAndBlock() { 459 AssertOnDBusThread(); 460 461 if (shutdown_completed_) 462 return; // Already shutdowned, just return. 463 464 // Unregister the exported objects. 465 for (ExportedObjectTable::iterator iter = exported_object_table_.begin(); 466 iter != exported_object_table_.end(); ++iter) { 467 iter->second->Unregister(); 468 } 469 470 // Release all service names. 471 for (std::set<std::string>::iterator iter = owned_service_names_.begin(); 472 iter != owned_service_names_.end();) { 473 // This is a bit tricky but we should increment the iter here as 474 // ReleaseOwnership() may remove |service_name| from the set. 475 const std::string& service_name = *iter++; 476 ReleaseOwnership(service_name); 477 } 478 if (!owned_service_names_.empty()) { 479 LOG(ERROR) << "Failed to release all service names. # of services left: " 480 << owned_service_names_.size(); 481 } 482 483 // Detach from the remote objects. 484 for (ObjectProxyTable::iterator iter = object_proxy_table_.begin(); 485 iter != object_proxy_table_.end(); ++iter) { 486 iter->second->Detach(); 487 } 488 489 // Clean up the object managers. 490 for (ObjectManagerTable::iterator iter = object_manager_table_.begin(); 491 iter != object_manager_table_.end(); ++iter) { 492 iter->second->CleanUp(); 493 } 494 495 // Release object proxies and exported objects here. We should do this 496 // here rather than in the destructor to avoid memory leaks due to 497 // cyclic references. 498 object_proxy_table_.clear(); 499 exported_object_table_.clear(); 500 501 // Private connection should be closed. 502 if (connection_) { 503 // Remove Disconnected watcher. 504 ScopedDBusError error; 505 506 if (connection_type_ == PRIVATE) 507 ClosePrivateConnection(); 508 // dbus_connection_close() won't unref. 509 dbus_connection_unref(connection_); 510 } 511 512 connection_ = NULL; 513 shutdown_completed_ = true; 514 } 515 516 void Bus::ShutdownOnDBusThreadAndBlock() { 517 AssertOnOriginThread(); 518 DCHECK(dbus_task_runner_.get()); 519 520 GetDBusTaskRunner()->PostTask( 521 FROM_HERE, 522 base::Bind(&Bus::ShutdownOnDBusThreadAndBlockInternal, this)); 523 524 // http://crbug.com/125222 525 base::ThreadRestrictions::ScopedAllowWait allow_wait; 526 527 // Wait until the shutdown is complete on the D-Bus thread. 528 // The shutdown should not hang, but set timeout just in case. 529 const int kTimeoutSecs = 3; 530 const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs)); 531 const bool signaled = on_shutdown_.TimedWait(timeout); 532 LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus"; 533 } 534 535 void Bus::RequestOwnership(const std::string& service_name, 536 ServiceOwnershipOptions options, 537 OnOwnershipCallback on_ownership_callback) { 538 AssertOnOriginThread(); 539 540 GetDBusTaskRunner()->PostTask( 541 FROM_HERE, 542 base::Bind(&Bus::RequestOwnershipInternal, 543 this, service_name, options, on_ownership_callback)); 544 } 545 546 void Bus::RequestOwnershipInternal(const std::string& service_name, 547 ServiceOwnershipOptions options, 548 OnOwnershipCallback on_ownership_callback) { 549 AssertOnDBusThread(); 550 551 bool success = Connect(); 552 if (success) 553 success = RequestOwnershipAndBlock(service_name, options); 554 555 GetOriginTaskRunner()->PostTask(FROM_HERE, 556 base::Bind(on_ownership_callback, 557 service_name, 558 success)); 559 } 560 561 bool Bus::RequestOwnershipAndBlock(const std::string& service_name, 562 ServiceOwnershipOptions options) { 563 DCHECK(connection_); 564 // dbus_bus_request_name() is a blocking call. 565 AssertOnDBusThread(); 566 567 // Check if we already own the service name. 568 if (owned_service_names_.find(service_name) != owned_service_names_.end()) { 569 return true; 570 } 571 572 ScopedDBusError error; 573 const int result = dbus_bus_request_name(connection_, 574 service_name.c_str(), 575 options, 576 error.get()); 577 if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) { 578 LOG(ERROR) << "Failed to get the ownership of " << service_name << ": " 579 << (error.is_set() ? error.message() : ""); 580 return false; 581 } 582 owned_service_names_.insert(service_name); 583 return true; 584 } 585 586 bool Bus::ReleaseOwnership(const std::string& service_name) { 587 DCHECK(connection_); 588 // dbus_bus_request_name() is a blocking call. 589 AssertOnDBusThread(); 590 591 // Check if we already own the service name. 592 std::set<std::string>::iterator found = 593 owned_service_names_.find(service_name); 594 if (found == owned_service_names_.end()) { 595 LOG(ERROR) << service_name << " is not owned by the bus"; 596 return false; 597 } 598 599 ScopedDBusError error; 600 const int result = dbus_bus_release_name(connection_, service_name.c_str(), 601 error.get()); 602 if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) { 603 owned_service_names_.erase(found); 604 return true; 605 } else { 606 LOG(ERROR) << "Failed to release the ownership of " << service_name << ": " 607 << (error.is_set() ? error.message() : "") 608 << ", result code: " << result; 609 return false; 610 } 611 } 612 613 bool Bus::SetUpAsyncOperations() { 614 DCHECK(connection_); 615 AssertOnDBusThread(); 616 617 if (async_operations_set_up_) 618 return true; 619 620 // Process all the incoming data if any, so that OnDispatchStatus() will 621 // be called when the incoming data is ready. 622 ProcessAllIncomingDataIfAny(); 623 624 bool success = dbus_connection_set_watch_functions(connection_, 625 &Bus::OnAddWatchThunk, 626 &Bus::OnRemoveWatchThunk, 627 &Bus::OnToggleWatchThunk, 628 this, 629 NULL); 630 CHECK(success) << "Unable to allocate memory"; 631 632 success = dbus_connection_set_timeout_functions(connection_, 633 &Bus::OnAddTimeoutThunk, 634 &Bus::OnRemoveTimeoutThunk, 635 &Bus::OnToggleTimeoutThunk, 636 this, 637 NULL); 638 CHECK(success) << "Unable to allocate memory"; 639 640 dbus_connection_set_dispatch_status_function( 641 connection_, 642 &Bus::OnDispatchStatusChangedThunk, 643 this, 644 NULL); 645 646 async_operations_set_up_ = true; 647 648 return true; 649 } 650 651 DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request, 652 int timeout_ms, 653 DBusError* error) { 654 DCHECK(connection_); 655 AssertOnDBusThread(); 656 657 return dbus_connection_send_with_reply_and_block( 658 connection_, request, timeout_ms, error); 659 } 660 661 void Bus::SendWithReply(DBusMessage* request, 662 DBusPendingCall** pending_call, 663 int timeout_ms) { 664 DCHECK(connection_); 665 AssertOnDBusThread(); 666 667 const bool success = dbus_connection_send_with_reply( 668 connection_, request, pending_call, timeout_ms); 669 CHECK(success) << "Unable to allocate memory"; 670 } 671 672 void Bus::Send(DBusMessage* request, uint32_t* serial) { 673 DCHECK(connection_); 674 AssertOnDBusThread(); 675 676 const bool success = dbus_connection_send(connection_, request, serial); 677 CHECK(success) << "Unable to allocate memory"; 678 } 679 680 void Bus::AddFilterFunction(DBusHandleMessageFunction filter_function, 681 void* user_data) { 682 DCHECK(connection_); 683 AssertOnDBusThread(); 684 685 std::pair<DBusHandleMessageFunction, void*> filter_data_pair = 686 std::make_pair(filter_function, user_data); 687 if (filter_functions_added_.find(filter_data_pair) != 688 filter_functions_added_.end()) { 689 VLOG(1) << "Filter function already exists: " << filter_function 690 << " with associated data: " << user_data; 691 return; 692 } 693 694 const bool success = dbus_connection_add_filter( 695 connection_, filter_function, user_data, NULL); 696 CHECK(success) << "Unable to allocate memory"; 697 filter_functions_added_.insert(filter_data_pair); 698 } 699 700 void Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function, 701 void* user_data) { 702 DCHECK(connection_); 703 AssertOnDBusThread(); 704 705 std::pair<DBusHandleMessageFunction, void*> filter_data_pair = 706 std::make_pair(filter_function, user_data); 707 if (filter_functions_added_.find(filter_data_pair) == 708 filter_functions_added_.end()) { 709 VLOG(1) << "Requested to remove an unknown filter function: " 710 << filter_function 711 << " with associated data: " << user_data; 712 return; 713 } 714 715 dbus_connection_remove_filter(connection_, filter_function, user_data); 716 filter_functions_added_.erase(filter_data_pair); 717 } 718 719 void Bus::AddMatch(const std::string& match_rule, DBusError* error) { 720 DCHECK(connection_); 721 AssertOnDBusThread(); 722 723 std::map<std::string, int>::iterator iter = 724 match_rules_added_.find(match_rule); 725 if (iter != match_rules_added_.end()) { 726 // The already existing rule's counter is incremented. 727 iter->second++; 728 729 VLOG(1) << "Match rule already exists: " << match_rule; 730 return; 731 } 732 733 dbus_bus_add_match(connection_, match_rule.c_str(), error); 734 match_rules_added_[match_rule] = 1; 735 } 736 737 bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) { 738 DCHECK(connection_); 739 AssertOnDBusThread(); 740 741 std::map<std::string, int>::iterator iter = 742 match_rules_added_.find(match_rule); 743 if (iter == match_rules_added_.end()) { 744 LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule; 745 return false; 746 } 747 748 // The rule's counter is decremented and the rule is deleted when reachs 0. 749 iter->second--; 750 if (iter->second == 0) { 751 dbus_bus_remove_match(connection_, match_rule.c_str(), error); 752 match_rules_added_.erase(match_rule); 753 } 754 return true; 755 } 756 757 bool Bus::TryRegisterObjectPath(const ObjectPath& object_path, 758 const DBusObjectPathVTable* vtable, 759 void* user_data, 760 DBusError* error) { 761 DCHECK(connection_); 762 AssertOnDBusThread(); 763 764 if (registered_object_paths_.find(object_path) != 765 registered_object_paths_.end()) { 766 LOG(ERROR) << "Object path already registered: " << object_path.value(); 767 return false; 768 } 769 770 const bool success = dbus_connection_try_register_object_path( 771 connection_, 772 object_path.value().c_str(), 773 vtable, 774 user_data, 775 error); 776 if (success) 777 registered_object_paths_.insert(object_path); 778 return success; 779 } 780 781 void Bus::UnregisterObjectPath(const ObjectPath& object_path) { 782 DCHECK(connection_); 783 AssertOnDBusThread(); 784 785 if (registered_object_paths_.find(object_path) == 786 registered_object_paths_.end()) { 787 LOG(ERROR) << "Requested to unregister an unknown object path: " 788 << object_path.value(); 789 return; 790 } 791 792 const bool success = dbus_connection_unregister_object_path( 793 connection_, 794 object_path.value().c_str()); 795 CHECK(success) << "Unable to allocate memory"; 796 registered_object_paths_.erase(object_path); 797 } 798 799 void Bus::ShutdownOnDBusThreadAndBlockInternal() { 800 AssertOnDBusThread(); 801 802 ShutdownAndBlock(); 803 on_shutdown_.Signal(); 804 } 805 806 void Bus::ProcessAllIncomingDataIfAny() { 807 AssertOnDBusThread(); 808 809 // As mentioned at the class comment in .h file, connection_ can be NULL. 810 if (!connection_) 811 return; 812 813 // It is safe and necessary to call dbus_connection_get_dispatch_status even 814 // if the connection is lost. 815 if (dbus_connection_get_dispatch_status(connection_) == 816 DBUS_DISPATCH_DATA_REMAINS) { 817 while (dbus_connection_dispatch(connection_) == 818 DBUS_DISPATCH_DATA_REMAINS) { 819 } 820 } 821 } 822 823 base::TaskRunner* Bus::GetDBusTaskRunner() { 824 if (dbus_task_runner_.get()) 825 return dbus_task_runner_.get(); 826 else 827 return GetOriginTaskRunner(); 828 } 829 830 base::TaskRunner* Bus::GetOriginTaskRunner() { 831 DCHECK(origin_task_runner_.get()); 832 return origin_task_runner_.get(); 833 } 834 835 bool Bus::HasDBusThread() { 836 return dbus_task_runner_.get() != NULL; 837 } 838 839 void Bus::AssertOnOriginThread() { 840 DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId()); 841 } 842 843 void Bus::AssertOnDBusThread() { 844 base::ThreadRestrictions::AssertIOAllowed(); 845 846 if (dbus_task_runner_.get()) { 847 DCHECK(dbus_task_runner_->RunsTasksOnCurrentThread()); 848 } else { 849 AssertOnOriginThread(); 850 } 851 } 852 853 std::string Bus::GetServiceOwnerAndBlock(const std::string& service_name, 854 GetServiceOwnerOption options) { 855 AssertOnDBusThread(); 856 857 MethodCall get_name_owner_call("org.freedesktop.DBus", "GetNameOwner"); 858 MessageWriter writer(&get_name_owner_call); 859 writer.AppendString(service_name); 860 VLOG(1) << "Method call: " << get_name_owner_call.ToString(); 861 862 const ObjectPath obj_path("/org/freedesktop/DBus"); 863 if (!get_name_owner_call.SetDestination("org.freedesktop.DBus") || 864 !get_name_owner_call.SetPath(obj_path)) { 865 if (options == REPORT_ERRORS) 866 LOG(ERROR) << "Failed to get name owner."; 867 return ""; 868 } 869 870 ScopedDBusError error; 871 DBusMessage* response_message = 872 SendWithReplyAndBlock(get_name_owner_call.raw_message(), 873 ObjectProxy::TIMEOUT_USE_DEFAULT, 874 error.get()); 875 if (!response_message) { 876 if (options == REPORT_ERRORS) { 877 LOG(ERROR) << "Failed to get name owner. Got " << error.name() << ": " 878 << error.message(); 879 } 880 return ""; 881 } 882 883 std::unique_ptr<Response> response( 884 Response::FromRawMessage(response_message)); 885 MessageReader reader(response.get()); 886 887 std::string service_owner; 888 if (!reader.PopString(&service_owner)) 889 service_owner.clear(); 890 return service_owner; 891 } 892 893 void Bus::GetServiceOwner(const std::string& service_name, 894 const GetServiceOwnerCallback& callback) { 895 AssertOnOriginThread(); 896 897 GetDBusTaskRunner()->PostTask( 898 FROM_HERE, 899 base::Bind(&Bus::GetServiceOwnerInternal, this, service_name, callback)); 900 } 901 902 void Bus::GetServiceOwnerInternal(const std::string& service_name, 903 const GetServiceOwnerCallback& callback) { 904 AssertOnDBusThread(); 905 906 std::string service_owner; 907 if (Connect()) 908 service_owner = GetServiceOwnerAndBlock(service_name, SUPPRESS_ERRORS); 909 GetOriginTaskRunner()->PostTask(FROM_HERE, 910 base::Bind(callback, service_owner)); 911 } 912 913 void Bus::ListenForServiceOwnerChange( 914 const std::string& service_name, 915 const GetServiceOwnerCallback& callback) { 916 AssertOnOriginThread(); 917 DCHECK(!service_name.empty()); 918 DCHECK(!callback.is_null()); 919 920 GetDBusTaskRunner()->PostTask( 921 FROM_HERE, 922 base::Bind(&Bus::ListenForServiceOwnerChangeInternal, 923 this, service_name, callback)); 924 } 925 926 void Bus::ListenForServiceOwnerChangeInternal( 927 const std::string& service_name, 928 const GetServiceOwnerCallback& callback) { 929 AssertOnDBusThread(); 930 DCHECK(!service_name.empty()); 931 DCHECK(!callback.is_null()); 932 933 if (!Connect() || !SetUpAsyncOperations()) 934 return; 935 936 if (service_owner_changed_listener_map_.empty()) 937 AddFilterFunction(Bus::OnServiceOwnerChangedFilter, this); 938 939 ServiceOwnerChangedListenerMap::iterator it = 940 service_owner_changed_listener_map_.find(service_name); 941 if (it == service_owner_changed_listener_map_.end()) { 942 // Add a match rule for the new service name. 943 const std::string name_owner_changed_match_rule = 944 base::StringPrintf(kServiceNameOwnerChangeMatchRule, 945 service_name.c_str()); 946 ScopedDBusError error; 947 AddMatch(name_owner_changed_match_rule, error.get()); 948 if (error.is_set()) { 949 LOG(ERROR) << "Failed to add match rule for " << service_name 950 << ". Got " << error.name() << ": " << error.message(); 951 return; 952 } 953 954 service_owner_changed_listener_map_[service_name].push_back(callback); 955 return; 956 } 957 958 // Check if the callback has already been added. 959 std::vector<GetServiceOwnerCallback>& callbacks = it->second; 960 for (size_t i = 0; i < callbacks.size(); ++i) { 961 if (callbacks[i].Equals(callback)) 962 return; 963 } 964 callbacks.push_back(callback); 965 } 966 967 void Bus::UnlistenForServiceOwnerChange( 968 const std::string& service_name, 969 const GetServiceOwnerCallback& callback) { 970 AssertOnOriginThread(); 971 DCHECK(!service_name.empty()); 972 DCHECK(!callback.is_null()); 973 974 GetDBusTaskRunner()->PostTask( 975 FROM_HERE, 976 base::Bind(&Bus::UnlistenForServiceOwnerChangeInternal, 977 this, service_name, callback)); 978 } 979 980 void Bus::UnlistenForServiceOwnerChangeInternal( 981 const std::string& service_name, 982 const GetServiceOwnerCallback& callback) { 983 AssertOnDBusThread(); 984 DCHECK(!service_name.empty()); 985 DCHECK(!callback.is_null()); 986 987 ServiceOwnerChangedListenerMap::iterator it = 988 service_owner_changed_listener_map_.find(service_name); 989 if (it == service_owner_changed_listener_map_.end()) 990 return; 991 992 std::vector<GetServiceOwnerCallback>& callbacks = it->second; 993 for (size_t i = 0; i < callbacks.size(); ++i) { 994 if (callbacks[i].Equals(callback)) { 995 callbacks.erase(callbacks.begin() + i); 996 break; // There can be only one. 997 } 998 } 999 if (!callbacks.empty()) 1000 return; 1001 1002 // Last callback for |service_name| has been removed, remove match rule. 1003 const std::string name_owner_changed_match_rule = 1004 base::StringPrintf(kServiceNameOwnerChangeMatchRule, 1005 service_name.c_str()); 1006 ScopedDBusError error; 1007 RemoveMatch(name_owner_changed_match_rule, error.get()); 1008 // And remove |service_owner_changed_listener_map_| entry. 1009 service_owner_changed_listener_map_.erase(it); 1010 1011 if (service_owner_changed_listener_map_.empty()) 1012 RemoveFilterFunction(Bus::OnServiceOwnerChangedFilter, this); 1013 } 1014 1015 std::string Bus::GetConnectionName() { 1016 if (!connection_) 1017 return ""; 1018 return dbus_bus_get_unique_name(connection_); 1019 } 1020 1021 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) { 1022 AssertOnDBusThread(); 1023 1024 // watch will be deleted when raw_watch is removed in OnRemoveWatch(). 1025 Watch* watch = new Watch(raw_watch); 1026 if (watch->IsReadyToBeWatched()) { 1027 watch->StartWatching(); 1028 } 1029 ++num_pending_watches_; 1030 return true; 1031 } 1032 1033 void Bus::OnRemoveWatch(DBusWatch* raw_watch) { 1034 AssertOnDBusThread(); 1035 1036 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); 1037 delete watch; 1038 --num_pending_watches_; 1039 } 1040 1041 void Bus::OnToggleWatch(DBusWatch* raw_watch) { 1042 AssertOnDBusThread(); 1043 1044 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); 1045 if (watch->IsReadyToBeWatched()) { 1046 watch->StartWatching(); 1047 } else { 1048 // It's safe to call this if StartWatching() wasn't called, per 1049 // message_pump_libevent.h. 1050 watch->StopWatching(); 1051 } 1052 } 1053 1054 dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) { 1055 AssertOnDBusThread(); 1056 1057 // timeout will be deleted when raw_timeout is removed in 1058 // OnRemoveTimeoutThunk(). 1059 Timeout* timeout = new Timeout(raw_timeout); 1060 if (timeout->IsReadyToBeMonitored()) { 1061 timeout->StartMonitoring(this); 1062 } 1063 ++num_pending_timeouts_; 1064 return true; 1065 } 1066 1067 void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) { 1068 AssertOnDBusThread(); 1069 1070 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); 1071 timeout->Complete(); 1072 --num_pending_timeouts_; 1073 } 1074 1075 void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) { 1076 AssertOnDBusThread(); 1077 1078 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); 1079 if (timeout->IsReadyToBeMonitored()) { 1080 timeout->StartMonitoring(this); 1081 } else { 1082 timeout->StopMonitoring(); 1083 } 1084 } 1085 1086 void Bus::OnDispatchStatusChanged(DBusConnection* connection, 1087 DBusDispatchStatus /*status*/) { 1088 DCHECK_EQ(connection, connection_); 1089 AssertOnDBusThread(); 1090 1091 // We cannot call ProcessAllIncomingDataIfAny() here, as calling 1092 // dbus_connection_dispatch() inside DBusDispatchStatusFunction is 1093 // prohibited by the D-Bus library. Hence, we post a task here instead. 1094 // See comments for dbus_connection_set_dispatch_status_function(). 1095 GetDBusTaskRunner()->PostTask(FROM_HERE, 1096 base::Bind(&Bus::ProcessAllIncomingDataIfAny, 1097 this)); 1098 } 1099 1100 void Bus::OnServiceOwnerChanged(DBusMessage* message) { 1101 DCHECK(message); 1102 AssertOnDBusThread(); 1103 1104 // |message| will be unrefed on exit of the function. Increment the 1105 // reference so we can use it in Signal::FromRawMessage() below. 1106 dbus_message_ref(message); 1107 std::unique_ptr<Signal> signal(Signal::FromRawMessage(message)); 1108 1109 // Confirm the validity of the NameOwnerChanged signal. 1110 if (signal->GetMember() != kNameOwnerChangedSignal || 1111 signal->GetInterface() != DBUS_INTERFACE_DBUS || 1112 signal->GetSender() != DBUS_SERVICE_DBUS) { 1113 return; 1114 } 1115 1116 MessageReader reader(signal.get()); 1117 std::string service_name; 1118 std::string old_owner; 1119 std::string new_owner; 1120 if (!reader.PopString(&service_name) || 1121 !reader.PopString(&old_owner) || 1122 !reader.PopString(&new_owner)) { 1123 return; 1124 } 1125 1126 ServiceOwnerChangedListenerMap::const_iterator it = 1127 service_owner_changed_listener_map_.find(service_name); 1128 if (it == service_owner_changed_listener_map_.end()) 1129 return; 1130 1131 const std::vector<GetServiceOwnerCallback>& callbacks = it->second; 1132 for (size_t i = 0; i < callbacks.size(); ++i) { 1133 GetOriginTaskRunner()->PostTask(FROM_HERE, 1134 base::Bind(callbacks[i], new_owner)); 1135 } 1136 } 1137 1138 // static 1139 dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) { 1140 Bus* self = static_cast<Bus*>(data); 1141 return self->OnAddWatch(raw_watch); 1142 } 1143 1144 // static 1145 void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) { 1146 Bus* self = static_cast<Bus*>(data); 1147 self->OnRemoveWatch(raw_watch); 1148 } 1149 1150 // static 1151 void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) { 1152 Bus* self = static_cast<Bus*>(data); 1153 self->OnToggleWatch(raw_watch); 1154 } 1155 1156 // static 1157 dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) { 1158 Bus* self = static_cast<Bus*>(data); 1159 return self->OnAddTimeout(raw_timeout); 1160 } 1161 1162 // static 1163 void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) { 1164 Bus* self = static_cast<Bus*>(data); 1165 self->OnRemoveTimeout(raw_timeout); 1166 } 1167 1168 // static 1169 void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) { 1170 Bus* self = static_cast<Bus*>(data); 1171 self->OnToggleTimeout(raw_timeout); 1172 } 1173 1174 // static 1175 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection, 1176 DBusDispatchStatus status, 1177 void* data) { 1178 Bus* self = static_cast<Bus*>(data); 1179 self->OnDispatchStatusChanged(connection, status); 1180 } 1181 1182 // static 1183 DBusHandlerResult Bus::OnServiceOwnerChangedFilter( 1184 DBusConnection* /*connection*/, 1185 DBusMessage* message, 1186 void* data) { 1187 if (dbus_message_is_signal(message, 1188 DBUS_INTERFACE_DBUS, 1189 kNameOwnerChangedSignal)) { 1190 Bus* self = static_cast<Bus*>(data); 1191 self->OnServiceOwnerChanged(message); 1192 } 1193 // Always return unhandled to let others, e.g. ObjectProxies, handle the same 1194 // signal. 1195 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; 1196 } 1197 1198 } // namespace dbus 1199