1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "dbus/bus.h" 6 7 #include "base/bind.h" 8 #include "base/logging.h" 9 #include "base/message_loop/message_loop.h" 10 #include "base/message_loop/message_loop_proxy.h" 11 #include "base/stl_util.h" 12 #include "base/strings/stringprintf.h" 13 #include "base/threading/thread.h" 14 #include "base/threading/thread_restrictions.h" 15 #include "base/time/time.h" 16 #include "dbus/exported_object.h" 17 #include "dbus/message.h" 18 #include "dbus/object_manager.h" 19 #include "dbus/object_path.h" 20 #include "dbus/object_proxy.h" 21 #include "dbus/scoped_dbus_error.h" 22 23 namespace dbus { 24 25 namespace { 26 27 const char kDisconnectedSignal[] = "Disconnected"; 28 const char kDisconnectedMatchRule[] = 29 "type='signal', path='/org/freedesktop/DBus/Local'," 30 "interface='org.freedesktop.DBus.Local', member='Disconnected'"; 31 32 // The NameOwnerChanged member in org.freedesktop.DBus 33 const char kNameOwnerChangedSignal[] = "NameOwnerChanged"; 34 35 // The match rule used to filter for changes to a given service name owner. 36 const char kServiceNameOwnerChangeMatchRule[] = 37 "type='signal',interface='org.freedesktop.DBus'," 38 "member='NameOwnerChanged',path='/org/freedesktop/DBus'," 39 "sender='org.freedesktop.DBus',arg0='%s'"; 40 41 // The class is used for watching the file descriptor used for D-Bus 42 // communication. 43 class Watch : public base::MessagePumpLibevent::Watcher { 44 public: 45 explicit Watch(DBusWatch* watch) 46 : raw_watch_(watch) { 47 dbus_watch_set_data(raw_watch_, this, NULL); 48 } 49 50 virtual ~Watch() { 51 dbus_watch_set_data(raw_watch_, NULL, NULL); 52 } 53 54 // Returns true if the underlying file descriptor is ready to be watched. 55 bool IsReadyToBeWatched() { 56 return dbus_watch_get_enabled(raw_watch_); 57 } 58 59 // Starts watching the underlying file descriptor. 60 void StartWatching() { 61 const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_); 62 const int flags = dbus_watch_get_flags(raw_watch_); 63 64 base::MessageLoopForIO::Mode mode = base::MessageLoopForIO::WATCH_READ; 65 if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE)) 66 mode = base::MessageLoopForIO::WATCH_READ_WRITE; 67 else if (flags & DBUS_WATCH_READABLE) 68 mode = base::MessageLoopForIO::WATCH_READ; 69 else if (flags & DBUS_WATCH_WRITABLE) 70 mode = base::MessageLoopForIO::WATCH_WRITE; 71 else 72 NOTREACHED(); 73 74 const bool persistent = true; // Watch persistently. 75 const bool success = base::MessageLoopForIO::current()->WatchFileDescriptor( 76 file_descriptor, persistent, mode, &file_descriptor_watcher_, this); 77 CHECK(success) << "Unable to allocate memory"; 78 } 79 80 // Stops watching the underlying file descriptor. 81 void StopWatching() { 82 file_descriptor_watcher_.StopWatchingFileDescriptor(); 83 } 84 85 private: 86 // Implement MessagePumpLibevent::Watcher. 87 virtual void OnFileCanReadWithoutBlocking(int file_descriptor) OVERRIDE { 88 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE); 89 CHECK(success) << "Unable to allocate memory"; 90 } 91 92 // Implement MessagePumpLibevent::Watcher. 93 virtual void OnFileCanWriteWithoutBlocking(int file_descriptor) OVERRIDE { 94 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE); 95 CHECK(success) << "Unable to allocate memory"; 96 } 97 98 DBusWatch* raw_watch_; 99 base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_; 100 }; 101 102 // The class is used for monitoring the timeout used for D-Bus method 103 // calls. 104 // 105 // Unlike Watch, Timeout is a ref counted object, to ensure that |this| of 106 // the object is is alive when HandleTimeout() is called. It's unlikely 107 // but it may be possible that HandleTimeout() is called after 108 // Bus::OnRemoveTimeout(). That's why we don't simply delete the object in 109 // Bus::OnRemoveTimeout(). 110 class Timeout : public base::RefCountedThreadSafe<Timeout> { 111 public: 112 explicit Timeout(DBusTimeout* timeout) 113 : raw_timeout_(timeout), 114 monitoring_is_active_(false), 115 is_completed(false) { 116 dbus_timeout_set_data(raw_timeout_, this, NULL); 117 AddRef(); // Balanced on Complete(). 118 } 119 120 // Returns true if the timeout is ready to be monitored. 121 bool IsReadyToBeMonitored() { 122 return dbus_timeout_get_enabled(raw_timeout_); 123 } 124 125 // Starts monitoring the timeout. 126 void StartMonitoring(Bus* bus) { 127 bus->PostDelayedTaskToDBusThread(FROM_HERE, 128 base::Bind(&Timeout::HandleTimeout, 129 this), 130 GetInterval()); 131 monitoring_is_active_ = true; 132 } 133 134 // Stops monitoring the timeout. 135 void StopMonitoring() { 136 // We cannot take back the delayed task we posted in 137 // StartMonitoring(), so we just mark the monitoring is inactive now. 138 monitoring_is_active_ = false; 139 } 140 141 // Returns the interval. 142 base::TimeDelta GetInterval() { 143 return base::TimeDelta::FromMilliseconds( 144 dbus_timeout_get_interval(raw_timeout_)); 145 } 146 147 // Cleans up the raw_timeout and marks that timeout is completed. 148 // See the class comment above for why we are doing this. 149 void Complete() { 150 dbus_timeout_set_data(raw_timeout_, NULL, NULL); 151 is_completed = true; 152 Release(); 153 } 154 155 private: 156 friend class base::RefCountedThreadSafe<Timeout>; 157 ~Timeout() { 158 } 159 160 // Handles the timeout. 161 void HandleTimeout() { 162 // If the timeout is marked completed, we should do nothing. This can 163 // occur if this function is called after Bus::OnRemoveTimeout(). 164 if (is_completed) 165 return; 166 // Skip if monitoring is canceled. 167 if (!monitoring_is_active_) 168 return; 169 170 const bool success = dbus_timeout_handle(raw_timeout_); 171 CHECK(success) << "Unable to allocate memory"; 172 } 173 174 DBusTimeout* raw_timeout_; 175 bool monitoring_is_active_; 176 bool is_completed; 177 }; 178 179 } // namespace 180 181 Bus::Options::Options() 182 : bus_type(SESSION), 183 connection_type(PRIVATE) { 184 } 185 186 Bus::Options::~Options() { 187 } 188 189 Bus::Bus(const Options& options) 190 : bus_type_(options.bus_type), 191 connection_type_(options.connection_type), 192 dbus_task_runner_(options.dbus_task_runner), 193 on_shutdown_(false /* manual_reset */, false /* initially_signaled */), 194 connection_(NULL), 195 origin_thread_id_(base::PlatformThread::CurrentId()), 196 async_operations_set_up_(false), 197 shutdown_completed_(false), 198 num_pending_watches_(0), 199 num_pending_timeouts_(0), 200 address_(options.address), 201 on_disconnected_closure_(options.disconnected_callback) { 202 // This is safe to call multiple times. 203 dbus_threads_init_default(); 204 // The origin message loop is unnecessary if the client uses synchronous 205 // functions only. 206 if (base::MessageLoop::current()) 207 origin_task_runner_ = base::MessageLoop::current()->message_loop_proxy(); 208 } 209 210 Bus::~Bus() { 211 DCHECK(!connection_); 212 DCHECK(owned_service_names_.empty()); 213 DCHECK(match_rules_added_.empty()); 214 DCHECK(filter_functions_added_.empty()); 215 DCHECK(registered_object_paths_.empty()); 216 DCHECK_EQ(0, num_pending_watches_); 217 // TODO(satorux): This check fails occasionally in browser_tests for tests 218 // that run very quickly. Perhaps something does not have time to clean up. 219 // Despite the check failing, the tests seem to run fine. crosbug.com/23416 220 // DCHECK_EQ(0, num_pending_timeouts_); 221 } 222 223 ObjectProxy* Bus::GetObjectProxy(const std::string& service_name, 224 const ObjectPath& object_path) { 225 return GetObjectProxyWithOptions(service_name, object_path, 226 ObjectProxy::DEFAULT_OPTIONS); 227 } 228 229 ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name, 230 const ObjectPath& object_path, 231 int options) { 232 AssertOnOriginThread(); 233 234 // Check if we already have the requested object proxy. 235 const ObjectProxyTable::key_type key(service_name + object_path.value(), 236 options); 237 ObjectProxyTable::iterator iter = object_proxy_table_.find(key); 238 if (iter != object_proxy_table_.end()) { 239 return iter->second.get(); 240 } 241 242 scoped_refptr<ObjectProxy> object_proxy = 243 new ObjectProxy(this, service_name, object_path, options); 244 object_proxy_table_[key] = object_proxy; 245 246 return object_proxy.get(); 247 } 248 249 bool Bus::RemoveObjectProxy(const std::string& service_name, 250 const ObjectPath& object_path, 251 const base::Closure& callback) { 252 return RemoveObjectProxyWithOptions(service_name, object_path, 253 ObjectProxy::DEFAULT_OPTIONS, 254 callback); 255 } 256 257 bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name, 258 const ObjectPath& object_path, 259 int options, 260 const base::Closure& callback) { 261 AssertOnOriginThread(); 262 263 // Check if we have the requested object proxy. 264 const ObjectProxyTable::key_type key(service_name + object_path.value(), 265 options); 266 ObjectProxyTable::iterator iter = object_proxy_table_.find(key); 267 if (iter != object_proxy_table_.end()) { 268 // Object is present. Remove it now and Detach in the DBus thread. 269 PostTaskToDBusThread(FROM_HERE, base::Bind( 270 &Bus::RemoveObjectProxyInternal, 271 this, iter->second, callback)); 272 273 object_proxy_table_.erase(iter); 274 return true; 275 } 276 return false; 277 } 278 279 void Bus::RemoveObjectProxyInternal(scoped_refptr<ObjectProxy> object_proxy, 280 const base::Closure& callback) { 281 AssertOnDBusThread(); 282 283 object_proxy.get()->Detach(); 284 285 PostTaskToOriginThread(FROM_HERE, callback); 286 } 287 288 ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) { 289 AssertOnOriginThread(); 290 291 // Check if we already have the requested exported object. 292 ExportedObjectTable::iterator iter = exported_object_table_.find(object_path); 293 if (iter != exported_object_table_.end()) { 294 return iter->second.get(); 295 } 296 297 scoped_refptr<ExportedObject> exported_object = 298 new ExportedObject(this, object_path); 299 exported_object_table_[object_path] = exported_object; 300 301 return exported_object.get(); 302 } 303 304 void Bus::UnregisterExportedObject(const ObjectPath& object_path) { 305 AssertOnOriginThread(); 306 307 // Remove the registered object from the table first, to allow a new 308 // GetExportedObject() call to return a new object, rather than this one. 309 ExportedObjectTable::iterator iter = exported_object_table_.find(object_path); 310 if (iter == exported_object_table_.end()) 311 return; 312 313 scoped_refptr<ExportedObject> exported_object = iter->second; 314 exported_object_table_.erase(iter); 315 316 // Post the task to perform the final unregistration to the D-Bus thread. 317 // Since the registration also happens on the D-Bus thread in 318 // TryRegisterObjectPath(), and the task runner we post to is a 319 // SequencedTaskRunner, there is a guarantee that this will happen before any 320 // future registration call. 321 PostTaskToDBusThread(FROM_HERE, 322 base::Bind(&Bus::UnregisterExportedObjectInternal, 323 this, exported_object)); 324 } 325 326 void Bus::UnregisterExportedObjectInternal( 327 scoped_refptr<ExportedObject> exported_object) { 328 AssertOnDBusThread(); 329 330 exported_object->Unregister(); 331 } 332 333 ObjectManager* Bus::GetObjectManager(const std::string& service_name, 334 const ObjectPath& object_path) { 335 AssertOnOriginThread(); 336 337 // Check if we already have the requested object manager. 338 const ObjectManagerTable::key_type key(service_name + object_path.value()); 339 ObjectManagerTable::iterator iter = object_manager_table_.find(key); 340 if (iter != object_manager_table_.end()) { 341 return iter->second.get(); 342 } 343 344 scoped_refptr<ObjectManager> object_manager = 345 new ObjectManager(this, service_name, object_path); 346 object_manager_table_[key] = object_manager; 347 348 return object_manager.get(); 349 } 350 351 void Bus::RemoveObjectManager(const std::string& service_name, 352 const ObjectPath& object_path) { 353 AssertOnOriginThread(); 354 355 const ObjectManagerTable::key_type key(service_name + object_path.value()); 356 ObjectManagerTable::iterator iter = object_manager_table_.find(key); 357 if (iter == object_manager_table_.end()) 358 return; 359 360 scoped_refptr<ObjectManager> object_manager = iter->second; 361 object_manager_table_.erase(iter); 362 } 363 364 void Bus::GetManagedObjects() { 365 for (ObjectManagerTable::iterator iter = object_manager_table_.begin(); 366 iter != object_manager_table_.end(); ++iter) { 367 iter->second->GetManagedObjects(); 368 } 369 } 370 371 bool Bus::Connect() { 372 // dbus_bus_get_private() and dbus_bus_get() are blocking calls. 373 AssertOnDBusThread(); 374 375 // Check if it's already initialized. 376 if (connection_) 377 return true; 378 379 ScopedDBusError error; 380 if (bus_type_ == CUSTOM_ADDRESS) { 381 if (connection_type_ == PRIVATE) { 382 connection_ = dbus_connection_open_private(address_.c_str(), error.get()); 383 } else { 384 connection_ = dbus_connection_open(address_.c_str(), error.get()); 385 } 386 } else { 387 const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_); 388 if (connection_type_ == PRIVATE) { 389 connection_ = dbus_bus_get_private(dbus_bus_type, error.get()); 390 } else { 391 connection_ = dbus_bus_get(dbus_bus_type, error.get()); 392 } 393 } 394 if (!connection_) { 395 LOG(ERROR) << "Failed to connect to the bus: " 396 << (error.is_set() ? error.message() : ""); 397 return false; 398 } 399 400 if (bus_type_ == CUSTOM_ADDRESS) { 401 // We should call dbus_bus_register here, otherwise unique name can not be 402 // acquired. According to dbus specification, it is responsible to call 403 // org.freedesktop.DBus.Hello method at the beging of bus connection to 404 // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is 405 // called internally. 406 if (!dbus_bus_register(connection_, error.get())) { 407 LOG(ERROR) << "Failed to register the bus component: " 408 << (error.is_set() ? error.message() : ""); 409 return false; 410 } 411 } 412 // We shouldn't exit on the disconnected signal. 413 dbus_connection_set_exit_on_disconnect(connection_, false); 414 415 // Watch Disconnected signal. 416 AddFilterFunction(Bus::OnConnectionDisconnectedFilter, this); 417 AddMatch(kDisconnectedMatchRule, error.get()); 418 419 return true; 420 } 421 422 void Bus::ClosePrivateConnection() { 423 // dbus_connection_close is blocking call. 424 AssertOnDBusThread(); 425 DCHECK_EQ(PRIVATE, connection_type_) 426 << "non-private connection should not be closed"; 427 dbus_connection_close(connection_); 428 } 429 430 void Bus::ShutdownAndBlock() { 431 AssertOnDBusThread(); 432 433 if (shutdown_completed_) 434 return; // Already shutdowned, just return. 435 436 // Unregister the exported objects. 437 for (ExportedObjectTable::iterator iter = exported_object_table_.begin(); 438 iter != exported_object_table_.end(); ++iter) { 439 iter->second->Unregister(); 440 } 441 442 // Release all service names. 443 for (std::set<std::string>::iterator iter = owned_service_names_.begin(); 444 iter != owned_service_names_.end();) { 445 // This is a bit tricky but we should increment the iter here as 446 // ReleaseOwnership() may remove |service_name| from the set. 447 const std::string& service_name = *iter++; 448 ReleaseOwnership(service_name); 449 } 450 if (!owned_service_names_.empty()) { 451 LOG(ERROR) << "Failed to release all service names. # of services left: " 452 << owned_service_names_.size(); 453 } 454 455 // Detach from the remote objects. 456 for (ObjectProxyTable::iterator iter = object_proxy_table_.begin(); 457 iter != object_proxy_table_.end(); ++iter) { 458 iter->second->Detach(); 459 } 460 461 // Release object proxies and exported objects here. We should do this 462 // here rather than in the destructor to avoid memory leaks due to 463 // cyclic references. 464 object_proxy_table_.clear(); 465 exported_object_table_.clear(); 466 467 // Private connection should be closed. 468 if (connection_) { 469 // Remove Disconnected watcher. 470 ScopedDBusError error; 471 RemoveFilterFunction(Bus::OnConnectionDisconnectedFilter, this); 472 RemoveMatch(kDisconnectedMatchRule, error.get()); 473 474 if (connection_type_ == PRIVATE) 475 ClosePrivateConnection(); 476 // dbus_connection_close() won't unref. 477 dbus_connection_unref(connection_); 478 } 479 480 connection_ = NULL; 481 shutdown_completed_ = true; 482 } 483 484 void Bus::ShutdownOnDBusThreadAndBlock() { 485 AssertOnOriginThread(); 486 DCHECK(dbus_task_runner_.get()); 487 488 PostTaskToDBusThread(FROM_HERE, base::Bind( 489 &Bus::ShutdownOnDBusThreadAndBlockInternal, 490 this)); 491 492 // http://crbug.com/125222 493 base::ThreadRestrictions::ScopedAllowWait allow_wait; 494 495 // Wait until the shutdown is complete on the D-Bus thread. 496 // The shutdown should not hang, but set timeout just in case. 497 const int kTimeoutSecs = 3; 498 const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs)); 499 const bool signaled = on_shutdown_.TimedWait(timeout); 500 LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus"; 501 } 502 503 void Bus::RequestOwnership(const std::string& service_name, 504 ServiceOwnershipOptions options, 505 OnOwnershipCallback on_ownership_callback) { 506 AssertOnOriginThread(); 507 508 PostTaskToDBusThread(FROM_HERE, base::Bind( 509 &Bus::RequestOwnershipInternal, 510 this, service_name, options, on_ownership_callback)); 511 } 512 513 void Bus::RequestOwnershipInternal(const std::string& service_name, 514 ServiceOwnershipOptions options, 515 OnOwnershipCallback on_ownership_callback) { 516 AssertOnDBusThread(); 517 518 bool success = Connect(); 519 if (success) 520 success = RequestOwnershipAndBlock(service_name, options); 521 522 PostTaskToOriginThread(FROM_HERE, 523 base::Bind(on_ownership_callback, 524 service_name, 525 success)); 526 } 527 528 bool Bus::RequestOwnershipAndBlock(const std::string& service_name, 529 ServiceOwnershipOptions options) { 530 DCHECK(connection_); 531 // dbus_bus_request_name() is a blocking call. 532 AssertOnDBusThread(); 533 534 // Check if we already own the service name. 535 if (owned_service_names_.find(service_name) != owned_service_names_.end()) { 536 return true; 537 } 538 539 ScopedDBusError error; 540 const int result = dbus_bus_request_name(connection_, 541 service_name.c_str(), 542 options, 543 error.get()); 544 if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) { 545 LOG(ERROR) << "Failed to get the ownership of " << service_name << ": " 546 << (error.is_set() ? error.message() : ""); 547 return false; 548 } 549 owned_service_names_.insert(service_name); 550 return true; 551 } 552 553 bool Bus::ReleaseOwnership(const std::string& service_name) { 554 DCHECK(connection_); 555 // dbus_bus_request_name() is a blocking call. 556 AssertOnDBusThread(); 557 558 // Check if we already own the service name. 559 std::set<std::string>::iterator found = 560 owned_service_names_.find(service_name); 561 if (found == owned_service_names_.end()) { 562 LOG(ERROR) << service_name << " is not owned by the bus"; 563 return false; 564 } 565 566 ScopedDBusError error; 567 const int result = dbus_bus_release_name(connection_, service_name.c_str(), 568 error.get()); 569 if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) { 570 owned_service_names_.erase(found); 571 return true; 572 } else { 573 LOG(ERROR) << "Failed to release the ownership of " << service_name << ": " 574 << (error.is_set() ? error.message() : "") 575 << ", result code: " << result; 576 return false; 577 } 578 } 579 580 bool Bus::SetUpAsyncOperations() { 581 DCHECK(connection_); 582 AssertOnDBusThread(); 583 584 if (async_operations_set_up_) 585 return true; 586 587 // Process all the incoming data if any, so that OnDispatchStatus() will 588 // be called when the incoming data is ready. 589 ProcessAllIncomingDataIfAny(); 590 591 bool success = dbus_connection_set_watch_functions(connection_, 592 &Bus::OnAddWatchThunk, 593 &Bus::OnRemoveWatchThunk, 594 &Bus::OnToggleWatchThunk, 595 this, 596 NULL); 597 CHECK(success) << "Unable to allocate memory"; 598 599 success = dbus_connection_set_timeout_functions(connection_, 600 &Bus::OnAddTimeoutThunk, 601 &Bus::OnRemoveTimeoutThunk, 602 &Bus::OnToggleTimeoutThunk, 603 this, 604 NULL); 605 CHECK(success) << "Unable to allocate memory"; 606 607 dbus_connection_set_dispatch_status_function( 608 connection_, 609 &Bus::OnDispatchStatusChangedThunk, 610 this, 611 NULL); 612 613 async_operations_set_up_ = true; 614 615 return true; 616 } 617 618 DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request, 619 int timeout_ms, 620 DBusError* error) { 621 DCHECK(connection_); 622 AssertOnDBusThread(); 623 624 return dbus_connection_send_with_reply_and_block( 625 connection_, request, timeout_ms, error); 626 } 627 628 void Bus::SendWithReply(DBusMessage* request, 629 DBusPendingCall** pending_call, 630 int timeout_ms) { 631 DCHECK(connection_); 632 AssertOnDBusThread(); 633 634 const bool success = dbus_connection_send_with_reply( 635 connection_, request, pending_call, timeout_ms); 636 CHECK(success) << "Unable to allocate memory"; 637 } 638 639 void Bus::Send(DBusMessage* request, uint32* serial) { 640 DCHECK(connection_); 641 AssertOnDBusThread(); 642 643 const bool success = dbus_connection_send(connection_, request, serial); 644 CHECK(success) << "Unable to allocate memory"; 645 } 646 647 bool Bus::AddFilterFunction(DBusHandleMessageFunction filter_function, 648 void* user_data) { 649 DCHECK(connection_); 650 AssertOnDBusThread(); 651 652 std::pair<DBusHandleMessageFunction, void*> filter_data_pair = 653 std::make_pair(filter_function, user_data); 654 if (filter_functions_added_.find(filter_data_pair) != 655 filter_functions_added_.end()) { 656 VLOG(1) << "Filter function already exists: " << filter_function 657 << " with associated data: " << user_data; 658 return false; 659 } 660 661 const bool success = dbus_connection_add_filter( 662 connection_, filter_function, user_data, NULL); 663 CHECK(success) << "Unable to allocate memory"; 664 filter_functions_added_.insert(filter_data_pair); 665 return true; 666 } 667 668 bool Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function, 669 void* user_data) { 670 DCHECK(connection_); 671 AssertOnDBusThread(); 672 673 std::pair<DBusHandleMessageFunction, void*> filter_data_pair = 674 std::make_pair(filter_function, user_data); 675 if (filter_functions_added_.find(filter_data_pair) == 676 filter_functions_added_.end()) { 677 VLOG(1) << "Requested to remove an unknown filter function: " 678 << filter_function 679 << " with associated data: " << user_data; 680 return false; 681 } 682 683 dbus_connection_remove_filter(connection_, filter_function, user_data); 684 filter_functions_added_.erase(filter_data_pair); 685 return true; 686 } 687 688 void Bus::AddMatch(const std::string& match_rule, DBusError* error) { 689 DCHECK(connection_); 690 AssertOnDBusThread(); 691 692 std::map<std::string, int>::iterator iter = 693 match_rules_added_.find(match_rule); 694 if (iter != match_rules_added_.end()) { 695 // The already existing rule's counter is incremented. 696 iter->second++; 697 698 VLOG(1) << "Match rule already exists: " << match_rule; 699 return; 700 } 701 702 dbus_bus_add_match(connection_, match_rule.c_str(), error); 703 match_rules_added_[match_rule] = 1; 704 } 705 706 bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) { 707 DCHECK(connection_); 708 AssertOnDBusThread(); 709 710 std::map<std::string, int>::iterator iter = 711 match_rules_added_.find(match_rule); 712 if (iter == match_rules_added_.end()) { 713 LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule; 714 return false; 715 } 716 717 // The rule's counter is decremented and the rule is deleted when reachs 0. 718 iter->second--; 719 if (iter->second == 0) { 720 dbus_bus_remove_match(connection_, match_rule.c_str(), error); 721 match_rules_added_.erase(match_rule); 722 } 723 return true; 724 } 725 726 bool Bus::TryRegisterObjectPath(const ObjectPath& object_path, 727 const DBusObjectPathVTable* vtable, 728 void* user_data, 729 DBusError* error) { 730 DCHECK(connection_); 731 AssertOnDBusThread(); 732 733 if (registered_object_paths_.find(object_path) != 734 registered_object_paths_.end()) { 735 LOG(ERROR) << "Object path already registered: " << object_path.value(); 736 return false; 737 } 738 739 const bool success = dbus_connection_try_register_object_path( 740 connection_, 741 object_path.value().c_str(), 742 vtable, 743 user_data, 744 error); 745 if (success) 746 registered_object_paths_.insert(object_path); 747 return success; 748 } 749 750 void Bus::UnregisterObjectPath(const ObjectPath& object_path) { 751 DCHECK(connection_); 752 AssertOnDBusThread(); 753 754 if (registered_object_paths_.find(object_path) == 755 registered_object_paths_.end()) { 756 LOG(ERROR) << "Requested to unregister an unknown object path: " 757 << object_path.value(); 758 return; 759 } 760 761 const bool success = dbus_connection_unregister_object_path( 762 connection_, 763 object_path.value().c_str()); 764 CHECK(success) << "Unable to allocate memory"; 765 registered_object_paths_.erase(object_path); 766 } 767 768 void Bus::ShutdownOnDBusThreadAndBlockInternal() { 769 AssertOnDBusThread(); 770 771 ShutdownAndBlock(); 772 on_shutdown_.Signal(); 773 } 774 775 void Bus::ProcessAllIncomingDataIfAny() { 776 AssertOnDBusThread(); 777 778 // As mentioned at the class comment in .h file, connection_ can be NULL. 779 if (!connection_) 780 return; 781 782 // It is safe and necessary to call dbus_connection_get_dispatch_status even 783 // if the connection is lost. Otherwise we will miss "Disconnected" signal. 784 // (crbug.com/174431) 785 if (dbus_connection_get_dispatch_status(connection_) == 786 DBUS_DISPATCH_DATA_REMAINS) { 787 while (dbus_connection_dispatch(connection_) == 788 DBUS_DISPATCH_DATA_REMAINS) { 789 } 790 } 791 } 792 793 void Bus::PostTaskToDBusThreadAndReply( 794 const tracked_objects::Location& from_here, 795 const base::Closure& task, 796 const base::Closure& reply) { 797 AssertOnOriginThread(); 798 799 if (dbus_task_runner_.get()) { 800 if (!dbus_task_runner_->PostTaskAndReply(from_here, task, reply)) { 801 LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop"; 802 } 803 } else { 804 DCHECK(origin_task_runner_.get()); 805 if (!origin_task_runner_->PostTaskAndReply(from_here, task, reply)) { 806 LOG(WARNING) << "Failed to post a task to the origin message loop"; 807 } 808 } 809 } 810 811 void Bus::PostTaskToOriginThread(const tracked_objects::Location& from_here, 812 const base::Closure& task) { 813 DCHECK(origin_task_runner_.get()); 814 if (!origin_task_runner_->PostTask(from_here, task)) { 815 LOG(WARNING) << "Failed to post a task to the origin message loop"; 816 } 817 } 818 819 void Bus::PostTaskToDBusThread(const tracked_objects::Location& from_here, 820 const base::Closure& task) { 821 if (dbus_task_runner_.get()) { 822 if (!dbus_task_runner_->PostTask(from_here, task)) { 823 LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop"; 824 } 825 } else { 826 DCHECK(origin_task_runner_.get()); 827 if (!origin_task_runner_->PostTask(from_here, task)) { 828 LOG(WARNING) << "Failed to post a task to the origin message loop"; 829 } 830 } 831 } 832 833 void Bus::PostDelayedTaskToDBusThread( 834 const tracked_objects::Location& from_here, 835 const base::Closure& task, 836 base::TimeDelta delay) { 837 if (dbus_task_runner_.get()) { 838 if (!dbus_task_runner_->PostDelayedTask( 839 from_here, task, delay)) { 840 LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop"; 841 } 842 } else { 843 DCHECK(origin_task_runner_.get()); 844 if (!origin_task_runner_->PostDelayedTask(from_here, task, delay)) { 845 LOG(WARNING) << "Failed to post a task to the origin message loop"; 846 } 847 } 848 } 849 850 bool Bus::HasDBusThread() { 851 return dbus_task_runner_.get() != NULL; 852 } 853 854 void Bus::AssertOnOriginThread() { 855 DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId()); 856 } 857 858 void Bus::AssertOnDBusThread() { 859 base::ThreadRestrictions::AssertIOAllowed(); 860 861 if (dbus_task_runner_.get()) { 862 DCHECK(dbus_task_runner_->RunsTasksOnCurrentThread()); 863 } else { 864 AssertOnOriginThread(); 865 } 866 } 867 868 std::string Bus::GetServiceOwnerAndBlock(const std::string& service_name, 869 GetServiceOwnerOption options) { 870 AssertOnDBusThread(); 871 872 MethodCall get_name_owner_call("org.freedesktop.DBus", "GetNameOwner"); 873 MessageWriter writer(&get_name_owner_call); 874 writer.AppendString(service_name); 875 VLOG(1) << "Method call: " << get_name_owner_call.ToString(); 876 877 const ObjectPath obj_path("/org/freedesktop/DBus"); 878 if (!get_name_owner_call.SetDestination("org.freedesktop.DBus") || 879 !get_name_owner_call.SetPath(obj_path)) { 880 if (options == REPORT_ERRORS) 881 LOG(ERROR) << "Failed to get name owner."; 882 return ""; 883 } 884 885 ScopedDBusError error; 886 DBusMessage* response_message = 887 SendWithReplyAndBlock(get_name_owner_call.raw_message(), 888 ObjectProxy::TIMEOUT_USE_DEFAULT, 889 error.get()); 890 if (!response_message) { 891 if (options == REPORT_ERRORS) { 892 LOG(ERROR) << "Failed to get name owner. Got " << error.name() << ": " 893 << error.message(); 894 } 895 return ""; 896 } 897 898 scoped_ptr<Response> response(Response::FromRawMessage(response_message)); 899 MessageReader reader(response.get()); 900 901 std::string service_owner; 902 if (!reader.PopString(&service_owner)) 903 service_owner.clear(); 904 return service_owner; 905 } 906 907 void Bus::GetServiceOwner(const std::string& service_name, 908 const GetServiceOwnerCallback& callback) { 909 AssertOnOriginThread(); 910 911 PostTaskToDBusThread( 912 FROM_HERE, 913 base::Bind(&Bus::GetServiceOwnerInternal, this, service_name, callback)); 914 } 915 916 void Bus::GetServiceOwnerInternal(const std::string& service_name, 917 const GetServiceOwnerCallback& callback) { 918 AssertOnDBusThread(); 919 920 std::string service_owner; 921 if (Connect()) 922 service_owner = GetServiceOwnerAndBlock(service_name, SUPPRESS_ERRORS); 923 PostTaskToOriginThread(FROM_HERE, base::Bind(callback, service_owner)); 924 } 925 926 void Bus::ListenForServiceOwnerChange( 927 const std::string& service_name, 928 const GetServiceOwnerCallback& callback) { 929 AssertOnOriginThread(); 930 DCHECK(!service_name.empty()); 931 DCHECK(!callback.is_null()); 932 933 PostTaskToDBusThread(FROM_HERE, 934 base::Bind(&Bus::ListenForServiceOwnerChangeInternal, 935 this, service_name, callback)); 936 } 937 938 void Bus::ListenForServiceOwnerChangeInternal( 939 const std::string& service_name, 940 const GetServiceOwnerCallback& callback) { 941 AssertOnDBusThread(); 942 DCHECK(!service_name.empty()); 943 DCHECK(!callback.is_null()); 944 945 if (!Connect() || !SetUpAsyncOperations()) 946 return; 947 948 if (service_owner_changed_listener_map_.empty()) { 949 bool filter_added = 950 AddFilterFunction(Bus::OnServiceOwnerChangedFilter, this); 951 DCHECK(filter_added); 952 } 953 954 ServiceOwnerChangedListenerMap::iterator it = 955 service_owner_changed_listener_map_.find(service_name); 956 if (it == service_owner_changed_listener_map_.end()) { 957 // Add a match rule for the new service name. 958 const std::string name_owner_changed_match_rule = 959 base::StringPrintf(kServiceNameOwnerChangeMatchRule, 960 service_name.c_str()); 961 ScopedDBusError error; 962 AddMatch(name_owner_changed_match_rule, error.get()); 963 if (error.is_set()) { 964 LOG(ERROR) << "Failed to add match rule for " << service_name 965 << ". Got " << error.name() << ": " << error.message(); 966 return; 967 } 968 969 service_owner_changed_listener_map_[service_name].push_back(callback); 970 return; 971 } 972 973 // Check if the callback has already been added. 974 std::vector<GetServiceOwnerCallback>& callbacks = it->second; 975 for (size_t i = 0; i < callbacks.size(); ++i) { 976 if (callbacks[i].Equals(callback)) 977 return; 978 } 979 callbacks.push_back(callback); 980 } 981 982 void Bus::UnlistenForServiceOwnerChange( 983 const std::string& service_name, 984 const GetServiceOwnerCallback& callback) { 985 AssertOnOriginThread(); 986 DCHECK(!service_name.empty()); 987 DCHECK(!callback.is_null()); 988 989 PostTaskToDBusThread(FROM_HERE, 990 base::Bind(&Bus::UnlistenForServiceOwnerChangeInternal, 991 this, service_name, callback)); 992 } 993 994 void Bus::UnlistenForServiceOwnerChangeInternal( 995 const std::string& service_name, 996 const GetServiceOwnerCallback& callback) { 997 AssertOnDBusThread(); 998 DCHECK(!service_name.empty()); 999 DCHECK(!callback.is_null()); 1000 1001 ServiceOwnerChangedListenerMap::iterator it = 1002 service_owner_changed_listener_map_.find(service_name); 1003 if (it == service_owner_changed_listener_map_.end()) 1004 return; 1005 1006 std::vector<GetServiceOwnerCallback>& callbacks = it->second; 1007 for (size_t i = 0; i < callbacks.size(); ++i) { 1008 if (callbacks[i].Equals(callback)) { 1009 callbacks.erase(callbacks.begin() + i); 1010 break; // There can be only one. 1011 } 1012 } 1013 if (!callbacks.empty()) 1014 return; 1015 1016 // Last callback for |service_name| has been removed, remove match rule. 1017 const std::string name_owner_changed_match_rule = 1018 base::StringPrintf(kServiceNameOwnerChangeMatchRule, 1019 service_name.c_str()); 1020 ScopedDBusError error; 1021 RemoveMatch(name_owner_changed_match_rule, error.get()); 1022 // And remove |service_owner_changed_listener_map_| entry. 1023 service_owner_changed_listener_map_.erase(it); 1024 1025 if (service_owner_changed_listener_map_.empty()) { 1026 bool filter_removed = 1027 RemoveFilterFunction(Bus::OnServiceOwnerChangedFilter, this); 1028 DCHECK(filter_removed); 1029 } 1030 } 1031 1032 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) { 1033 AssertOnDBusThread(); 1034 1035 // watch will be deleted when raw_watch is removed in OnRemoveWatch(). 1036 Watch* watch = new Watch(raw_watch); 1037 if (watch->IsReadyToBeWatched()) { 1038 watch->StartWatching(); 1039 } 1040 ++num_pending_watches_; 1041 return true; 1042 } 1043 1044 void Bus::OnRemoveWatch(DBusWatch* raw_watch) { 1045 AssertOnDBusThread(); 1046 1047 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); 1048 delete watch; 1049 --num_pending_watches_; 1050 } 1051 1052 void Bus::OnToggleWatch(DBusWatch* raw_watch) { 1053 AssertOnDBusThread(); 1054 1055 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); 1056 if (watch->IsReadyToBeWatched()) { 1057 watch->StartWatching(); 1058 } else { 1059 // It's safe to call this if StartWatching() wasn't called, per 1060 // message_pump_libevent.h. 1061 watch->StopWatching(); 1062 } 1063 } 1064 1065 dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) { 1066 AssertOnDBusThread(); 1067 1068 // timeout will be deleted when raw_timeout is removed in 1069 // OnRemoveTimeoutThunk(). 1070 Timeout* timeout = new Timeout(raw_timeout); 1071 if (timeout->IsReadyToBeMonitored()) { 1072 timeout->StartMonitoring(this); 1073 } 1074 ++num_pending_timeouts_; 1075 return true; 1076 } 1077 1078 void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) { 1079 AssertOnDBusThread(); 1080 1081 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); 1082 timeout->Complete(); 1083 --num_pending_timeouts_; 1084 } 1085 1086 void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) { 1087 AssertOnDBusThread(); 1088 1089 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); 1090 if (timeout->IsReadyToBeMonitored()) { 1091 timeout->StartMonitoring(this); 1092 } else { 1093 timeout->StopMonitoring(); 1094 } 1095 } 1096 1097 void Bus::OnDispatchStatusChanged(DBusConnection* connection, 1098 DBusDispatchStatus status) { 1099 DCHECK_EQ(connection, connection_); 1100 AssertOnDBusThread(); 1101 1102 // We cannot call ProcessAllIncomingDataIfAny() here, as calling 1103 // dbus_connection_dispatch() inside DBusDispatchStatusFunction is 1104 // prohibited by the D-Bus library. Hence, we post a task here instead. 1105 // See comments for dbus_connection_set_dispatch_status_function(). 1106 PostTaskToDBusThread(FROM_HERE, 1107 base::Bind(&Bus::ProcessAllIncomingDataIfAny, 1108 this)); 1109 } 1110 1111 void Bus::OnConnectionDisconnected(DBusConnection* connection) { 1112 AssertOnDBusThread(); 1113 1114 if (!on_disconnected_closure_.is_null()) 1115 PostTaskToOriginThread(FROM_HERE, on_disconnected_closure_); 1116 1117 if (!connection) 1118 return; 1119 DCHECK(!dbus_connection_get_is_connected(connection)); 1120 1121 ShutdownAndBlock(); 1122 } 1123 1124 void Bus::OnServiceOwnerChanged(DBusMessage* message) { 1125 DCHECK(message); 1126 AssertOnDBusThread(); 1127 1128 // |message| will be unrefed on exit of the function. Increment the 1129 // reference so we can use it in Signal::FromRawMessage() below. 1130 dbus_message_ref(message); 1131 scoped_ptr<Signal> signal(Signal::FromRawMessage(message)); 1132 1133 // Confirm the validity of the NameOwnerChanged signal. 1134 if (signal->GetMember() != kNameOwnerChangedSignal || 1135 signal->GetInterface() != DBUS_INTERFACE_DBUS || 1136 signal->GetSender() != DBUS_SERVICE_DBUS) { 1137 return; 1138 } 1139 1140 MessageReader reader(signal.get()); 1141 std::string service_name; 1142 std::string old_owner; 1143 std::string new_owner; 1144 if (!reader.PopString(&service_name) || 1145 !reader.PopString(&old_owner) || 1146 !reader.PopString(&new_owner)) { 1147 return; 1148 } 1149 1150 ServiceOwnerChangedListenerMap::const_iterator it = 1151 service_owner_changed_listener_map_.find(service_name); 1152 if (it == service_owner_changed_listener_map_.end()) 1153 return; 1154 1155 const std::vector<GetServiceOwnerCallback>& callbacks = it->second; 1156 for (size_t i = 0; i < callbacks.size(); ++i) { 1157 PostTaskToOriginThread(FROM_HERE, 1158 base::Bind(callbacks[i], new_owner)); 1159 } 1160 } 1161 1162 // static 1163 dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) { 1164 Bus* self = static_cast<Bus*>(data); 1165 return self->OnAddWatch(raw_watch); 1166 } 1167 1168 // static 1169 void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) { 1170 Bus* self = static_cast<Bus*>(data); 1171 self->OnRemoveWatch(raw_watch); 1172 } 1173 1174 // static 1175 void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) { 1176 Bus* self = static_cast<Bus*>(data); 1177 self->OnToggleWatch(raw_watch); 1178 } 1179 1180 // static 1181 dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) { 1182 Bus* self = static_cast<Bus*>(data); 1183 return self->OnAddTimeout(raw_timeout); 1184 } 1185 1186 // static 1187 void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) { 1188 Bus* self = static_cast<Bus*>(data); 1189 self->OnRemoveTimeout(raw_timeout); 1190 } 1191 1192 // static 1193 void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) { 1194 Bus* self = static_cast<Bus*>(data); 1195 self->OnToggleTimeout(raw_timeout); 1196 } 1197 1198 // static 1199 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection, 1200 DBusDispatchStatus status, 1201 void* data) { 1202 Bus* self = static_cast<Bus*>(data); 1203 self->OnDispatchStatusChanged(connection, status); 1204 } 1205 1206 // static 1207 DBusHandlerResult Bus::OnConnectionDisconnectedFilter( 1208 DBusConnection* connection, 1209 DBusMessage* message, 1210 void* data) { 1211 if (dbus_message_is_signal(message, 1212 DBUS_INTERFACE_LOCAL, 1213 kDisconnectedSignal)) { 1214 Bus* self = static_cast<Bus*>(data); 1215 self->OnConnectionDisconnected(connection); 1216 return DBUS_HANDLER_RESULT_HANDLED; 1217 } 1218 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; 1219 } 1220 1221 // static 1222 DBusHandlerResult Bus::OnServiceOwnerChangedFilter( 1223 DBusConnection* connection, 1224 DBusMessage* message, 1225 void* data) { 1226 if (dbus_message_is_signal(message, 1227 DBUS_INTERFACE_DBUS, 1228 kNameOwnerChangedSignal)) { 1229 Bus* self = static_cast<Bus*>(data); 1230 self->OnServiceOwnerChanged(message); 1231 } 1232 // Always return unhandled to let others, e.g. ObjectProxies, handle the same 1233 // signal. 1234 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; 1235 } 1236 1237 } // namespace dbus 1238