1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "dbus/bus.h" 6 7 #include "base/bind.h" 8 #include "base/logging.h" 9 #include "base/message_loop/message_loop.h" 10 #include "base/message_loop/message_loop_proxy.h" 11 #include "base/stl_util.h" 12 #include "base/strings/stringprintf.h" 13 #include "base/threading/thread.h" 14 #include "base/threading/thread_restrictions.h" 15 #include "base/time/time.h" 16 #include "dbus/exported_object.h" 17 #include "dbus/message.h" 18 #include "dbus/object_manager.h" 19 #include "dbus/object_path.h" 20 #include "dbus/object_proxy.h" 21 #include "dbus/scoped_dbus_error.h" 22 23 namespace dbus { 24 25 namespace { 26 27 const char kDisconnectedSignal[] = "Disconnected"; 28 const char kDisconnectedMatchRule[] = 29 "type='signal', path='/org/freedesktop/DBus/Local'," 30 "interface='org.freedesktop.DBus.Local', member='Disconnected'"; 31 32 // The NameOwnerChanged member in org.freedesktop.DBus 33 const char kNameOwnerChangedSignal[] = "NameOwnerChanged"; 34 35 // The match rule used to filter for changes to a given service name owner. 36 const char kServiceNameOwnerChangeMatchRule[] = 37 "type='signal',interface='org.freedesktop.DBus'," 38 "member='NameOwnerChanged',path='/org/freedesktop/DBus'," 39 "sender='org.freedesktop.DBus',arg0='%s'"; 40 41 // The class is used for watching the file descriptor used for D-Bus 42 // communication. 43 class Watch : public base::MessagePumpLibevent::Watcher { 44 public: 45 explicit Watch(DBusWatch* watch) 46 : raw_watch_(watch) { 47 dbus_watch_set_data(raw_watch_, this, NULL); 48 } 49 50 virtual ~Watch() { 51 dbus_watch_set_data(raw_watch_, NULL, NULL); 52 } 53 54 // Returns true if the underlying file descriptor is ready to be watched. 55 bool IsReadyToBeWatched() { 56 return dbus_watch_get_enabled(raw_watch_); 57 } 58 59 // Starts watching the underlying file descriptor. 60 void StartWatching() { 61 const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_); 62 const int flags = dbus_watch_get_flags(raw_watch_); 63 64 base::MessageLoopForIO::Mode mode = base::MessageLoopForIO::WATCH_READ; 65 if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE)) 66 mode = base::MessageLoopForIO::WATCH_READ_WRITE; 67 else if (flags & DBUS_WATCH_READABLE) 68 mode = base::MessageLoopForIO::WATCH_READ; 69 else if (flags & DBUS_WATCH_WRITABLE) 70 mode = base::MessageLoopForIO::WATCH_WRITE; 71 else 72 NOTREACHED(); 73 74 const bool persistent = true; // Watch persistently. 75 const bool success = base::MessageLoopForIO::current()->WatchFileDescriptor( 76 file_descriptor, persistent, mode, &file_descriptor_watcher_, this); 77 CHECK(success) << "Unable to allocate memory"; 78 } 79 80 // Stops watching the underlying file descriptor. 81 void StopWatching() { 82 file_descriptor_watcher_.StopWatchingFileDescriptor(); 83 } 84 85 private: 86 // Implement MessagePumpLibevent::Watcher. 87 virtual void OnFileCanReadWithoutBlocking(int file_descriptor) OVERRIDE { 88 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE); 89 CHECK(success) << "Unable to allocate memory"; 90 } 91 92 // Implement MessagePumpLibevent::Watcher. 93 virtual void OnFileCanWriteWithoutBlocking(int file_descriptor) OVERRIDE { 94 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE); 95 CHECK(success) << "Unable to allocate memory"; 96 } 97 98 DBusWatch* raw_watch_; 99 base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_; 100 }; 101 102 // The class is used for monitoring the timeout used for D-Bus method 103 // calls. 104 // 105 // Unlike Watch, Timeout is a ref counted object, to ensure that |this| of 106 // the object is is alive when HandleTimeout() is called. It's unlikely 107 // but it may be possible that HandleTimeout() is called after 108 // Bus::OnRemoveTimeout(). That's why we don't simply delete the object in 109 // Bus::OnRemoveTimeout(). 110 class Timeout : public base::RefCountedThreadSafe<Timeout> { 111 public: 112 explicit Timeout(DBusTimeout* timeout) 113 : raw_timeout_(timeout), 114 monitoring_is_active_(false), 115 is_completed(false) { 116 dbus_timeout_set_data(raw_timeout_, this, NULL); 117 AddRef(); // Balanced on Complete(). 118 } 119 120 // Returns true if the timeout is ready to be monitored. 121 bool IsReadyToBeMonitored() { 122 return dbus_timeout_get_enabled(raw_timeout_); 123 } 124 125 // Starts monitoring the timeout. 126 void StartMonitoring(Bus* bus) { 127 bus->GetDBusTaskRunner()->PostDelayedTask( 128 FROM_HERE, 129 base::Bind(&Timeout::HandleTimeout, this), 130 GetInterval()); 131 monitoring_is_active_ = true; 132 } 133 134 // Stops monitoring the timeout. 135 void StopMonitoring() { 136 // We cannot take back the delayed task we posted in 137 // StartMonitoring(), so we just mark the monitoring is inactive now. 138 monitoring_is_active_ = false; 139 } 140 141 // Returns the interval. 142 base::TimeDelta GetInterval() { 143 return base::TimeDelta::FromMilliseconds( 144 dbus_timeout_get_interval(raw_timeout_)); 145 } 146 147 // Cleans up the raw_timeout and marks that timeout is completed. 148 // See the class comment above for why we are doing this. 149 void Complete() { 150 dbus_timeout_set_data(raw_timeout_, NULL, NULL); 151 is_completed = true; 152 Release(); 153 } 154 155 private: 156 friend class base::RefCountedThreadSafe<Timeout>; 157 ~Timeout() { 158 } 159 160 // Handles the timeout. 161 void HandleTimeout() { 162 // If the timeout is marked completed, we should do nothing. This can 163 // occur if this function is called after Bus::OnRemoveTimeout(). 164 if (is_completed) 165 return; 166 // Skip if monitoring is canceled. 167 if (!monitoring_is_active_) 168 return; 169 170 const bool success = dbus_timeout_handle(raw_timeout_); 171 CHECK(success) << "Unable to allocate memory"; 172 } 173 174 DBusTimeout* raw_timeout_; 175 bool monitoring_is_active_; 176 bool is_completed; 177 }; 178 179 } // namespace 180 181 Bus::Options::Options() 182 : bus_type(SESSION), 183 connection_type(PRIVATE) { 184 } 185 186 Bus::Options::~Options() { 187 } 188 189 Bus::Bus(const Options& options) 190 : bus_type_(options.bus_type), 191 connection_type_(options.connection_type), 192 dbus_task_runner_(options.dbus_task_runner), 193 on_shutdown_(false /* manual_reset */, false /* initially_signaled */), 194 connection_(NULL), 195 origin_thread_id_(base::PlatformThread::CurrentId()), 196 async_operations_set_up_(false), 197 shutdown_completed_(false), 198 num_pending_watches_(0), 199 num_pending_timeouts_(0), 200 address_(options.address), 201 on_disconnected_closure_(options.disconnected_callback) { 202 // This is safe to call multiple times. 203 dbus_threads_init_default(); 204 // The origin message loop is unnecessary if the client uses synchronous 205 // functions only. 206 if (base::MessageLoop::current()) 207 origin_task_runner_ = base::MessageLoop::current()->message_loop_proxy(); 208 } 209 210 Bus::~Bus() { 211 DCHECK(!connection_); 212 DCHECK(owned_service_names_.empty()); 213 DCHECK(match_rules_added_.empty()); 214 DCHECK(filter_functions_added_.empty()); 215 DCHECK(registered_object_paths_.empty()); 216 DCHECK_EQ(0, num_pending_watches_); 217 // TODO(satorux): This check fails occasionally in browser_tests for tests 218 // that run very quickly. Perhaps something does not have time to clean up. 219 // Despite the check failing, the tests seem to run fine. crosbug.com/23416 220 // DCHECK_EQ(0, num_pending_timeouts_); 221 } 222 223 ObjectProxy* Bus::GetObjectProxy(const std::string& service_name, 224 const ObjectPath& object_path) { 225 return GetObjectProxyWithOptions(service_name, object_path, 226 ObjectProxy::DEFAULT_OPTIONS); 227 } 228 229 ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name, 230 const ObjectPath& object_path, 231 int options) { 232 AssertOnOriginThread(); 233 234 // Check if we already have the requested object proxy. 235 const ObjectProxyTable::key_type key(service_name + object_path.value(), 236 options); 237 ObjectProxyTable::iterator iter = object_proxy_table_.find(key); 238 if (iter != object_proxy_table_.end()) { 239 return iter->second.get(); 240 } 241 242 scoped_refptr<ObjectProxy> object_proxy = 243 new ObjectProxy(this, service_name, object_path, options); 244 object_proxy_table_[key] = object_proxy; 245 246 return object_proxy.get(); 247 } 248 249 bool Bus::RemoveObjectProxy(const std::string& service_name, 250 const ObjectPath& object_path, 251 const base::Closure& callback) { 252 return RemoveObjectProxyWithOptions(service_name, object_path, 253 ObjectProxy::DEFAULT_OPTIONS, 254 callback); 255 } 256 257 bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name, 258 const ObjectPath& object_path, 259 int options, 260 const base::Closure& callback) { 261 AssertOnOriginThread(); 262 263 // Check if we have the requested object proxy. 264 const ObjectProxyTable::key_type key(service_name + object_path.value(), 265 options); 266 ObjectProxyTable::iterator iter = object_proxy_table_.find(key); 267 if (iter != object_proxy_table_.end()) { 268 scoped_refptr<ObjectProxy> object_proxy = iter->second; 269 object_proxy_table_.erase(iter); 270 // Object is present. Remove it now and Detach on the DBus thread. 271 GetDBusTaskRunner()->PostTask( 272 FROM_HERE, 273 base::Bind(&Bus::RemoveObjectProxyInternal, 274 this, object_proxy, callback)); 275 return true; 276 } 277 return false; 278 } 279 280 void Bus::RemoveObjectProxyInternal(scoped_refptr<ObjectProxy> object_proxy, 281 const base::Closure& callback) { 282 AssertOnDBusThread(); 283 284 object_proxy.get()->Detach(); 285 286 GetOriginTaskRunner()->PostTask(FROM_HERE, callback); 287 } 288 289 ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) { 290 AssertOnOriginThread(); 291 292 // Check if we already have the requested exported object. 293 ExportedObjectTable::iterator iter = exported_object_table_.find(object_path); 294 if (iter != exported_object_table_.end()) { 295 return iter->second.get(); 296 } 297 298 scoped_refptr<ExportedObject> exported_object = 299 new ExportedObject(this, object_path); 300 exported_object_table_[object_path] = exported_object; 301 302 return exported_object.get(); 303 } 304 305 void Bus::UnregisterExportedObject(const ObjectPath& object_path) { 306 AssertOnOriginThread(); 307 308 // Remove the registered object from the table first, to allow a new 309 // GetExportedObject() call to return a new object, rather than this one. 310 ExportedObjectTable::iterator iter = exported_object_table_.find(object_path); 311 if (iter == exported_object_table_.end()) 312 return; 313 314 scoped_refptr<ExportedObject> exported_object = iter->second; 315 exported_object_table_.erase(iter); 316 317 // Post the task to perform the final unregistration to the D-Bus thread. 318 // Since the registration also happens on the D-Bus thread in 319 // TryRegisterObjectPath(), and the task runner we post to is a 320 // SequencedTaskRunner, there is a guarantee that this will happen before any 321 // future registration call. 322 GetDBusTaskRunner()->PostTask( 323 FROM_HERE, 324 base::Bind(&Bus::UnregisterExportedObjectInternal, 325 this, exported_object)); 326 } 327 328 void Bus::UnregisterExportedObjectInternal( 329 scoped_refptr<ExportedObject> exported_object) { 330 AssertOnDBusThread(); 331 332 exported_object->Unregister(); 333 } 334 335 ObjectManager* Bus::GetObjectManager(const std::string& service_name, 336 const ObjectPath& object_path) { 337 AssertOnOriginThread(); 338 339 // Check if we already have the requested object manager. 340 const ObjectManagerTable::key_type key(service_name + object_path.value()); 341 ObjectManagerTable::iterator iter = object_manager_table_.find(key); 342 if (iter != object_manager_table_.end()) { 343 return iter->second.get(); 344 } 345 346 scoped_refptr<ObjectManager> object_manager = 347 new ObjectManager(this, service_name, object_path); 348 object_manager_table_[key] = object_manager; 349 350 return object_manager.get(); 351 } 352 353 bool Bus::RemoveObjectManager(const std::string& service_name, 354 const ObjectPath& object_path, 355 const base::Closure& callback) { 356 AssertOnOriginThread(); 357 DCHECK(!callback.is_null()); 358 359 const ObjectManagerTable::key_type key(service_name + object_path.value()); 360 ObjectManagerTable::iterator iter = object_manager_table_.find(key); 361 if (iter == object_manager_table_.end()) 362 return false; 363 364 // ObjectManager is present. Remove it now and CleanUp on the DBus thread. 365 scoped_refptr<ObjectManager> object_manager = iter->second; 366 object_manager_table_.erase(iter); 367 368 GetDBusTaskRunner()->PostTask( 369 FROM_HERE, 370 base::Bind(&Bus::RemoveObjectManagerInternal, 371 this, object_manager, callback)); 372 373 return true; 374 } 375 376 void Bus::RemoveObjectManagerInternal( 377 scoped_refptr<dbus::ObjectManager> object_manager, 378 const base::Closure& callback) { 379 AssertOnDBusThread(); 380 DCHECK(object_manager.get()); 381 382 object_manager->CleanUp(); 383 384 // The ObjectManager has to be deleted on the origin thread since it was 385 // created there. 386 GetOriginTaskRunner()->PostTask( 387 FROM_HERE, 388 base::Bind(&Bus::RemoveObjectManagerInternalHelper, 389 this, object_manager, callback)); 390 } 391 392 void Bus::RemoveObjectManagerInternalHelper( 393 scoped_refptr<dbus::ObjectManager> object_manager, 394 const base::Closure& callback) { 395 AssertOnOriginThread(); 396 DCHECK(object_manager.get()); 397 398 // Release the object manager and run the callback. 399 object_manager = NULL; 400 callback.Run(); 401 } 402 403 void Bus::GetManagedObjects() { 404 for (ObjectManagerTable::iterator iter = object_manager_table_.begin(); 405 iter != object_manager_table_.end(); ++iter) { 406 iter->second->GetManagedObjects(); 407 } 408 } 409 410 bool Bus::Connect() { 411 // dbus_bus_get_private() and dbus_bus_get() are blocking calls. 412 AssertOnDBusThread(); 413 414 // Check if it's already initialized. 415 if (connection_) 416 return true; 417 418 ScopedDBusError error; 419 if (bus_type_ == CUSTOM_ADDRESS) { 420 if (connection_type_ == PRIVATE) { 421 connection_ = dbus_connection_open_private(address_.c_str(), error.get()); 422 } else { 423 connection_ = dbus_connection_open(address_.c_str(), error.get()); 424 } 425 } else { 426 const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_); 427 if (connection_type_ == PRIVATE) { 428 connection_ = dbus_bus_get_private(dbus_bus_type, error.get()); 429 } else { 430 connection_ = dbus_bus_get(dbus_bus_type, error.get()); 431 } 432 } 433 if (!connection_) { 434 LOG(ERROR) << "Failed to connect to the bus: " 435 << (error.is_set() ? error.message() : ""); 436 return false; 437 } 438 439 if (bus_type_ == CUSTOM_ADDRESS) { 440 // We should call dbus_bus_register here, otherwise unique name can not be 441 // acquired. According to dbus specification, it is responsible to call 442 // org.freedesktop.DBus.Hello method at the beging of bus connection to 443 // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is 444 // called internally. 445 if (!dbus_bus_register(connection_, error.get())) { 446 LOG(ERROR) << "Failed to register the bus component: " 447 << (error.is_set() ? error.message() : ""); 448 return false; 449 } 450 } 451 // We shouldn't exit on the disconnected signal. 452 dbus_connection_set_exit_on_disconnect(connection_, false); 453 454 // Watch Disconnected signal. 455 AddFilterFunction(Bus::OnConnectionDisconnectedFilter, this); 456 AddMatch(kDisconnectedMatchRule, error.get()); 457 458 return true; 459 } 460 461 void Bus::ClosePrivateConnection() { 462 // dbus_connection_close is blocking call. 463 AssertOnDBusThread(); 464 DCHECK_EQ(PRIVATE, connection_type_) 465 << "non-private connection should not be closed"; 466 dbus_connection_close(connection_); 467 } 468 469 void Bus::ShutdownAndBlock() { 470 AssertOnDBusThread(); 471 472 if (shutdown_completed_) 473 return; // Already shutdowned, just return. 474 475 // Unregister the exported objects. 476 for (ExportedObjectTable::iterator iter = exported_object_table_.begin(); 477 iter != exported_object_table_.end(); ++iter) { 478 iter->second->Unregister(); 479 } 480 481 // Release all service names. 482 for (std::set<std::string>::iterator iter = owned_service_names_.begin(); 483 iter != owned_service_names_.end();) { 484 // This is a bit tricky but we should increment the iter here as 485 // ReleaseOwnership() may remove |service_name| from the set. 486 const std::string& service_name = *iter++; 487 ReleaseOwnership(service_name); 488 } 489 if (!owned_service_names_.empty()) { 490 LOG(ERROR) << "Failed to release all service names. # of services left: " 491 << owned_service_names_.size(); 492 } 493 494 // Detach from the remote objects. 495 for (ObjectProxyTable::iterator iter = object_proxy_table_.begin(); 496 iter != object_proxy_table_.end(); ++iter) { 497 iter->second->Detach(); 498 } 499 500 // Clean up the object managers. 501 for (ObjectManagerTable::iterator iter = object_manager_table_.begin(); 502 iter != object_manager_table_.end(); ++iter) { 503 iter->second->CleanUp(); 504 } 505 506 // Release object proxies and exported objects here. We should do this 507 // here rather than in the destructor to avoid memory leaks due to 508 // cyclic references. 509 object_proxy_table_.clear(); 510 exported_object_table_.clear(); 511 512 // Private connection should be closed. 513 if (connection_) { 514 // Remove Disconnected watcher. 515 ScopedDBusError error; 516 RemoveFilterFunction(Bus::OnConnectionDisconnectedFilter, this); 517 RemoveMatch(kDisconnectedMatchRule, error.get()); 518 519 if (connection_type_ == PRIVATE) 520 ClosePrivateConnection(); 521 // dbus_connection_close() won't unref. 522 dbus_connection_unref(connection_); 523 } 524 525 connection_ = NULL; 526 shutdown_completed_ = true; 527 } 528 529 void Bus::ShutdownOnDBusThreadAndBlock() { 530 AssertOnOriginThread(); 531 DCHECK(dbus_task_runner_.get()); 532 533 GetDBusTaskRunner()->PostTask( 534 FROM_HERE, 535 base::Bind(&Bus::ShutdownOnDBusThreadAndBlockInternal, this)); 536 537 // http://crbug.com/125222 538 base::ThreadRestrictions::ScopedAllowWait allow_wait; 539 540 // Wait until the shutdown is complete on the D-Bus thread. 541 // The shutdown should not hang, but set timeout just in case. 542 const int kTimeoutSecs = 3; 543 const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs)); 544 const bool signaled = on_shutdown_.TimedWait(timeout); 545 LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus"; 546 } 547 548 void Bus::RequestOwnership(const std::string& service_name, 549 ServiceOwnershipOptions options, 550 OnOwnershipCallback on_ownership_callback) { 551 AssertOnOriginThread(); 552 553 GetDBusTaskRunner()->PostTask( 554 FROM_HERE, 555 base::Bind(&Bus::RequestOwnershipInternal, 556 this, service_name, options, on_ownership_callback)); 557 } 558 559 void Bus::RequestOwnershipInternal(const std::string& service_name, 560 ServiceOwnershipOptions options, 561 OnOwnershipCallback on_ownership_callback) { 562 AssertOnDBusThread(); 563 564 bool success = Connect(); 565 if (success) 566 success = RequestOwnershipAndBlock(service_name, options); 567 568 GetOriginTaskRunner()->PostTask(FROM_HERE, 569 base::Bind(on_ownership_callback, 570 service_name, 571 success)); 572 } 573 574 bool Bus::RequestOwnershipAndBlock(const std::string& service_name, 575 ServiceOwnershipOptions options) { 576 DCHECK(connection_); 577 // dbus_bus_request_name() is a blocking call. 578 AssertOnDBusThread(); 579 580 // Check if we already own the service name. 581 if (owned_service_names_.find(service_name) != owned_service_names_.end()) { 582 return true; 583 } 584 585 ScopedDBusError error; 586 const int result = dbus_bus_request_name(connection_, 587 service_name.c_str(), 588 options, 589 error.get()); 590 if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) { 591 LOG(ERROR) << "Failed to get the ownership of " << service_name << ": " 592 << (error.is_set() ? error.message() : ""); 593 return false; 594 } 595 owned_service_names_.insert(service_name); 596 return true; 597 } 598 599 bool Bus::ReleaseOwnership(const std::string& service_name) { 600 DCHECK(connection_); 601 // dbus_bus_request_name() is a blocking call. 602 AssertOnDBusThread(); 603 604 // Check if we already own the service name. 605 std::set<std::string>::iterator found = 606 owned_service_names_.find(service_name); 607 if (found == owned_service_names_.end()) { 608 LOG(ERROR) << service_name << " is not owned by the bus"; 609 return false; 610 } 611 612 ScopedDBusError error; 613 const int result = dbus_bus_release_name(connection_, service_name.c_str(), 614 error.get()); 615 if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) { 616 owned_service_names_.erase(found); 617 return true; 618 } else { 619 LOG(ERROR) << "Failed to release the ownership of " << service_name << ": " 620 << (error.is_set() ? error.message() : "") 621 << ", result code: " << result; 622 return false; 623 } 624 } 625 626 bool Bus::SetUpAsyncOperations() { 627 DCHECK(connection_); 628 AssertOnDBusThread(); 629 630 if (async_operations_set_up_) 631 return true; 632 633 // Process all the incoming data if any, so that OnDispatchStatus() will 634 // be called when the incoming data is ready. 635 ProcessAllIncomingDataIfAny(); 636 637 bool success = dbus_connection_set_watch_functions(connection_, 638 &Bus::OnAddWatchThunk, 639 &Bus::OnRemoveWatchThunk, 640 &Bus::OnToggleWatchThunk, 641 this, 642 NULL); 643 CHECK(success) << "Unable to allocate memory"; 644 645 success = dbus_connection_set_timeout_functions(connection_, 646 &Bus::OnAddTimeoutThunk, 647 &Bus::OnRemoveTimeoutThunk, 648 &Bus::OnToggleTimeoutThunk, 649 this, 650 NULL); 651 CHECK(success) << "Unable to allocate memory"; 652 653 dbus_connection_set_dispatch_status_function( 654 connection_, 655 &Bus::OnDispatchStatusChangedThunk, 656 this, 657 NULL); 658 659 async_operations_set_up_ = true; 660 661 return true; 662 } 663 664 DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request, 665 int timeout_ms, 666 DBusError* error) { 667 DCHECK(connection_); 668 AssertOnDBusThread(); 669 670 return dbus_connection_send_with_reply_and_block( 671 connection_, request, timeout_ms, error); 672 } 673 674 void Bus::SendWithReply(DBusMessage* request, 675 DBusPendingCall** pending_call, 676 int timeout_ms) { 677 DCHECK(connection_); 678 AssertOnDBusThread(); 679 680 const bool success = dbus_connection_send_with_reply( 681 connection_, request, pending_call, timeout_ms); 682 CHECK(success) << "Unable to allocate memory"; 683 } 684 685 void Bus::Send(DBusMessage* request, uint32* serial) { 686 DCHECK(connection_); 687 AssertOnDBusThread(); 688 689 const bool success = dbus_connection_send(connection_, request, serial); 690 CHECK(success) << "Unable to allocate memory"; 691 } 692 693 bool Bus::AddFilterFunction(DBusHandleMessageFunction filter_function, 694 void* user_data) { 695 DCHECK(connection_); 696 AssertOnDBusThread(); 697 698 std::pair<DBusHandleMessageFunction, void*> filter_data_pair = 699 std::make_pair(filter_function, user_data); 700 if (filter_functions_added_.find(filter_data_pair) != 701 filter_functions_added_.end()) { 702 VLOG(1) << "Filter function already exists: " << filter_function 703 << " with associated data: " << user_data; 704 return false; 705 } 706 707 const bool success = dbus_connection_add_filter( 708 connection_, filter_function, user_data, NULL); 709 CHECK(success) << "Unable to allocate memory"; 710 filter_functions_added_.insert(filter_data_pair); 711 return true; 712 } 713 714 bool Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function, 715 void* user_data) { 716 DCHECK(connection_); 717 AssertOnDBusThread(); 718 719 std::pair<DBusHandleMessageFunction, void*> filter_data_pair = 720 std::make_pair(filter_function, user_data); 721 if (filter_functions_added_.find(filter_data_pair) == 722 filter_functions_added_.end()) { 723 VLOG(1) << "Requested to remove an unknown filter function: " 724 << filter_function 725 << " with associated data: " << user_data; 726 return false; 727 } 728 729 dbus_connection_remove_filter(connection_, filter_function, user_data); 730 filter_functions_added_.erase(filter_data_pair); 731 return true; 732 } 733 734 void Bus::AddMatch(const std::string& match_rule, DBusError* error) { 735 DCHECK(connection_); 736 AssertOnDBusThread(); 737 738 std::map<std::string, int>::iterator iter = 739 match_rules_added_.find(match_rule); 740 if (iter != match_rules_added_.end()) { 741 // The already existing rule's counter is incremented. 742 iter->second++; 743 744 VLOG(1) << "Match rule already exists: " << match_rule; 745 return; 746 } 747 748 dbus_bus_add_match(connection_, match_rule.c_str(), error); 749 match_rules_added_[match_rule] = 1; 750 } 751 752 bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) { 753 DCHECK(connection_); 754 AssertOnDBusThread(); 755 756 std::map<std::string, int>::iterator iter = 757 match_rules_added_.find(match_rule); 758 if (iter == match_rules_added_.end()) { 759 LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule; 760 return false; 761 } 762 763 // The rule's counter is decremented and the rule is deleted when reachs 0. 764 iter->second--; 765 if (iter->second == 0) { 766 dbus_bus_remove_match(connection_, match_rule.c_str(), error); 767 match_rules_added_.erase(match_rule); 768 } 769 return true; 770 } 771 772 bool Bus::TryRegisterObjectPath(const ObjectPath& object_path, 773 const DBusObjectPathVTable* vtable, 774 void* user_data, 775 DBusError* error) { 776 DCHECK(connection_); 777 AssertOnDBusThread(); 778 779 if (registered_object_paths_.find(object_path) != 780 registered_object_paths_.end()) { 781 LOG(ERROR) << "Object path already registered: " << object_path.value(); 782 return false; 783 } 784 785 const bool success = dbus_connection_try_register_object_path( 786 connection_, 787 object_path.value().c_str(), 788 vtable, 789 user_data, 790 error); 791 if (success) 792 registered_object_paths_.insert(object_path); 793 return success; 794 } 795 796 void Bus::UnregisterObjectPath(const ObjectPath& object_path) { 797 DCHECK(connection_); 798 AssertOnDBusThread(); 799 800 if (registered_object_paths_.find(object_path) == 801 registered_object_paths_.end()) { 802 LOG(ERROR) << "Requested to unregister an unknown object path: " 803 << object_path.value(); 804 return; 805 } 806 807 const bool success = dbus_connection_unregister_object_path( 808 connection_, 809 object_path.value().c_str()); 810 CHECK(success) << "Unable to allocate memory"; 811 registered_object_paths_.erase(object_path); 812 } 813 814 void Bus::ShutdownOnDBusThreadAndBlockInternal() { 815 AssertOnDBusThread(); 816 817 ShutdownAndBlock(); 818 on_shutdown_.Signal(); 819 } 820 821 void Bus::ProcessAllIncomingDataIfAny() { 822 AssertOnDBusThread(); 823 824 // As mentioned at the class comment in .h file, connection_ can be NULL. 825 if (!connection_) 826 return; 827 828 // It is safe and necessary to call dbus_connection_get_dispatch_status even 829 // if the connection is lost. Otherwise we will miss "Disconnected" signal. 830 // (crbug.com/174431) 831 if (dbus_connection_get_dispatch_status(connection_) == 832 DBUS_DISPATCH_DATA_REMAINS) { 833 while (dbus_connection_dispatch(connection_) == 834 DBUS_DISPATCH_DATA_REMAINS) { 835 } 836 } 837 } 838 839 base::TaskRunner* Bus::GetDBusTaskRunner() { 840 if (dbus_task_runner_.get()) 841 return dbus_task_runner_.get(); 842 else 843 return GetOriginTaskRunner(); 844 } 845 846 base::TaskRunner* Bus::GetOriginTaskRunner() { 847 DCHECK(origin_task_runner_.get()); 848 return origin_task_runner_.get(); 849 } 850 851 bool Bus::HasDBusThread() { 852 return dbus_task_runner_.get() != NULL; 853 } 854 855 void Bus::AssertOnOriginThread() { 856 DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId()); 857 } 858 859 void Bus::AssertOnDBusThread() { 860 base::ThreadRestrictions::AssertIOAllowed(); 861 862 if (dbus_task_runner_.get()) { 863 DCHECK(dbus_task_runner_->RunsTasksOnCurrentThread()); 864 } else { 865 AssertOnOriginThread(); 866 } 867 } 868 869 std::string Bus::GetServiceOwnerAndBlock(const std::string& service_name, 870 GetServiceOwnerOption options) { 871 AssertOnDBusThread(); 872 873 MethodCall get_name_owner_call("org.freedesktop.DBus", "GetNameOwner"); 874 MessageWriter writer(&get_name_owner_call); 875 writer.AppendString(service_name); 876 VLOG(1) << "Method call: " << get_name_owner_call.ToString(); 877 878 const ObjectPath obj_path("/org/freedesktop/DBus"); 879 if (!get_name_owner_call.SetDestination("org.freedesktop.DBus") || 880 !get_name_owner_call.SetPath(obj_path)) { 881 if (options == REPORT_ERRORS) 882 LOG(ERROR) << "Failed to get name owner."; 883 return ""; 884 } 885 886 ScopedDBusError error; 887 DBusMessage* response_message = 888 SendWithReplyAndBlock(get_name_owner_call.raw_message(), 889 ObjectProxy::TIMEOUT_USE_DEFAULT, 890 error.get()); 891 if (!response_message) { 892 if (options == REPORT_ERRORS) { 893 LOG(ERROR) << "Failed to get name owner. Got " << error.name() << ": " 894 << error.message(); 895 } 896 return ""; 897 } 898 899 scoped_ptr<Response> response(Response::FromRawMessage(response_message)); 900 MessageReader reader(response.get()); 901 902 std::string service_owner; 903 if (!reader.PopString(&service_owner)) 904 service_owner.clear(); 905 return service_owner; 906 } 907 908 void Bus::GetServiceOwner(const std::string& service_name, 909 const GetServiceOwnerCallback& callback) { 910 AssertOnOriginThread(); 911 912 GetDBusTaskRunner()->PostTask( 913 FROM_HERE, 914 base::Bind(&Bus::GetServiceOwnerInternal, this, service_name, callback)); 915 } 916 917 void Bus::GetServiceOwnerInternal(const std::string& service_name, 918 const GetServiceOwnerCallback& callback) { 919 AssertOnDBusThread(); 920 921 std::string service_owner; 922 if (Connect()) 923 service_owner = GetServiceOwnerAndBlock(service_name, SUPPRESS_ERRORS); 924 GetOriginTaskRunner()->PostTask(FROM_HERE, 925 base::Bind(callback, service_owner)); 926 } 927 928 void Bus::ListenForServiceOwnerChange( 929 const std::string& service_name, 930 const GetServiceOwnerCallback& callback) { 931 AssertOnOriginThread(); 932 DCHECK(!service_name.empty()); 933 DCHECK(!callback.is_null()); 934 935 GetDBusTaskRunner()->PostTask( 936 FROM_HERE, 937 base::Bind(&Bus::ListenForServiceOwnerChangeInternal, 938 this, service_name, callback)); 939 } 940 941 void Bus::ListenForServiceOwnerChangeInternal( 942 const std::string& service_name, 943 const GetServiceOwnerCallback& callback) { 944 AssertOnDBusThread(); 945 DCHECK(!service_name.empty()); 946 DCHECK(!callback.is_null()); 947 948 if (!Connect() || !SetUpAsyncOperations()) 949 return; 950 951 if (service_owner_changed_listener_map_.empty()) { 952 bool filter_added = 953 AddFilterFunction(Bus::OnServiceOwnerChangedFilter, this); 954 DCHECK(filter_added); 955 } 956 957 ServiceOwnerChangedListenerMap::iterator it = 958 service_owner_changed_listener_map_.find(service_name); 959 if (it == service_owner_changed_listener_map_.end()) { 960 // Add a match rule for the new service name. 961 const std::string name_owner_changed_match_rule = 962 base::StringPrintf(kServiceNameOwnerChangeMatchRule, 963 service_name.c_str()); 964 ScopedDBusError error; 965 AddMatch(name_owner_changed_match_rule, error.get()); 966 if (error.is_set()) { 967 LOG(ERROR) << "Failed to add match rule for " << service_name 968 << ". Got " << error.name() << ": " << error.message(); 969 return; 970 } 971 972 service_owner_changed_listener_map_[service_name].push_back(callback); 973 return; 974 } 975 976 // Check if the callback has already been added. 977 std::vector<GetServiceOwnerCallback>& callbacks = it->second; 978 for (size_t i = 0; i < callbacks.size(); ++i) { 979 if (callbacks[i].Equals(callback)) 980 return; 981 } 982 callbacks.push_back(callback); 983 } 984 985 void Bus::UnlistenForServiceOwnerChange( 986 const std::string& service_name, 987 const GetServiceOwnerCallback& callback) { 988 AssertOnOriginThread(); 989 DCHECK(!service_name.empty()); 990 DCHECK(!callback.is_null()); 991 992 GetDBusTaskRunner()->PostTask( 993 FROM_HERE, 994 base::Bind(&Bus::UnlistenForServiceOwnerChangeInternal, 995 this, service_name, callback)); 996 } 997 998 void Bus::UnlistenForServiceOwnerChangeInternal( 999 const std::string& service_name, 1000 const GetServiceOwnerCallback& callback) { 1001 AssertOnDBusThread(); 1002 DCHECK(!service_name.empty()); 1003 DCHECK(!callback.is_null()); 1004 1005 ServiceOwnerChangedListenerMap::iterator it = 1006 service_owner_changed_listener_map_.find(service_name); 1007 if (it == service_owner_changed_listener_map_.end()) 1008 return; 1009 1010 std::vector<GetServiceOwnerCallback>& callbacks = it->second; 1011 for (size_t i = 0; i < callbacks.size(); ++i) { 1012 if (callbacks[i].Equals(callback)) { 1013 callbacks.erase(callbacks.begin() + i); 1014 break; // There can be only one. 1015 } 1016 } 1017 if (!callbacks.empty()) 1018 return; 1019 1020 // Last callback for |service_name| has been removed, remove match rule. 1021 const std::string name_owner_changed_match_rule = 1022 base::StringPrintf(kServiceNameOwnerChangeMatchRule, 1023 service_name.c_str()); 1024 ScopedDBusError error; 1025 RemoveMatch(name_owner_changed_match_rule, error.get()); 1026 // And remove |service_owner_changed_listener_map_| entry. 1027 service_owner_changed_listener_map_.erase(it); 1028 1029 if (service_owner_changed_listener_map_.empty()) { 1030 bool filter_removed = 1031 RemoveFilterFunction(Bus::OnServiceOwnerChangedFilter, this); 1032 DCHECK(filter_removed); 1033 } 1034 } 1035 1036 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) { 1037 AssertOnDBusThread(); 1038 1039 // watch will be deleted when raw_watch is removed in OnRemoveWatch(). 1040 Watch* watch = new Watch(raw_watch); 1041 if (watch->IsReadyToBeWatched()) { 1042 watch->StartWatching(); 1043 } 1044 ++num_pending_watches_; 1045 return true; 1046 } 1047 1048 void Bus::OnRemoveWatch(DBusWatch* raw_watch) { 1049 AssertOnDBusThread(); 1050 1051 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); 1052 delete watch; 1053 --num_pending_watches_; 1054 } 1055 1056 void Bus::OnToggleWatch(DBusWatch* raw_watch) { 1057 AssertOnDBusThread(); 1058 1059 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); 1060 if (watch->IsReadyToBeWatched()) { 1061 watch->StartWatching(); 1062 } else { 1063 // It's safe to call this if StartWatching() wasn't called, per 1064 // message_pump_libevent.h. 1065 watch->StopWatching(); 1066 } 1067 } 1068 1069 dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) { 1070 AssertOnDBusThread(); 1071 1072 // timeout will be deleted when raw_timeout is removed in 1073 // OnRemoveTimeoutThunk(). 1074 Timeout* timeout = new Timeout(raw_timeout); 1075 if (timeout->IsReadyToBeMonitored()) { 1076 timeout->StartMonitoring(this); 1077 } 1078 ++num_pending_timeouts_; 1079 return true; 1080 } 1081 1082 void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) { 1083 AssertOnDBusThread(); 1084 1085 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); 1086 timeout->Complete(); 1087 --num_pending_timeouts_; 1088 } 1089 1090 void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) { 1091 AssertOnDBusThread(); 1092 1093 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); 1094 if (timeout->IsReadyToBeMonitored()) { 1095 timeout->StartMonitoring(this); 1096 } else { 1097 timeout->StopMonitoring(); 1098 } 1099 } 1100 1101 void Bus::OnDispatchStatusChanged(DBusConnection* connection, 1102 DBusDispatchStatus status) { 1103 DCHECK_EQ(connection, connection_); 1104 AssertOnDBusThread(); 1105 1106 // We cannot call ProcessAllIncomingDataIfAny() here, as calling 1107 // dbus_connection_dispatch() inside DBusDispatchStatusFunction is 1108 // prohibited by the D-Bus library. Hence, we post a task here instead. 1109 // See comments for dbus_connection_set_dispatch_status_function(). 1110 GetDBusTaskRunner()->PostTask(FROM_HERE, 1111 base::Bind(&Bus::ProcessAllIncomingDataIfAny, 1112 this)); 1113 } 1114 1115 void Bus::OnConnectionDisconnected(DBusConnection* connection) { 1116 AssertOnDBusThread(); 1117 1118 if (!on_disconnected_closure_.is_null()) 1119 GetOriginTaskRunner()->PostTask(FROM_HERE, on_disconnected_closure_); 1120 1121 if (!connection) 1122 return; 1123 DCHECK(!dbus_connection_get_is_connected(connection)); 1124 1125 ShutdownAndBlock(); 1126 } 1127 1128 void Bus::OnServiceOwnerChanged(DBusMessage* message) { 1129 DCHECK(message); 1130 AssertOnDBusThread(); 1131 1132 // |message| will be unrefed on exit of the function. Increment the 1133 // reference so we can use it in Signal::FromRawMessage() below. 1134 dbus_message_ref(message); 1135 scoped_ptr<Signal> signal(Signal::FromRawMessage(message)); 1136 1137 // Confirm the validity of the NameOwnerChanged signal. 1138 if (signal->GetMember() != kNameOwnerChangedSignal || 1139 signal->GetInterface() != DBUS_INTERFACE_DBUS || 1140 signal->GetSender() != DBUS_SERVICE_DBUS) { 1141 return; 1142 } 1143 1144 MessageReader reader(signal.get()); 1145 std::string service_name; 1146 std::string old_owner; 1147 std::string new_owner; 1148 if (!reader.PopString(&service_name) || 1149 !reader.PopString(&old_owner) || 1150 !reader.PopString(&new_owner)) { 1151 return; 1152 } 1153 1154 ServiceOwnerChangedListenerMap::const_iterator it = 1155 service_owner_changed_listener_map_.find(service_name); 1156 if (it == service_owner_changed_listener_map_.end()) 1157 return; 1158 1159 const std::vector<GetServiceOwnerCallback>& callbacks = it->second; 1160 for (size_t i = 0; i < callbacks.size(); ++i) { 1161 GetOriginTaskRunner()->PostTask(FROM_HERE, 1162 base::Bind(callbacks[i], new_owner)); 1163 } 1164 } 1165 1166 // static 1167 dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) { 1168 Bus* self = static_cast<Bus*>(data); 1169 return self->OnAddWatch(raw_watch); 1170 } 1171 1172 // static 1173 void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) { 1174 Bus* self = static_cast<Bus*>(data); 1175 self->OnRemoveWatch(raw_watch); 1176 } 1177 1178 // static 1179 void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) { 1180 Bus* self = static_cast<Bus*>(data); 1181 self->OnToggleWatch(raw_watch); 1182 } 1183 1184 // static 1185 dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) { 1186 Bus* self = static_cast<Bus*>(data); 1187 return self->OnAddTimeout(raw_timeout); 1188 } 1189 1190 // static 1191 void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) { 1192 Bus* self = static_cast<Bus*>(data); 1193 self->OnRemoveTimeout(raw_timeout); 1194 } 1195 1196 // static 1197 void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) { 1198 Bus* self = static_cast<Bus*>(data); 1199 self->OnToggleTimeout(raw_timeout); 1200 } 1201 1202 // static 1203 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection, 1204 DBusDispatchStatus status, 1205 void* data) { 1206 Bus* self = static_cast<Bus*>(data); 1207 self->OnDispatchStatusChanged(connection, status); 1208 } 1209 1210 // static 1211 DBusHandlerResult Bus::OnConnectionDisconnectedFilter( 1212 DBusConnection* connection, 1213 DBusMessage* message, 1214 void* data) { 1215 if (dbus_message_is_signal(message, 1216 DBUS_INTERFACE_LOCAL, 1217 kDisconnectedSignal)) { 1218 Bus* self = static_cast<Bus*>(data); 1219 self->OnConnectionDisconnected(connection); 1220 return DBUS_HANDLER_RESULT_HANDLED; 1221 } 1222 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; 1223 } 1224 1225 // static 1226 DBusHandlerResult Bus::OnServiceOwnerChangedFilter( 1227 DBusConnection* connection, 1228 DBusMessage* message, 1229 void* data) { 1230 if (dbus_message_is_signal(message, 1231 DBUS_INTERFACE_DBUS, 1232 kNameOwnerChangedSignal)) { 1233 Bus* self = static_cast<Bus*>(data); 1234 self->OnServiceOwnerChanged(message); 1235 } 1236 // Always return unhandled to let others, e.g. ObjectProxies, handle the same 1237 // signal. 1238 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; 1239 } 1240 1241 } // namespace dbus 1242