1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "dbus/bus.h" 6 7 #include <stddef.h> 8 9 #include "base/bind.h" 10 #include "base/logging.h" 11 #include "base/message_loop/message_loop.h" 12 #include "base/stl_util.h" 13 #include "base/strings/stringprintf.h" 14 #include "base/threading/thread.h" 15 #include "base/threading/thread_restrictions.h" 16 #include "base/time/time.h" 17 #include "dbus/exported_object.h" 18 #include "dbus/message.h" 19 #include "dbus/object_manager.h" 20 #include "dbus/object_path.h" 21 #include "dbus/object_proxy.h" 22 #include "dbus/scoped_dbus_error.h" 23 24 namespace dbus { 25 26 namespace { 27 28 // The NameOwnerChanged member in org.freedesktop.DBus 29 const char kNameOwnerChangedSignal[] = "NameOwnerChanged"; 30 31 // The match rule used to filter for changes to a given service name owner. 32 const char kServiceNameOwnerChangeMatchRule[] = 33 "type='signal',interface='org.freedesktop.DBus'," 34 "member='NameOwnerChanged',path='/org/freedesktop/DBus'," 35 "sender='org.freedesktop.DBus',arg0='%s'"; 36 37 // The class is used for watching the file descriptor used for D-Bus 38 // communication. 39 class Watch : public base::MessagePumpLibevent::Watcher { 40 public: 41 explicit Watch(DBusWatch* watch) 42 : raw_watch_(watch) { 43 dbus_watch_set_data(raw_watch_, this, NULL); 44 } 45 46 ~Watch() override { dbus_watch_set_data(raw_watch_, NULL, NULL); } 47 48 // Returns true if the underlying file descriptor is ready to be watched. 49 bool IsReadyToBeWatched() { 50 return dbus_watch_get_enabled(raw_watch_); 51 } 52 53 // Starts watching the underlying file descriptor. 54 void StartWatching() { 55 const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_); 56 const int flags = dbus_watch_get_flags(raw_watch_); 57 58 base::MessageLoopForIO::Mode mode = base::MessageLoopForIO::WATCH_READ; 59 if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE)) 60 mode = base::MessageLoopForIO::WATCH_READ_WRITE; 61 else if (flags & DBUS_WATCH_READABLE) 62 mode = base::MessageLoopForIO::WATCH_READ; 63 else if (flags & DBUS_WATCH_WRITABLE) 64 mode = base::MessageLoopForIO::WATCH_WRITE; 65 else 66 NOTREACHED(); 67 68 const bool persistent = true; // Watch persistently. 69 const bool success = base::MessageLoopForIO::current()->WatchFileDescriptor( 70 file_descriptor, persistent, mode, &file_descriptor_watcher_, this); 71 CHECK(success) << "Unable to allocate memory"; 72 } 73 74 // Stops watching the underlying file descriptor. 75 void StopWatching() { 76 file_descriptor_watcher_.StopWatchingFileDescriptor(); 77 } 78 79 private: 80 // Implement MessagePumpLibevent::Watcher. 81 void OnFileCanReadWithoutBlocking(int /* file_descriptor */) override { 82 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE); 83 CHECK(success) << "Unable to allocate memory"; 84 } 85 86 // Implement MessagePumpLibevent::Watcher. 87 void OnFileCanWriteWithoutBlocking(int /* file_descriptor */) override { 88 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE); 89 CHECK(success) << "Unable to allocate memory"; 90 } 91 92 DBusWatch* raw_watch_; 93 base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_; 94 }; 95 96 // The class is used for monitoring the timeout used for D-Bus method 97 // calls. 98 // 99 // Unlike Watch, Timeout is a ref counted object, to ensure that |this| of 100 // the object is is alive when HandleTimeout() is called. It's unlikely 101 // but it may be possible that HandleTimeout() is called after 102 // Bus::OnRemoveTimeout(). That's why we don't simply delete the object in 103 // Bus::OnRemoveTimeout(). 104 class Timeout : public base::RefCountedThreadSafe<Timeout> { 105 public: 106 explicit Timeout(DBusTimeout* timeout) 107 : raw_timeout_(timeout), 108 monitoring_is_active_(false), 109 is_completed(false) { 110 dbus_timeout_set_data(raw_timeout_, this, NULL); 111 AddRef(); // Balanced on Complete(). 112 } 113 114 // Returns true if the timeout is ready to be monitored. 115 bool IsReadyToBeMonitored() { 116 return dbus_timeout_get_enabled(raw_timeout_); 117 } 118 119 // Starts monitoring the timeout. 120 void StartMonitoring(Bus* bus) { 121 bus->GetDBusTaskRunner()->PostDelayedTask( 122 FROM_HERE, 123 base::Bind(&Timeout::HandleTimeout, this), 124 GetInterval()); 125 monitoring_is_active_ = true; 126 } 127 128 // Stops monitoring the timeout. 129 void StopMonitoring() { 130 // We cannot take back the delayed task we posted in 131 // StartMonitoring(), so we just mark the monitoring is inactive now. 132 monitoring_is_active_ = false; 133 } 134 135 // Returns the interval. 136 base::TimeDelta GetInterval() { 137 return base::TimeDelta::FromMilliseconds( 138 dbus_timeout_get_interval(raw_timeout_)); 139 } 140 141 // Cleans up the raw_timeout and marks that timeout is completed. 142 // See the class comment above for why we are doing this. 143 void Complete() { 144 dbus_timeout_set_data(raw_timeout_, NULL, NULL); 145 is_completed = true; 146 Release(); 147 } 148 149 private: 150 friend class base::RefCountedThreadSafe<Timeout>; 151 ~Timeout() { 152 } 153 154 // Handles the timeout. 155 void HandleTimeout() { 156 // If the timeout is marked completed, we should do nothing. This can 157 // occur if this function is called after Bus::OnRemoveTimeout(). 158 if (is_completed) 159 return; 160 // Skip if monitoring is canceled. 161 if (!monitoring_is_active_) 162 return; 163 164 const bool success = dbus_timeout_handle(raw_timeout_); 165 CHECK(success) << "Unable to allocate memory"; 166 } 167 168 DBusTimeout* raw_timeout_; 169 bool monitoring_is_active_; 170 bool is_completed; 171 }; 172 173 } // namespace 174 175 Bus::Options::Options() 176 : bus_type(SESSION), 177 connection_type(PRIVATE) { 178 } 179 180 Bus::Options::~Options() { 181 } 182 183 Bus::Bus(const Options& options) 184 : bus_type_(options.bus_type), 185 connection_type_(options.connection_type), 186 dbus_task_runner_(options.dbus_task_runner), 187 on_shutdown_(false /* manual_reset */, false /* initially_signaled */), 188 connection_(NULL), 189 origin_thread_id_(base::PlatformThread::CurrentId()), 190 async_operations_set_up_(false), 191 shutdown_completed_(false), 192 num_pending_watches_(0), 193 num_pending_timeouts_(0), 194 address_(options.address) { 195 // This is safe to call multiple times. 196 dbus_threads_init_default(); 197 // The origin message loop is unnecessary if the client uses synchronous 198 // functions only. 199 if (base::MessageLoop::current()) 200 origin_task_runner_ = base::MessageLoop::current()->task_runner(); 201 } 202 203 Bus::~Bus() { 204 DCHECK(!connection_); 205 DCHECK(owned_service_names_.empty()); 206 DCHECK(match_rules_added_.empty()); 207 DCHECK(filter_functions_added_.empty()); 208 DCHECK(registered_object_paths_.empty()); 209 DCHECK_EQ(0, num_pending_watches_); 210 // TODO(satorux): This check fails occasionally in browser_tests for tests 211 // that run very quickly. Perhaps something does not have time to clean up. 212 // Despite the check failing, the tests seem to run fine. crosbug.com/23416 213 // DCHECK_EQ(0, num_pending_timeouts_); 214 } 215 216 ObjectProxy* Bus::GetObjectProxy(const std::string& service_name, 217 const ObjectPath& object_path) { 218 return GetObjectProxyWithOptions(service_name, object_path, 219 ObjectProxy::DEFAULT_OPTIONS); 220 } 221 222 ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name, 223 const ObjectPath& object_path, 224 int options) { 225 AssertOnOriginThread(); 226 227 // Check if we already have the requested object proxy. 228 const ObjectProxyTable::key_type key(service_name + object_path.value(), 229 options); 230 ObjectProxyTable::iterator iter = object_proxy_table_.find(key); 231 if (iter != object_proxy_table_.end()) { 232 return iter->second.get(); 233 } 234 235 scoped_refptr<ObjectProxy> object_proxy = 236 new ObjectProxy(this, service_name, object_path, options); 237 object_proxy_table_[key] = object_proxy; 238 239 return object_proxy.get(); 240 } 241 242 bool Bus::RemoveObjectProxy(const std::string& service_name, 243 const ObjectPath& object_path, 244 const base::Closure& callback) { 245 return RemoveObjectProxyWithOptions(service_name, object_path, 246 ObjectProxy::DEFAULT_OPTIONS, 247 callback); 248 } 249 250 bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name, 251 const ObjectPath& object_path, 252 int options, 253 const base::Closure& callback) { 254 AssertOnOriginThread(); 255 256 // Check if we have the requested object proxy. 257 const ObjectProxyTable::key_type key(service_name + object_path.value(), 258 options); 259 ObjectProxyTable::iterator iter = object_proxy_table_.find(key); 260 if (iter != object_proxy_table_.end()) { 261 scoped_refptr<ObjectProxy> object_proxy = iter->second; 262 object_proxy_table_.erase(iter); 263 // Object is present. Remove it now and Detach on the DBus thread. 264 GetDBusTaskRunner()->PostTask( 265 FROM_HERE, 266 base::Bind(&Bus::RemoveObjectProxyInternal, 267 this, object_proxy, callback)); 268 return true; 269 } 270 return false; 271 } 272 273 void Bus::RemoveObjectProxyInternal(scoped_refptr<ObjectProxy> object_proxy, 274 const base::Closure& callback) { 275 AssertOnDBusThread(); 276 277 object_proxy.get()->Detach(); 278 279 GetOriginTaskRunner()->PostTask(FROM_HERE, callback); 280 } 281 282 ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) { 283 AssertOnOriginThread(); 284 285 // Check if we already have the requested exported object. 286 ExportedObjectTable::iterator iter = exported_object_table_.find(object_path); 287 if (iter != exported_object_table_.end()) { 288 return iter->second.get(); 289 } 290 291 scoped_refptr<ExportedObject> exported_object = 292 new ExportedObject(this, object_path); 293 exported_object_table_[object_path] = exported_object; 294 295 return exported_object.get(); 296 } 297 298 void Bus::UnregisterExportedObject(const ObjectPath& object_path) { 299 AssertOnOriginThread(); 300 301 // Remove the registered object from the table first, to allow a new 302 // GetExportedObject() call to return a new object, rather than this one. 303 ExportedObjectTable::iterator iter = exported_object_table_.find(object_path); 304 if (iter == exported_object_table_.end()) 305 return; 306 307 scoped_refptr<ExportedObject> exported_object = iter->second; 308 exported_object_table_.erase(iter); 309 310 // Post the task to perform the final unregistration to the D-Bus thread. 311 // Since the registration also happens on the D-Bus thread in 312 // TryRegisterObjectPath(), and the task runner we post to is a 313 // SequencedTaskRunner, there is a guarantee that this will happen before any 314 // future registration call. 315 GetDBusTaskRunner()->PostTask( 316 FROM_HERE, 317 base::Bind(&Bus::UnregisterExportedObjectInternal, 318 this, exported_object)); 319 } 320 321 void Bus::UnregisterExportedObjectInternal( 322 scoped_refptr<ExportedObject> exported_object) { 323 AssertOnDBusThread(); 324 325 exported_object->Unregister(); 326 } 327 328 ObjectManager* Bus::GetObjectManager(const std::string& service_name, 329 const ObjectPath& object_path) { 330 AssertOnOriginThread(); 331 332 // Check if we already have the requested object manager. 333 const ObjectManagerTable::key_type key(service_name + object_path.value()); 334 ObjectManagerTable::iterator iter = object_manager_table_.find(key); 335 if (iter != object_manager_table_.end()) { 336 return iter->second.get(); 337 } 338 339 scoped_refptr<ObjectManager> object_manager = 340 new ObjectManager(this, service_name, object_path); 341 object_manager_table_[key] = object_manager; 342 343 return object_manager.get(); 344 } 345 346 bool Bus::RemoveObjectManager(const std::string& service_name, 347 const ObjectPath& object_path, 348 const base::Closure& callback) { 349 AssertOnOriginThread(); 350 DCHECK(!callback.is_null()); 351 352 const ObjectManagerTable::key_type key(service_name + object_path.value()); 353 ObjectManagerTable::iterator iter = object_manager_table_.find(key); 354 if (iter == object_manager_table_.end()) 355 return false; 356 357 // ObjectManager is present. Remove it now and CleanUp on the DBus thread. 358 scoped_refptr<ObjectManager> object_manager = iter->second; 359 object_manager_table_.erase(iter); 360 361 GetDBusTaskRunner()->PostTask( 362 FROM_HERE, 363 base::Bind(&Bus::RemoveObjectManagerInternal, 364 this, object_manager, callback)); 365 366 return true; 367 } 368 369 void Bus::RemoveObjectManagerInternal( 370 scoped_refptr<dbus::ObjectManager> object_manager, 371 const base::Closure& callback) { 372 AssertOnDBusThread(); 373 DCHECK(object_manager.get()); 374 375 object_manager->CleanUp(); 376 377 // The ObjectManager has to be deleted on the origin thread since it was 378 // created there. 379 GetOriginTaskRunner()->PostTask( 380 FROM_HERE, 381 base::Bind(&Bus::RemoveObjectManagerInternalHelper, 382 this, object_manager, callback)); 383 } 384 385 void Bus::RemoveObjectManagerInternalHelper( 386 scoped_refptr<dbus::ObjectManager> object_manager, 387 const base::Closure& callback) { 388 AssertOnOriginThread(); 389 DCHECK(object_manager.get()); 390 391 // Release the object manager and run the callback. 392 object_manager = NULL; 393 callback.Run(); 394 } 395 396 void Bus::GetManagedObjects() { 397 for (ObjectManagerTable::iterator iter = object_manager_table_.begin(); 398 iter != object_manager_table_.end(); ++iter) { 399 iter->second->GetManagedObjects(); 400 } 401 } 402 403 bool Bus::Connect() { 404 // dbus_bus_get_private() and dbus_bus_get() are blocking calls. 405 AssertOnDBusThread(); 406 407 // Check if it's already initialized. 408 if (connection_) 409 return true; 410 411 ScopedDBusError error; 412 if (bus_type_ == CUSTOM_ADDRESS) { 413 if (connection_type_ == PRIVATE) { 414 connection_ = dbus_connection_open_private(address_.c_str(), error.get()); 415 } else { 416 connection_ = dbus_connection_open(address_.c_str(), error.get()); 417 } 418 } else { 419 const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_); 420 if (connection_type_ == PRIVATE) { 421 connection_ = dbus_bus_get_private(dbus_bus_type, error.get()); 422 } else { 423 connection_ = dbus_bus_get(dbus_bus_type, error.get()); 424 } 425 } 426 if (!connection_) { 427 LOG(ERROR) << "Failed to connect to the bus: " 428 << (error.is_set() ? error.message() : ""); 429 return false; 430 } 431 432 if (bus_type_ == CUSTOM_ADDRESS) { 433 // We should call dbus_bus_register here, otherwise unique name can not be 434 // acquired. According to dbus specification, it is responsible to call 435 // org.freedesktop.DBus.Hello method at the beging of bus connection to 436 // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is 437 // called internally. 438 if (!dbus_bus_register(connection_, error.get())) { 439 LOG(ERROR) << "Failed to register the bus component: " 440 << (error.is_set() ? error.message() : ""); 441 return false; 442 } 443 } 444 445 return true; 446 } 447 448 void Bus::ClosePrivateConnection() { 449 // dbus_connection_close is blocking call. 450 AssertOnDBusThread(); 451 DCHECK_EQ(PRIVATE, connection_type_) 452 << "non-private connection should not be closed"; 453 dbus_connection_close(connection_); 454 } 455 456 void Bus::ShutdownAndBlock() { 457 AssertOnDBusThread(); 458 459 if (shutdown_completed_) 460 return; // Already shutdowned, just return. 461 462 // Unregister the exported objects. 463 for (ExportedObjectTable::iterator iter = exported_object_table_.begin(); 464 iter != exported_object_table_.end(); ++iter) { 465 iter->second->Unregister(); 466 } 467 468 // Release all service names. 469 for (std::set<std::string>::iterator iter = owned_service_names_.begin(); 470 iter != owned_service_names_.end();) { 471 // This is a bit tricky but we should increment the iter here as 472 // ReleaseOwnership() may remove |service_name| from the set. 473 const std::string& service_name = *iter++; 474 ReleaseOwnership(service_name); 475 } 476 if (!owned_service_names_.empty()) { 477 LOG(ERROR) << "Failed to release all service names. # of services left: " 478 << owned_service_names_.size(); 479 } 480 481 // Detach from the remote objects. 482 for (ObjectProxyTable::iterator iter = object_proxy_table_.begin(); 483 iter != object_proxy_table_.end(); ++iter) { 484 iter->second->Detach(); 485 } 486 487 // Clean up the object managers. 488 for (ObjectManagerTable::iterator iter = object_manager_table_.begin(); 489 iter != object_manager_table_.end(); ++iter) { 490 iter->second->CleanUp(); 491 } 492 493 // Release object proxies and exported objects here. We should do this 494 // here rather than in the destructor to avoid memory leaks due to 495 // cyclic references. 496 object_proxy_table_.clear(); 497 exported_object_table_.clear(); 498 499 // Private connection should be closed. 500 if (connection_) { 501 // Remove Disconnected watcher. 502 ScopedDBusError error; 503 504 if (connection_type_ == PRIVATE) 505 ClosePrivateConnection(); 506 // dbus_connection_close() won't unref. 507 dbus_connection_unref(connection_); 508 } 509 510 connection_ = NULL; 511 shutdown_completed_ = true; 512 } 513 514 void Bus::ShutdownOnDBusThreadAndBlock() { 515 AssertOnOriginThread(); 516 DCHECK(dbus_task_runner_.get()); 517 518 GetDBusTaskRunner()->PostTask( 519 FROM_HERE, 520 base::Bind(&Bus::ShutdownOnDBusThreadAndBlockInternal, this)); 521 522 // http://crbug.com/125222 523 base::ThreadRestrictions::ScopedAllowWait allow_wait; 524 525 // Wait until the shutdown is complete on the D-Bus thread. 526 // The shutdown should not hang, but set timeout just in case. 527 const int kTimeoutSecs = 3; 528 const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs)); 529 const bool signaled = on_shutdown_.TimedWait(timeout); 530 LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus"; 531 } 532 533 void Bus::RequestOwnership(const std::string& service_name, 534 ServiceOwnershipOptions options, 535 OnOwnershipCallback on_ownership_callback) { 536 AssertOnOriginThread(); 537 538 GetDBusTaskRunner()->PostTask( 539 FROM_HERE, 540 base::Bind(&Bus::RequestOwnershipInternal, 541 this, service_name, options, on_ownership_callback)); 542 } 543 544 void Bus::RequestOwnershipInternal(const std::string& service_name, 545 ServiceOwnershipOptions options, 546 OnOwnershipCallback on_ownership_callback) { 547 AssertOnDBusThread(); 548 549 bool success = Connect(); 550 if (success) 551 success = RequestOwnershipAndBlock(service_name, options); 552 553 GetOriginTaskRunner()->PostTask(FROM_HERE, 554 base::Bind(on_ownership_callback, 555 service_name, 556 success)); 557 } 558 559 bool Bus::RequestOwnershipAndBlock(const std::string& service_name, 560 ServiceOwnershipOptions options) { 561 DCHECK(connection_); 562 // dbus_bus_request_name() is a blocking call. 563 AssertOnDBusThread(); 564 565 // Check if we already own the service name. 566 if (owned_service_names_.find(service_name) != owned_service_names_.end()) { 567 return true; 568 } 569 570 ScopedDBusError error; 571 const int result = dbus_bus_request_name(connection_, 572 service_name.c_str(), 573 options, 574 error.get()); 575 if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) { 576 LOG(ERROR) << "Failed to get the ownership of " << service_name << ": " 577 << (error.is_set() ? error.message() : ""); 578 return false; 579 } 580 owned_service_names_.insert(service_name); 581 return true; 582 } 583 584 bool Bus::ReleaseOwnership(const std::string& service_name) { 585 DCHECK(connection_); 586 // dbus_bus_request_name() is a blocking call. 587 AssertOnDBusThread(); 588 589 // Check if we already own the service name. 590 std::set<std::string>::iterator found = 591 owned_service_names_.find(service_name); 592 if (found == owned_service_names_.end()) { 593 LOG(ERROR) << service_name << " is not owned by the bus"; 594 return false; 595 } 596 597 ScopedDBusError error; 598 const int result = dbus_bus_release_name(connection_, service_name.c_str(), 599 error.get()); 600 if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) { 601 owned_service_names_.erase(found); 602 return true; 603 } else { 604 LOG(ERROR) << "Failed to release the ownership of " << service_name << ": " 605 << (error.is_set() ? error.message() : "") 606 << ", result code: " << result; 607 return false; 608 } 609 } 610 611 bool Bus::SetUpAsyncOperations() { 612 DCHECK(connection_); 613 AssertOnDBusThread(); 614 615 if (async_operations_set_up_) 616 return true; 617 618 // Process all the incoming data if any, so that OnDispatchStatus() will 619 // be called when the incoming data is ready. 620 ProcessAllIncomingDataIfAny(); 621 622 bool success = dbus_connection_set_watch_functions(connection_, 623 &Bus::OnAddWatchThunk, 624 &Bus::OnRemoveWatchThunk, 625 &Bus::OnToggleWatchThunk, 626 this, 627 NULL); 628 CHECK(success) << "Unable to allocate memory"; 629 630 success = dbus_connection_set_timeout_functions(connection_, 631 &Bus::OnAddTimeoutThunk, 632 &Bus::OnRemoveTimeoutThunk, 633 &Bus::OnToggleTimeoutThunk, 634 this, 635 NULL); 636 CHECK(success) << "Unable to allocate memory"; 637 638 dbus_connection_set_dispatch_status_function( 639 connection_, 640 &Bus::OnDispatchStatusChangedThunk, 641 this, 642 NULL); 643 644 async_operations_set_up_ = true; 645 646 return true; 647 } 648 649 DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request, 650 int timeout_ms, 651 DBusError* error) { 652 DCHECK(connection_); 653 AssertOnDBusThread(); 654 655 return dbus_connection_send_with_reply_and_block( 656 connection_, request, timeout_ms, error); 657 } 658 659 void Bus::SendWithReply(DBusMessage* request, 660 DBusPendingCall** pending_call, 661 int timeout_ms) { 662 DCHECK(connection_); 663 AssertOnDBusThread(); 664 665 const bool success = dbus_connection_send_with_reply( 666 connection_, request, pending_call, timeout_ms); 667 CHECK(success) << "Unable to allocate memory"; 668 } 669 670 void Bus::Send(DBusMessage* request, uint32_t* serial) { 671 DCHECK(connection_); 672 AssertOnDBusThread(); 673 674 const bool success = dbus_connection_send(connection_, request, serial); 675 CHECK(success) << "Unable to allocate memory"; 676 } 677 678 void Bus::AddFilterFunction(DBusHandleMessageFunction filter_function, 679 void* user_data) { 680 DCHECK(connection_); 681 AssertOnDBusThread(); 682 683 std::pair<DBusHandleMessageFunction, void*> filter_data_pair = 684 std::make_pair(filter_function, user_data); 685 if (filter_functions_added_.find(filter_data_pair) != 686 filter_functions_added_.end()) { 687 VLOG(1) << "Filter function already exists: " << filter_function 688 << " with associated data: " << user_data; 689 return; 690 } 691 692 const bool success = dbus_connection_add_filter( 693 connection_, filter_function, user_data, NULL); 694 CHECK(success) << "Unable to allocate memory"; 695 filter_functions_added_.insert(filter_data_pair); 696 } 697 698 void Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function, 699 void* user_data) { 700 DCHECK(connection_); 701 AssertOnDBusThread(); 702 703 std::pair<DBusHandleMessageFunction, void*> filter_data_pair = 704 std::make_pair(filter_function, user_data); 705 if (filter_functions_added_.find(filter_data_pair) == 706 filter_functions_added_.end()) { 707 VLOG(1) << "Requested to remove an unknown filter function: " 708 << filter_function 709 << " with associated data: " << user_data; 710 return; 711 } 712 713 dbus_connection_remove_filter(connection_, filter_function, user_data); 714 filter_functions_added_.erase(filter_data_pair); 715 } 716 717 void Bus::AddMatch(const std::string& match_rule, DBusError* error) { 718 DCHECK(connection_); 719 AssertOnDBusThread(); 720 721 std::map<std::string, int>::iterator iter = 722 match_rules_added_.find(match_rule); 723 if (iter != match_rules_added_.end()) { 724 // The already existing rule's counter is incremented. 725 iter->second++; 726 727 VLOG(1) << "Match rule already exists: " << match_rule; 728 return; 729 } 730 731 dbus_bus_add_match(connection_, match_rule.c_str(), error); 732 match_rules_added_[match_rule] = 1; 733 } 734 735 bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) { 736 DCHECK(connection_); 737 AssertOnDBusThread(); 738 739 std::map<std::string, int>::iterator iter = 740 match_rules_added_.find(match_rule); 741 if (iter == match_rules_added_.end()) { 742 LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule; 743 return false; 744 } 745 746 // The rule's counter is decremented and the rule is deleted when reachs 0. 747 iter->second--; 748 if (iter->second == 0) { 749 dbus_bus_remove_match(connection_, match_rule.c_str(), error); 750 match_rules_added_.erase(match_rule); 751 } 752 return true; 753 } 754 755 bool Bus::TryRegisterObjectPath(const ObjectPath& object_path, 756 const DBusObjectPathVTable* vtable, 757 void* user_data, 758 DBusError* error) { 759 DCHECK(connection_); 760 AssertOnDBusThread(); 761 762 if (registered_object_paths_.find(object_path) != 763 registered_object_paths_.end()) { 764 LOG(ERROR) << "Object path already registered: " << object_path.value(); 765 return false; 766 } 767 768 const bool success = dbus_connection_try_register_object_path( 769 connection_, 770 object_path.value().c_str(), 771 vtable, 772 user_data, 773 error); 774 if (success) 775 registered_object_paths_.insert(object_path); 776 return success; 777 } 778 779 void Bus::UnregisterObjectPath(const ObjectPath& object_path) { 780 DCHECK(connection_); 781 AssertOnDBusThread(); 782 783 if (registered_object_paths_.find(object_path) == 784 registered_object_paths_.end()) { 785 LOG(ERROR) << "Requested to unregister an unknown object path: " 786 << object_path.value(); 787 return; 788 } 789 790 const bool success = dbus_connection_unregister_object_path( 791 connection_, 792 object_path.value().c_str()); 793 CHECK(success) << "Unable to allocate memory"; 794 registered_object_paths_.erase(object_path); 795 } 796 797 void Bus::ShutdownOnDBusThreadAndBlockInternal() { 798 AssertOnDBusThread(); 799 800 ShutdownAndBlock(); 801 on_shutdown_.Signal(); 802 } 803 804 void Bus::ProcessAllIncomingDataIfAny() { 805 AssertOnDBusThread(); 806 807 // As mentioned at the class comment in .h file, connection_ can be NULL. 808 if (!connection_) 809 return; 810 811 // It is safe and necessary to call dbus_connection_get_dispatch_status even 812 // if the connection is lost. 813 if (dbus_connection_get_dispatch_status(connection_) == 814 DBUS_DISPATCH_DATA_REMAINS) { 815 while (dbus_connection_dispatch(connection_) == 816 DBUS_DISPATCH_DATA_REMAINS) { 817 } 818 } 819 } 820 821 base::TaskRunner* Bus::GetDBusTaskRunner() { 822 if (dbus_task_runner_.get()) 823 return dbus_task_runner_.get(); 824 else 825 return GetOriginTaskRunner(); 826 } 827 828 base::TaskRunner* Bus::GetOriginTaskRunner() { 829 DCHECK(origin_task_runner_.get()); 830 return origin_task_runner_.get(); 831 } 832 833 bool Bus::HasDBusThread() { 834 return dbus_task_runner_.get() != NULL; 835 } 836 837 void Bus::AssertOnOriginThread() { 838 DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId()); 839 } 840 841 void Bus::AssertOnDBusThread() { 842 base::ThreadRestrictions::AssertIOAllowed(); 843 844 if (dbus_task_runner_.get()) { 845 DCHECK(dbus_task_runner_->RunsTasksOnCurrentThread()); 846 } else { 847 AssertOnOriginThread(); 848 } 849 } 850 851 std::string Bus::GetServiceOwnerAndBlock(const std::string& service_name, 852 GetServiceOwnerOption options) { 853 AssertOnDBusThread(); 854 855 MethodCall get_name_owner_call("org.freedesktop.DBus", "GetNameOwner"); 856 MessageWriter writer(&get_name_owner_call); 857 writer.AppendString(service_name); 858 VLOG(1) << "Method call: " << get_name_owner_call.ToString(); 859 860 const ObjectPath obj_path("/org/freedesktop/DBus"); 861 if (!get_name_owner_call.SetDestination("org.freedesktop.DBus") || 862 !get_name_owner_call.SetPath(obj_path)) { 863 if (options == REPORT_ERRORS) 864 LOG(ERROR) << "Failed to get name owner."; 865 return ""; 866 } 867 868 ScopedDBusError error; 869 DBusMessage* response_message = 870 SendWithReplyAndBlock(get_name_owner_call.raw_message(), 871 ObjectProxy::TIMEOUT_USE_DEFAULT, 872 error.get()); 873 if (!response_message) { 874 if (options == REPORT_ERRORS) { 875 LOG(ERROR) << "Failed to get name owner. Got " << error.name() << ": " 876 << error.message(); 877 } 878 return ""; 879 } 880 881 scoped_ptr<Response> response(Response::FromRawMessage(response_message)); 882 MessageReader reader(response.get()); 883 884 std::string service_owner; 885 if (!reader.PopString(&service_owner)) 886 service_owner.clear(); 887 return service_owner; 888 } 889 890 void Bus::GetServiceOwner(const std::string& service_name, 891 const GetServiceOwnerCallback& callback) { 892 AssertOnOriginThread(); 893 894 GetDBusTaskRunner()->PostTask( 895 FROM_HERE, 896 base::Bind(&Bus::GetServiceOwnerInternal, this, service_name, callback)); 897 } 898 899 void Bus::GetServiceOwnerInternal(const std::string& service_name, 900 const GetServiceOwnerCallback& callback) { 901 AssertOnDBusThread(); 902 903 std::string service_owner; 904 if (Connect()) 905 service_owner = GetServiceOwnerAndBlock(service_name, SUPPRESS_ERRORS); 906 GetOriginTaskRunner()->PostTask(FROM_HERE, 907 base::Bind(callback, service_owner)); 908 } 909 910 void Bus::ListenForServiceOwnerChange( 911 const std::string& service_name, 912 const GetServiceOwnerCallback& callback) { 913 AssertOnOriginThread(); 914 DCHECK(!service_name.empty()); 915 DCHECK(!callback.is_null()); 916 917 GetDBusTaskRunner()->PostTask( 918 FROM_HERE, 919 base::Bind(&Bus::ListenForServiceOwnerChangeInternal, 920 this, service_name, callback)); 921 } 922 923 void Bus::ListenForServiceOwnerChangeInternal( 924 const std::string& service_name, 925 const GetServiceOwnerCallback& callback) { 926 AssertOnDBusThread(); 927 DCHECK(!service_name.empty()); 928 DCHECK(!callback.is_null()); 929 930 if (!Connect() || !SetUpAsyncOperations()) 931 return; 932 933 if (service_owner_changed_listener_map_.empty()) 934 AddFilterFunction(Bus::OnServiceOwnerChangedFilter, this); 935 936 ServiceOwnerChangedListenerMap::iterator it = 937 service_owner_changed_listener_map_.find(service_name); 938 if (it == service_owner_changed_listener_map_.end()) { 939 // Add a match rule for the new service name. 940 const std::string name_owner_changed_match_rule = 941 base::StringPrintf(kServiceNameOwnerChangeMatchRule, 942 service_name.c_str()); 943 ScopedDBusError error; 944 AddMatch(name_owner_changed_match_rule, error.get()); 945 if (error.is_set()) { 946 LOG(ERROR) << "Failed to add match rule for " << service_name 947 << ". Got " << error.name() << ": " << error.message(); 948 return; 949 } 950 951 service_owner_changed_listener_map_[service_name].push_back(callback); 952 return; 953 } 954 955 // Check if the callback has already been added. 956 std::vector<GetServiceOwnerCallback>& callbacks = it->second; 957 for (size_t i = 0; i < callbacks.size(); ++i) { 958 if (callbacks[i].Equals(callback)) 959 return; 960 } 961 callbacks.push_back(callback); 962 } 963 964 void Bus::UnlistenForServiceOwnerChange( 965 const std::string& service_name, 966 const GetServiceOwnerCallback& callback) { 967 AssertOnOriginThread(); 968 DCHECK(!service_name.empty()); 969 DCHECK(!callback.is_null()); 970 971 GetDBusTaskRunner()->PostTask( 972 FROM_HERE, 973 base::Bind(&Bus::UnlistenForServiceOwnerChangeInternal, 974 this, service_name, callback)); 975 } 976 977 void Bus::UnlistenForServiceOwnerChangeInternal( 978 const std::string& service_name, 979 const GetServiceOwnerCallback& callback) { 980 AssertOnDBusThread(); 981 DCHECK(!service_name.empty()); 982 DCHECK(!callback.is_null()); 983 984 ServiceOwnerChangedListenerMap::iterator it = 985 service_owner_changed_listener_map_.find(service_name); 986 if (it == service_owner_changed_listener_map_.end()) 987 return; 988 989 std::vector<GetServiceOwnerCallback>& callbacks = it->second; 990 for (size_t i = 0; i < callbacks.size(); ++i) { 991 if (callbacks[i].Equals(callback)) { 992 callbacks.erase(callbacks.begin() + i); 993 break; // There can be only one. 994 } 995 } 996 if (!callbacks.empty()) 997 return; 998 999 // Last callback for |service_name| has been removed, remove match rule. 1000 const std::string name_owner_changed_match_rule = 1001 base::StringPrintf(kServiceNameOwnerChangeMatchRule, 1002 service_name.c_str()); 1003 ScopedDBusError error; 1004 RemoveMatch(name_owner_changed_match_rule, error.get()); 1005 // And remove |service_owner_changed_listener_map_| entry. 1006 service_owner_changed_listener_map_.erase(it); 1007 1008 if (service_owner_changed_listener_map_.empty()) 1009 RemoveFilterFunction(Bus::OnServiceOwnerChangedFilter, this); 1010 } 1011 1012 std::string Bus::GetConnectionName() { 1013 if (!connection_) 1014 return ""; 1015 return dbus_bus_get_unique_name(connection_); 1016 } 1017 1018 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) { 1019 AssertOnDBusThread(); 1020 1021 // watch will be deleted when raw_watch is removed in OnRemoveWatch(). 1022 Watch* watch = new Watch(raw_watch); 1023 if (watch->IsReadyToBeWatched()) { 1024 watch->StartWatching(); 1025 } 1026 ++num_pending_watches_; 1027 return true; 1028 } 1029 1030 void Bus::OnRemoveWatch(DBusWatch* raw_watch) { 1031 AssertOnDBusThread(); 1032 1033 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); 1034 delete watch; 1035 --num_pending_watches_; 1036 } 1037 1038 void Bus::OnToggleWatch(DBusWatch* raw_watch) { 1039 AssertOnDBusThread(); 1040 1041 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); 1042 if (watch->IsReadyToBeWatched()) { 1043 watch->StartWatching(); 1044 } else { 1045 // It's safe to call this if StartWatching() wasn't called, per 1046 // message_pump_libevent.h. 1047 watch->StopWatching(); 1048 } 1049 } 1050 1051 dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) { 1052 AssertOnDBusThread(); 1053 1054 // timeout will be deleted when raw_timeout is removed in 1055 // OnRemoveTimeoutThunk(). 1056 Timeout* timeout = new Timeout(raw_timeout); 1057 if (timeout->IsReadyToBeMonitored()) { 1058 timeout->StartMonitoring(this); 1059 } 1060 ++num_pending_timeouts_; 1061 return true; 1062 } 1063 1064 void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) { 1065 AssertOnDBusThread(); 1066 1067 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); 1068 timeout->Complete(); 1069 --num_pending_timeouts_; 1070 } 1071 1072 void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) { 1073 AssertOnDBusThread(); 1074 1075 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); 1076 if (timeout->IsReadyToBeMonitored()) { 1077 timeout->StartMonitoring(this); 1078 } else { 1079 timeout->StopMonitoring(); 1080 } 1081 } 1082 1083 void Bus::OnDispatchStatusChanged(DBusConnection* connection, 1084 DBusDispatchStatus /* status */) { 1085 DCHECK_EQ(connection, connection_); 1086 AssertOnDBusThread(); 1087 1088 // We cannot call ProcessAllIncomingDataIfAny() here, as calling 1089 // dbus_connection_dispatch() inside DBusDispatchStatusFunction is 1090 // prohibited by the D-Bus library. Hence, we post a task here instead. 1091 // See comments for dbus_connection_set_dispatch_status_function(). 1092 GetDBusTaskRunner()->PostTask(FROM_HERE, 1093 base::Bind(&Bus::ProcessAllIncomingDataIfAny, 1094 this)); 1095 } 1096 1097 void Bus::OnServiceOwnerChanged(DBusMessage* message) { 1098 DCHECK(message); 1099 AssertOnDBusThread(); 1100 1101 // |message| will be unrefed on exit of the function. Increment the 1102 // reference so we can use it in Signal::FromRawMessage() below. 1103 dbus_message_ref(message); 1104 scoped_ptr<Signal> signal(Signal::FromRawMessage(message)); 1105 1106 // Confirm the validity of the NameOwnerChanged signal. 1107 if (signal->GetMember() != kNameOwnerChangedSignal || 1108 signal->GetInterface() != DBUS_INTERFACE_DBUS || 1109 signal->GetSender() != DBUS_SERVICE_DBUS) { 1110 return; 1111 } 1112 1113 MessageReader reader(signal.get()); 1114 std::string service_name; 1115 std::string old_owner; 1116 std::string new_owner; 1117 if (!reader.PopString(&service_name) || 1118 !reader.PopString(&old_owner) || 1119 !reader.PopString(&new_owner)) { 1120 return; 1121 } 1122 1123 ServiceOwnerChangedListenerMap::const_iterator it = 1124 service_owner_changed_listener_map_.find(service_name); 1125 if (it == service_owner_changed_listener_map_.end()) 1126 return; 1127 1128 const std::vector<GetServiceOwnerCallback>& callbacks = it->second; 1129 for (size_t i = 0; i < callbacks.size(); ++i) { 1130 GetOriginTaskRunner()->PostTask(FROM_HERE, 1131 base::Bind(callbacks[i], new_owner)); 1132 } 1133 } 1134 1135 // static 1136 dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) { 1137 Bus* self = static_cast<Bus*>(data); 1138 return self->OnAddWatch(raw_watch); 1139 } 1140 1141 // static 1142 void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) { 1143 Bus* self = static_cast<Bus*>(data); 1144 self->OnRemoveWatch(raw_watch); 1145 } 1146 1147 // static 1148 void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) { 1149 Bus* self = static_cast<Bus*>(data); 1150 self->OnToggleWatch(raw_watch); 1151 } 1152 1153 // static 1154 dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) { 1155 Bus* self = static_cast<Bus*>(data); 1156 return self->OnAddTimeout(raw_timeout); 1157 } 1158 1159 // static 1160 void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) { 1161 Bus* self = static_cast<Bus*>(data); 1162 self->OnRemoveTimeout(raw_timeout); 1163 } 1164 1165 // static 1166 void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) { 1167 Bus* self = static_cast<Bus*>(data); 1168 self->OnToggleTimeout(raw_timeout); 1169 } 1170 1171 // static 1172 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection, 1173 DBusDispatchStatus status, 1174 void* data) { 1175 Bus* self = static_cast<Bus*>(data); 1176 self->OnDispatchStatusChanged(connection, status); 1177 } 1178 1179 // static 1180 DBusHandlerResult Bus::OnServiceOwnerChangedFilter( 1181 DBusConnection* /* connection */, 1182 DBusMessage* message, 1183 void* data) { 1184 if (dbus_message_is_signal(message, 1185 DBUS_INTERFACE_DBUS, 1186 kNameOwnerChangedSignal)) { 1187 Bus* self = static_cast<Bus*>(data); 1188 self->OnServiceOwnerChanged(message); 1189 } 1190 // Always return unhandled to let others, e.g. ObjectProxies, handle the same 1191 // signal. 1192 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; 1193 } 1194 1195 } // namespace dbus 1196