1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "dbus/bus.h" 6 7 #include <stddef.h> 8 9 #include "base/bind.h" 10 #include "base/logging.h" 11 #include "base/message_loop/message_loop.h" 12 #include "base/stl_util.h" 13 #include "base/strings/stringprintf.h" 14 #include "base/threading/thread.h" 15 #include "base/threading/thread_restrictions.h" 16 #include "base/threading/thread_task_runner_handle.h" 17 #include "base/time/time.h" 18 #include "dbus/exported_object.h" 19 #include "dbus/message.h" 20 #include "dbus/object_manager.h" 21 #include "dbus/object_path.h" 22 #include "dbus/object_proxy.h" 23 #include "dbus/scoped_dbus_error.h" 24 25 namespace dbus { 26 27 namespace { 28 29 const char kDisconnectedSignal[] = "Disconnected"; 30 const char kDisconnectedMatchRule[] = 31 "type='signal', path='/org/freedesktop/DBus/Local'," 32 "interface='org.freedesktop.DBus.Local', member='Disconnected'"; 33 34 // The NameOwnerChanged member in org.freedesktop.DBus 35 const char kNameOwnerChangedSignal[] = "NameOwnerChanged"; 36 37 // The match rule used to filter for changes to a given service name owner. 38 const char kServiceNameOwnerChangeMatchRule[] = 39 "type='signal',interface='org.freedesktop.DBus'," 40 "member='NameOwnerChanged',path='/org/freedesktop/DBus'," 41 "sender='org.freedesktop.DBus',arg0='%s'"; 42 43 // The class is used for watching the file descriptor used for D-Bus 44 // communication. 45 class Watch : public base::MessagePumpLibevent::Watcher { 46 public: 47 explicit Watch(DBusWatch* watch) 48 : raw_watch_(watch), file_descriptor_watcher_(FROM_HERE) { 49 dbus_watch_set_data(raw_watch_, this, NULL); 50 } 51 52 ~Watch() override { dbus_watch_set_data(raw_watch_, NULL, NULL); } 53 54 // Returns true if the underlying file descriptor is ready to be watched. 55 bool IsReadyToBeWatched() { 56 return dbus_watch_get_enabled(raw_watch_); 57 } 58 59 // Starts watching the underlying file descriptor. 60 void StartWatching() { 61 const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_); 62 const int flags = dbus_watch_get_flags(raw_watch_); 63 64 base::MessageLoopForIO::Mode mode = base::MessageLoopForIO::WATCH_READ; 65 if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE)) 66 mode = base::MessageLoopForIO::WATCH_READ_WRITE; 67 else if (flags & DBUS_WATCH_READABLE) 68 mode = base::MessageLoopForIO::WATCH_READ; 69 else if (flags & DBUS_WATCH_WRITABLE) 70 mode = base::MessageLoopForIO::WATCH_WRITE; 71 else 72 NOTREACHED(); 73 74 const bool persistent = true; // Watch persistently. 75 const bool success = base::MessageLoopForIO::current()->WatchFileDescriptor( 76 file_descriptor, persistent, mode, &file_descriptor_watcher_, this); 77 CHECK(success) << "Unable to allocate memory"; 78 } 79 80 // Stops watching the underlying file descriptor. 81 void StopWatching() { 82 file_descriptor_watcher_.StopWatchingFileDescriptor(); 83 } 84 85 private: 86 // Implement MessagePumpLibevent::Watcher. 87 void OnFileCanReadWithoutBlocking(int file_descriptor) override { 88 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE); 89 CHECK(success) << "Unable to allocate memory"; 90 } 91 92 // Implement MessagePumpLibevent::Watcher. 93 void OnFileCanWriteWithoutBlocking(int file_descriptor) override { 94 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE); 95 CHECK(success) << "Unable to allocate memory"; 96 } 97 98 DBusWatch* raw_watch_; 99 base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_; 100 }; 101 102 // The class is used for monitoring the timeout used for D-Bus method 103 // calls. 104 // 105 // Unlike Watch, Timeout is a ref counted object, to ensure that |this| of 106 // the object is is alive when HandleTimeout() is called. It's unlikely 107 // but it may be possible that HandleTimeout() is called after 108 // Bus::OnRemoveTimeout(). That's why we don't simply delete the object in 109 // Bus::OnRemoveTimeout(). 110 class Timeout : public base::RefCountedThreadSafe<Timeout> { 111 public: 112 explicit Timeout(DBusTimeout* timeout) 113 : raw_timeout_(timeout), 114 monitoring_is_active_(false), 115 is_completed(false) { 116 dbus_timeout_set_data(raw_timeout_, this, NULL); 117 AddRef(); // Balanced on Complete(). 118 } 119 120 // Returns true if the timeout is ready to be monitored. 121 bool IsReadyToBeMonitored() { 122 return dbus_timeout_get_enabled(raw_timeout_); 123 } 124 125 // Starts monitoring the timeout. 126 void StartMonitoring(Bus* bus) { 127 bus->GetDBusTaskRunner()->PostDelayedTask( 128 FROM_HERE, 129 base::Bind(&Timeout::HandleTimeout, this), 130 GetInterval()); 131 monitoring_is_active_ = true; 132 } 133 134 // Stops monitoring the timeout. 135 void StopMonitoring() { 136 // We cannot take back the delayed task we posted in 137 // StartMonitoring(), so we just mark the monitoring is inactive now. 138 monitoring_is_active_ = false; 139 } 140 141 // Returns the interval. 142 base::TimeDelta GetInterval() { 143 return base::TimeDelta::FromMilliseconds( 144 dbus_timeout_get_interval(raw_timeout_)); 145 } 146 147 // Cleans up the raw_timeout and marks that timeout is completed. 148 // See the class comment above for why we are doing this. 149 void Complete() { 150 dbus_timeout_set_data(raw_timeout_, NULL, NULL); 151 is_completed = true; 152 Release(); 153 } 154 155 private: 156 friend class base::RefCountedThreadSafe<Timeout>; 157 ~Timeout() { 158 } 159 160 // Handles the timeout. 161 void HandleTimeout() { 162 // If the timeout is marked completed, we should do nothing. This can 163 // occur if this function is called after Bus::OnRemoveTimeout(). 164 if (is_completed) 165 return; 166 // Skip if monitoring is canceled. 167 if (!monitoring_is_active_) 168 return; 169 170 const bool success = dbus_timeout_handle(raw_timeout_); 171 CHECK(success) << "Unable to allocate memory"; 172 } 173 174 DBusTimeout* raw_timeout_; 175 bool monitoring_is_active_; 176 bool is_completed; 177 }; 178 179 } // namespace 180 181 Bus::Options::Options() 182 : bus_type(SESSION), 183 connection_type(PRIVATE) { 184 } 185 186 Bus::Options::~Options() { 187 } 188 189 Bus::Bus(const Options& options) 190 : bus_type_(options.bus_type), 191 connection_type_(options.connection_type), 192 dbus_task_runner_(options.dbus_task_runner), 193 on_shutdown_(base::WaitableEvent::ResetPolicy::AUTOMATIC, 194 base::WaitableEvent::InitialState::NOT_SIGNALED), 195 connection_(NULL), 196 origin_thread_id_(base::PlatformThread::CurrentId()), 197 async_operations_set_up_(false), 198 shutdown_completed_(false), 199 num_pending_watches_(0), 200 num_pending_timeouts_(0), 201 address_(options.address) { 202 // This is safe to call multiple times. 203 dbus_threads_init_default(); 204 // The origin message loop is unnecessary if the client uses synchronous 205 // functions only. 206 if (base::ThreadTaskRunnerHandle::IsSet()) 207 origin_task_runner_ = base::ThreadTaskRunnerHandle::Get(); 208 } 209 210 Bus::~Bus() { 211 DCHECK(!connection_); 212 DCHECK(owned_service_names_.empty()); 213 DCHECK(match_rules_added_.empty()); 214 DCHECK(filter_functions_added_.empty()); 215 DCHECK(registered_object_paths_.empty()); 216 DCHECK_EQ(0, num_pending_watches_); 217 // TODO(satorux): This check fails occasionally in browser_tests for tests 218 // that run very quickly. Perhaps something does not have time to clean up. 219 // Despite the check failing, the tests seem to run fine. crosbug.com/23416 220 // DCHECK_EQ(0, num_pending_timeouts_); 221 } 222 223 ObjectProxy* Bus::GetObjectProxy(const std::string& service_name, 224 const ObjectPath& object_path) { 225 return GetObjectProxyWithOptions(service_name, object_path, 226 ObjectProxy::DEFAULT_OPTIONS); 227 } 228 229 ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name, 230 const ObjectPath& object_path, 231 int options) { 232 AssertOnOriginThread(); 233 234 // Check if we already have the requested object proxy. 235 const ObjectProxyTable::key_type key(service_name + object_path.value(), 236 options); 237 ObjectProxyTable::iterator iter = object_proxy_table_.find(key); 238 if (iter != object_proxy_table_.end()) { 239 return iter->second.get(); 240 } 241 242 scoped_refptr<ObjectProxy> object_proxy = 243 new ObjectProxy(this, service_name, object_path, options); 244 object_proxy_table_[key] = object_proxy; 245 246 return object_proxy.get(); 247 } 248 249 bool Bus::RemoveObjectProxy(const std::string& service_name, 250 const ObjectPath& object_path, 251 const base::Closure& callback) { 252 return RemoveObjectProxyWithOptions(service_name, object_path, 253 ObjectProxy::DEFAULT_OPTIONS, 254 callback); 255 } 256 257 bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name, 258 const ObjectPath& object_path, 259 int options, 260 const base::Closure& callback) { 261 AssertOnOriginThread(); 262 263 // Check if we have the requested object proxy. 264 const ObjectProxyTable::key_type key(service_name + object_path.value(), 265 options); 266 ObjectProxyTable::iterator iter = object_proxy_table_.find(key); 267 if (iter != object_proxy_table_.end()) { 268 scoped_refptr<ObjectProxy> object_proxy = iter->second; 269 object_proxy_table_.erase(iter); 270 // Object is present. Remove it now and Detach on the DBus thread. 271 GetDBusTaskRunner()->PostTask( 272 FROM_HERE, 273 base::Bind(&Bus::RemoveObjectProxyInternal, 274 this, object_proxy, callback)); 275 return true; 276 } 277 return false; 278 } 279 280 void Bus::RemoveObjectProxyInternal(scoped_refptr<ObjectProxy> object_proxy, 281 const base::Closure& callback) { 282 AssertOnDBusThread(); 283 284 object_proxy.get()->Detach(); 285 286 GetOriginTaskRunner()->PostTask(FROM_HERE, callback); 287 } 288 289 ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) { 290 AssertOnOriginThread(); 291 292 // Check if we already have the requested exported object. 293 ExportedObjectTable::iterator iter = exported_object_table_.find(object_path); 294 if (iter != exported_object_table_.end()) { 295 return iter->second.get(); 296 } 297 298 scoped_refptr<ExportedObject> exported_object = 299 new ExportedObject(this, object_path); 300 exported_object_table_[object_path] = exported_object; 301 302 return exported_object.get(); 303 } 304 305 void Bus::UnregisterExportedObject(const ObjectPath& object_path) { 306 AssertOnOriginThread(); 307 308 // Remove the registered object from the table first, to allow a new 309 // GetExportedObject() call to return a new object, rather than this one. 310 ExportedObjectTable::iterator iter = exported_object_table_.find(object_path); 311 if (iter == exported_object_table_.end()) 312 return; 313 314 scoped_refptr<ExportedObject> exported_object = iter->second; 315 exported_object_table_.erase(iter); 316 317 // Post the task to perform the final unregistration to the D-Bus thread. 318 // Since the registration also happens on the D-Bus thread in 319 // TryRegisterObjectPath(), and the task runner we post to is a 320 // SequencedTaskRunner, there is a guarantee that this will happen before any 321 // future registration call. 322 GetDBusTaskRunner()->PostTask( 323 FROM_HERE, 324 base::Bind(&Bus::UnregisterExportedObjectInternal, 325 this, exported_object)); 326 } 327 328 void Bus::UnregisterExportedObjectInternal( 329 scoped_refptr<ExportedObject> exported_object) { 330 AssertOnDBusThread(); 331 332 exported_object->Unregister(); 333 } 334 335 ObjectManager* Bus::GetObjectManager(const std::string& service_name, 336 const ObjectPath& object_path) { 337 AssertOnOriginThread(); 338 339 // Check if we already have the requested object manager. 340 const ObjectManagerTable::key_type key(service_name + object_path.value()); 341 ObjectManagerTable::iterator iter = object_manager_table_.find(key); 342 if (iter != object_manager_table_.end()) { 343 return iter->second.get(); 344 } 345 346 scoped_refptr<ObjectManager> object_manager = 347 new ObjectManager(this, service_name, object_path); 348 object_manager_table_[key] = object_manager; 349 350 return object_manager.get(); 351 } 352 353 bool Bus::RemoveObjectManager(const std::string& service_name, 354 const ObjectPath& object_path, 355 const base::Closure& callback) { 356 AssertOnOriginThread(); 357 DCHECK(!callback.is_null()); 358 359 const ObjectManagerTable::key_type key(service_name + object_path.value()); 360 ObjectManagerTable::iterator iter = object_manager_table_.find(key); 361 if (iter == object_manager_table_.end()) 362 return false; 363 364 // ObjectManager is present. Remove it now and CleanUp on the DBus thread. 365 scoped_refptr<ObjectManager> object_manager = iter->second; 366 object_manager_table_.erase(iter); 367 368 GetDBusTaskRunner()->PostTask( 369 FROM_HERE, 370 base::Bind(&Bus::RemoveObjectManagerInternal, 371 this, object_manager, callback)); 372 373 return true; 374 } 375 376 void Bus::RemoveObjectManagerInternal( 377 scoped_refptr<dbus::ObjectManager> object_manager, 378 const base::Closure& callback) { 379 AssertOnDBusThread(); 380 DCHECK(object_manager.get()); 381 382 object_manager->CleanUp(); 383 384 // The ObjectManager has to be deleted on the origin thread since it was 385 // created there. 386 GetOriginTaskRunner()->PostTask( 387 FROM_HERE, 388 base::Bind(&Bus::RemoveObjectManagerInternalHelper, 389 this, object_manager, callback)); 390 } 391 392 void Bus::RemoveObjectManagerInternalHelper( 393 scoped_refptr<dbus::ObjectManager> object_manager, 394 const base::Closure& callback) { 395 AssertOnOriginThread(); 396 DCHECK(object_manager.get()); 397 398 // Release the object manager and run the callback. 399 object_manager = NULL; 400 callback.Run(); 401 } 402 403 bool Bus::Connect() { 404 // dbus_bus_get_private() and dbus_bus_get() are blocking calls. 405 AssertOnDBusThread(); 406 407 // Check if it's already initialized. 408 if (connection_) 409 return true; 410 411 ScopedDBusError error; 412 if (bus_type_ == CUSTOM_ADDRESS) { 413 if (connection_type_ == PRIVATE) { 414 connection_ = dbus_connection_open_private(address_.c_str(), error.get()); 415 } else { 416 connection_ = dbus_connection_open(address_.c_str(), error.get()); 417 } 418 } else { 419 const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_); 420 if (connection_type_ == PRIVATE) { 421 connection_ = dbus_bus_get_private(dbus_bus_type, error.get()); 422 } else { 423 connection_ = dbus_bus_get(dbus_bus_type, error.get()); 424 } 425 } 426 if (!connection_) { 427 LOG(ERROR) << "Failed to connect to the bus: " 428 << (error.is_set() ? error.message() : ""); 429 return false; 430 } 431 432 if (bus_type_ == CUSTOM_ADDRESS) { 433 // We should call dbus_bus_register here, otherwise unique name can not be 434 // acquired. According to dbus specification, it is responsible to call 435 // org.freedesktop.DBus.Hello method at the beging of bus connection to 436 // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is 437 // called internally. 438 if (!dbus_bus_register(connection_, error.get())) { 439 LOG(ERROR) << "Failed to register the bus component: " 440 << (error.is_set() ? error.message() : ""); 441 return false; 442 } 443 } 444 // We shouldn't exit on the disconnected signal. 445 dbus_connection_set_exit_on_disconnect(connection_, false); 446 447 // Watch Disconnected signal. 448 AddFilterFunction(Bus::OnConnectionDisconnectedFilter, this); 449 AddMatch(kDisconnectedMatchRule, error.get()); 450 451 return true; 452 } 453 454 void Bus::ClosePrivateConnection() { 455 // dbus_connection_close is blocking call. 456 AssertOnDBusThread(); 457 DCHECK_EQ(PRIVATE, connection_type_) 458 << "non-private connection should not be closed"; 459 dbus_connection_close(connection_); 460 } 461 462 void Bus::ShutdownAndBlock() { 463 AssertOnDBusThread(); 464 465 if (shutdown_completed_) 466 return; // Already shutdowned, just return. 467 468 // Unregister the exported objects. 469 for (ExportedObjectTable::iterator iter = exported_object_table_.begin(); 470 iter != exported_object_table_.end(); ++iter) { 471 iter->second->Unregister(); 472 } 473 474 // Release all service names. 475 for (std::set<std::string>::iterator iter = owned_service_names_.begin(); 476 iter != owned_service_names_.end();) { 477 // This is a bit tricky but we should increment the iter here as 478 // ReleaseOwnership() may remove |service_name| from the set. 479 const std::string& service_name = *iter++; 480 ReleaseOwnership(service_name); 481 } 482 if (!owned_service_names_.empty()) { 483 LOG(ERROR) << "Failed to release all service names. # of services left: " 484 << owned_service_names_.size(); 485 } 486 487 // Detach from the remote objects. 488 for (ObjectProxyTable::iterator iter = object_proxy_table_.begin(); 489 iter != object_proxy_table_.end(); ++iter) { 490 iter->second->Detach(); 491 } 492 493 // Clean up the object managers. 494 for (ObjectManagerTable::iterator iter = object_manager_table_.begin(); 495 iter != object_manager_table_.end(); ++iter) { 496 iter->second->CleanUp(); 497 } 498 499 // Release object proxies and exported objects here. We should do this 500 // here rather than in the destructor to avoid memory leaks due to 501 // cyclic references. 502 object_proxy_table_.clear(); 503 exported_object_table_.clear(); 504 505 // Private connection should be closed. 506 if (connection_) { 507 // Remove Disconnected watcher. 508 ScopedDBusError error; 509 RemoveFilterFunction(Bus::OnConnectionDisconnectedFilter, this); 510 RemoveMatch(kDisconnectedMatchRule, error.get()); 511 512 if (connection_type_ == PRIVATE) 513 ClosePrivateConnection(); 514 // dbus_connection_close() won't unref. 515 dbus_connection_unref(connection_); 516 } 517 518 connection_ = NULL; 519 shutdown_completed_ = true; 520 } 521 522 void Bus::ShutdownOnDBusThreadAndBlock() { 523 AssertOnOriginThread(); 524 DCHECK(dbus_task_runner_.get()); 525 526 GetDBusTaskRunner()->PostTask( 527 FROM_HERE, 528 base::Bind(&Bus::ShutdownOnDBusThreadAndBlockInternal, this)); 529 530 // http://crbug.com/125222 531 base::ThreadRestrictions::ScopedAllowWait allow_wait; 532 533 // Wait until the shutdown is complete on the D-Bus thread. 534 // The shutdown should not hang, but set timeout just in case. 535 const int kTimeoutSecs = 3; 536 const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs)); 537 const bool signaled = on_shutdown_.TimedWait(timeout); 538 LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus"; 539 } 540 541 void Bus::RequestOwnership(const std::string& service_name, 542 ServiceOwnershipOptions options, 543 OnOwnershipCallback on_ownership_callback) { 544 AssertOnOriginThread(); 545 546 GetDBusTaskRunner()->PostTask( 547 FROM_HERE, 548 base::Bind(&Bus::RequestOwnershipInternal, 549 this, service_name, options, on_ownership_callback)); 550 } 551 552 void Bus::RequestOwnershipInternal(const std::string& service_name, 553 ServiceOwnershipOptions options, 554 OnOwnershipCallback on_ownership_callback) { 555 AssertOnDBusThread(); 556 557 bool success = Connect(); 558 if (success) 559 success = RequestOwnershipAndBlock(service_name, options); 560 561 GetOriginTaskRunner()->PostTask(FROM_HERE, 562 base::Bind(on_ownership_callback, 563 service_name, 564 success)); 565 } 566 567 bool Bus::RequestOwnershipAndBlock(const std::string& service_name, 568 ServiceOwnershipOptions options) { 569 DCHECK(connection_); 570 // dbus_bus_request_name() is a blocking call. 571 AssertOnDBusThread(); 572 573 // Check if we already own the service name. 574 if (owned_service_names_.find(service_name) != owned_service_names_.end()) { 575 return true; 576 } 577 578 ScopedDBusError error; 579 const int result = dbus_bus_request_name(connection_, 580 service_name.c_str(), 581 options, 582 error.get()); 583 if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) { 584 LOG(ERROR) << "Failed to get the ownership of " << service_name << ": " 585 << (error.is_set() ? error.message() : ""); 586 return false; 587 } 588 owned_service_names_.insert(service_name); 589 return true; 590 } 591 592 bool Bus::ReleaseOwnership(const std::string& service_name) { 593 DCHECK(connection_); 594 // dbus_bus_request_name() is a blocking call. 595 AssertOnDBusThread(); 596 597 // Check if we already own the service name. 598 std::set<std::string>::iterator found = 599 owned_service_names_.find(service_name); 600 if (found == owned_service_names_.end()) { 601 LOG(ERROR) << service_name << " is not owned by the bus"; 602 return false; 603 } 604 605 ScopedDBusError error; 606 const int result = dbus_bus_release_name(connection_, service_name.c_str(), 607 error.get()); 608 if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) { 609 owned_service_names_.erase(found); 610 return true; 611 } else { 612 LOG(ERROR) << "Failed to release the ownership of " << service_name << ": " 613 << (error.is_set() ? error.message() : "") 614 << ", result code: " << result; 615 return false; 616 } 617 } 618 619 bool Bus::SetUpAsyncOperations() { 620 DCHECK(connection_); 621 AssertOnDBusThread(); 622 623 if (async_operations_set_up_) 624 return true; 625 626 // Process all the incoming data if any, so that OnDispatchStatus() will 627 // be called when the incoming data is ready. 628 ProcessAllIncomingDataIfAny(); 629 630 bool success = dbus_connection_set_watch_functions(connection_, 631 &Bus::OnAddWatchThunk, 632 &Bus::OnRemoveWatchThunk, 633 &Bus::OnToggleWatchThunk, 634 this, 635 NULL); 636 CHECK(success) << "Unable to allocate memory"; 637 638 success = dbus_connection_set_timeout_functions(connection_, 639 &Bus::OnAddTimeoutThunk, 640 &Bus::OnRemoveTimeoutThunk, 641 &Bus::OnToggleTimeoutThunk, 642 this, 643 NULL); 644 CHECK(success) << "Unable to allocate memory"; 645 646 dbus_connection_set_dispatch_status_function( 647 connection_, 648 &Bus::OnDispatchStatusChangedThunk, 649 this, 650 NULL); 651 652 async_operations_set_up_ = true; 653 654 return true; 655 } 656 657 DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request, 658 int timeout_ms, 659 DBusError* error) { 660 DCHECK(connection_); 661 AssertOnDBusThread(); 662 663 return dbus_connection_send_with_reply_and_block( 664 connection_, request, timeout_ms, error); 665 } 666 667 void Bus::SendWithReply(DBusMessage* request, 668 DBusPendingCall** pending_call, 669 int timeout_ms) { 670 DCHECK(connection_); 671 AssertOnDBusThread(); 672 673 const bool success = dbus_connection_send_with_reply( 674 connection_, request, pending_call, timeout_ms); 675 CHECK(success) << "Unable to allocate memory"; 676 } 677 678 void Bus::Send(DBusMessage* request, uint32_t* serial) { 679 DCHECK(connection_); 680 AssertOnDBusThread(); 681 682 const bool success = dbus_connection_send(connection_, request, serial); 683 CHECK(success) << "Unable to allocate memory"; 684 } 685 686 void Bus::AddFilterFunction(DBusHandleMessageFunction filter_function, 687 void* user_data) { 688 DCHECK(connection_); 689 AssertOnDBusThread(); 690 691 std::pair<DBusHandleMessageFunction, void*> filter_data_pair = 692 std::make_pair(filter_function, user_data); 693 if (filter_functions_added_.find(filter_data_pair) != 694 filter_functions_added_.end()) { 695 VLOG(1) << "Filter function already exists: " << filter_function 696 << " with associated data: " << user_data; 697 return; 698 } 699 700 const bool success = dbus_connection_add_filter( 701 connection_, filter_function, user_data, NULL); 702 CHECK(success) << "Unable to allocate memory"; 703 filter_functions_added_.insert(filter_data_pair); 704 } 705 706 void Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function, 707 void* user_data) { 708 DCHECK(connection_); 709 AssertOnDBusThread(); 710 711 std::pair<DBusHandleMessageFunction, void*> filter_data_pair = 712 std::make_pair(filter_function, user_data); 713 if (filter_functions_added_.find(filter_data_pair) == 714 filter_functions_added_.end()) { 715 VLOG(1) << "Requested to remove an unknown filter function: " 716 << filter_function 717 << " with associated data: " << user_data; 718 return; 719 } 720 721 dbus_connection_remove_filter(connection_, filter_function, user_data); 722 filter_functions_added_.erase(filter_data_pair); 723 } 724 725 void Bus::AddMatch(const std::string& match_rule, DBusError* error) { 726 DCHECK(connection_); 727 AssertOnDBusThread(); 728 729 std::map<std::string, int>::iterator iter = 730 match_rules_added_.find(match_rule); 731 if (iter != match_rules_added_.end()) { 732 // The already existing rule's counter is incremented. 733 iter->second++; 734 735 VLOG(1) << "Match rule already exists: " << match_rule; 736 return; 737 } 738 739 dbus_bus_add_match(connection_, match_rule.c_str(), error); 740 match_rules_added_[match_rule] = 1; 741 } 742 743 bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) { 744 DCHECK(connection_); 745 AssertOnDBusThread(); 746 747 std::map<std::string, int>::iterator iter = 748 match_rules_added_.find(match_rule); 749 if (iter == match_rules_added_.end()) { 750 LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule; 751 return false; 752 } 753 754 // The rule's counter is decremented and the rule is deleted when reachs 0. 755 iter->second--; 756 if (iter->second == 0) { 757 dbus_bus_remove_match(connection_, match_rule.c_str(), error); 758 match_rules_added_.erase(match_rule); 759 } 760 return true; 761 } 762 763 bool Bus::TryRegisterObjectPath(const ObjectPath& object_path, 764 const DBusObjectPathVTable* vtable, 765 void* user_data, 766 DBusError* error) { 767 DCHECK(connection_); 768 AssertOnDBusThread(); 769 770 if (registered_object_paths_.find(object_path) != 771 registered_object_paths_.end()) { 772 LOG(ERROR) << "Object path already registered: " << object_path.value(); 773 return false; 774 } 775 776 const bool success = dbus_connection_try_register_object_path( 777 connection_, 778 object_path.value().c_str(), 779 vtable, 780 user_data, 781 error); 782 if (success) 783 registered_object_paths_.insert(object_path); 784 return success; 785 } 786 787 void Bus::UnregisterObjectPath(const ObjectPath& object_path) { 788 DCHECK(connection_); 789 AssertOnDBusThread(); 790 791 if (registered_object_paths_.find(object_path) == 792 registered_object_paths_.end()) { 793 LOG(ERROR) << "Requested to unregister an unknown object path: " 794 << object_path.value(); 795 return; 796 } 797 798 const bool success = dbus_connection_unregister_object_path( 799 connection_, 800 object_path.value().c_str()); 801 CHECK(success) << "Unable to allocate memory"; 802 registered_object_paths_.erase(object_path); 803 } 804 805 void Bus::ShutdownOnDBusThreadAndBlockInternal() { 806 AssertOnDBusThread(); 807 808 ShutdownAndBlock(); 809 on_shutdown_.Signal(); 810 } 811 812 void Bus::ProcessAllIncomingDataIfAny() { 813 AssertOnDBusThread(); 814 815 // As mentioned at the class comment in .h file, connection_ can be NULL. 816 if (!connection_) 817 return; 818 819 // It is safe and necessary to call dbus_connection_get_dispatch_status even 820 // if the connection is lost. 821 if (dbus_connection_get_dispatch_status(connection_) == 822 DBUS_DISPATCH_DATA_REMAINS) { 823 while (dbus_connection_dispatch(connection_) == 824 DBUS_DISPATCH_DATA_REMAINS) { 825 } 826 } 827 } 828 829 base::TaskRunner* Bus::GetDBusTaskRunner() { 830 if (dbus_task_runner_.get()) 831 return dbus_task_runner_.get(); 832 else 833 return GetOriginTaskRunner(); 834 } 835 836 base::TaskRunner* Bus::GetOriginTaskRunner() { 837 DCHECK(origin_task_runner_.get()); 838 return origin_task_runner_.get(); 839 } 840 841 bool Bus::HasDBusThread() { 842 return dbus_task_runner_.get() != NULL; 843 } 844 845 void Bus::AssertOnOriginThread() { 846 DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId()); 847 } 848 849 void Bus::AssertOnDBusThread() { 850 base::ThreadRestrictions::AssertIOAllowed(); 851 852 if (dbus_task_runner_.get()) { 853 DCHECK(dbus_task_runner_->RunsTasksOnCurrentThread()); 854 } else { 855 AssertOnOriginThread(); 856 } 857 } 858 859 std::string Bus::GetServiceOwnerAndBlock(const std::string& service_name, 860 GetServiceOwnerOption options) { 861 AssertOnDBusThread(); 862 863 MethodCall get_name_owner_call("org.freedesktop.DBus", "GetNameOwner"); 864 MessageWriter writer(&get_name_owner_call); 865 writer.AppendString(service_name); 866 VLOG(1) << "Method call: " << get_name_owner_call.ToString(); 867 868 const ObjectPath obj_path("/org/freedesktop/DBus"); 869 if (!get_name_owner_call.SetDestination("org.freedesktop.DBus") || 870 !get_name_owner_call.SetPath(obj_path)) { 871 if (options == REPORT_ERRORS) 872 LOG(ERROR) << "Failed to get name owner."; 873 return ""; 874 } 875 876 ScopedDBusError error; 877 DBusMessage* response_message = 878 SendWithReplyAndBlock(get_name_owner_call.raw_message(), 879 ObjectProxy::TIMEOUT_USE_DEFAULT, 880 error.get()); 881 if (!response_message) { 882 if (options == REPORT_ERRORS) { 883 LOG(ERROR) << "Failed to get name owner. Got " << error.name() << ": " 884 << error.message(); 885 } 886 return ""; 887 } 888 889 std::unique_ptr<Response> response( 890 Response::FromRawMessage(response_message)); 891 MessageReader reader(response.get()); 892 893 std::string service_owner; 894 if (!reader.PopString(&service_owner)) 895 service_owner.clear(); 896 return service_owner; 897 } 898 899 void Bus::GetServiceOwner(const std::string& service_name, 900 const GetServiceOwnerCallback& callback) { 901 AssertOnOriginThread(); 902 903 GetDBusTaskRunner()->PostTask( 904 FROM_HERE, 905 base::Bind(&Bus::GetServiceOwnerInternal, this, service_name, callback)); 906 } 907 908 void Bus::GetServiceOwnerInternal(const std::string& service_name, 909 const GetServiceOwnerCallback& callback) { 910 AssertOnDBusThread(); 911 912 std::string service_owner; 913 if (Connect()) 914 service_owner = GetServiceOwnerAndBlock(service_name, SUPPRESS_ERRORS); 915 GetOriginTaskRunner()->PostTask(FROM_HERE, 916 base::Bind(callback, service_owner)); 917 } 918 919 void Bus::ListenForServiceOwnerChange( 920 const std::string& service_name, 921 const GetServiceOwnerCallback& callback) { 922 AssertOnOriginThread(); 923 DCHECK(!service_name.empty()); 924 DCHECK(!callback.is_null()); 925 926 GetDBusTaskRunner()->PostTask( 927 FROM_HERE, 928 base::Bind(&Bus::ListenForServiceOwnerChangeInternal, 929 this, service_name, callback)); 930 } 931 932 void Bus::ListenForServiceOwnerChangeInternal( 933 const std::string& service_name, 934 const GetServiceOwnerCallback& callback) { 935 AssertOnDBusThread(); 936 DCHECK(!service_name.empty()); 937 DCHECK(!callback.is_null()); 938 939 if (!Connect() || !SetUpAsyncOperations()) 940 return; 941 942 if (service_owner_changed_listener_map_.empty()) 943 AddFilterFunction(Bus::OnServiceOwnerChangedFilter, this); 944 945 ServiceOwnerChangedListenerMap::iterator it = 946 service_owner_changed_listener_map_.find(service_name); 947 if (it == service_owner_changed_listener_map_.end()) { 948 // Add a match rule for the new service name. 949 const std::string name_owner_changed_match_rule = 950 base::StringPrintf(kServiceNameOwnerChangeMatchRule, 951 service_name.c_str()); 952 ScopedDBusError error; 953 AddMatch(name_owner_changed_match_rule, error.get()); 954 if (error.is_set()) { 955 LOG(ERROR) << "Failed to add match rule for " << service_name 956 << ". Got " << error.name() << ": " << error.message(); 957 return; 958 } 959 960 service_owner_changed_listener_map_[service_name].push_back(callback); 961 return; 962 } 963 964 // Check if the callback has already been added. 965 std::vector<GetServiceOwnerCallback>& callbacks = it->second; 966 for (size_t i = 0; i < callbacks.size(); ++i) { 967 if (callbacks[i].Equals(callback)) 968 return; 969 } 970 callbacks.push_back(callback); 971 } 972 973 void Bus::UnlistenForServiceOwnerChange( 974 const std::string& service_name, 975 const GetServiceOwnerCallback& callback) { 976 AssertOnOriginThread(); 977 DCHECK(!service_name.empty()); 978 DCHECK(!callback.is_null()); 979 980 GetDBusTaskRunner()->PostTask( 981 FROM_HERE, 982 base::Bind(&Bus::UnlistenForServiceOwnerChangeInternal, 983 this, service_name, callback)); 984 } 985 986 void Bus::UnlistenForServiceOwnerChangeInternal( 987 const std::string& service_name, 988 const GetServiceOwnerCallback& callback) { 989 AssertOnDBusThread(); 990 DCHECK(!service_name.empty()); 991 DCHECK(!callback.is_null()); 992 993 ServiceOwnerChangedListenerMap::iterator it = 994 service_owner_changed_listener_map_.find(service_name); 995 if (it == service_owner_changed_listener_map_.end()) 996 return; 997 998 std::vector<GetServiceOwnerCallback>& callbacks = it->second; 999 for (size_t i = 0; i < callbacks.size(); ++i) { 1000 if (callbacks[i].Equals(callback)) { 1001 callbacks.erase(callbacks.begin() + i); 1002 break; // There can be only one. 1003 } 1004 } 1005 if (!callbacks.empty()) 1006 return; 1007 1008 // Last callback for |service_name| has been removed, remove match rule. 1009 const std::string name_owner_changed_match_rule = 1010 base::StringPrintf(kServiceNameOwnerChangeMatchRule, 1011 service_name.c_str()); 1012 ScopedDBusError error; 1013 RemoveMatch(name_owner_changed_match_rule, error.get()); 1014 // And remove |service_owner_changed_listener_map_| entry. 1015 service_owner_changed_listener_map_.erase(it); 1016 1017 if (service_owner_changed_listener_map_.empty()) 1018 RemoveFilterFunction(Bus::OnServiceOwnerChangedFilter, this); 1019 } 1020 1021 std::string Bus::GetConnectionName() { 1022 if (!connection_) 1023 return ""; 1024 return dbus_bus_get_unique_name(connection_); 1025 } 1026 1027 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) { 1028 AssertOnDBusThread(); 1029 1030 // watch will be deleted when raw_watch is removed in OnRemoveWatch(). 1031 Watch* watch = new Watch(raw_watch); 1032 if (watch->IsReadyToBeWatched()) { 1033 watch->StartWatching(); 1034 } 1035 ++num_pending_watches_; 1036 return true; 1037 } 1038 1039 void Bus::OnRemoveWatch(DBusWatch* raw_watch) { 1040 AssertOnDBusThread(); 1041 1042 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); 1043 delete watch; 1044 --num_pending_watches_; 1045 } 1046 1047 void Bus::OnToggleWatch(DBusWatch* raw_watch) { 1048 AssertOnDBusThread(); 1049 1050 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); 1051 if (watch->IsReadyToBeWatched()) { 1052 watch->StartWatching(); 1053 } else { 1054 // It's safe to call this if StartWatching() wasn't called, per 1055 // message_pump_libevent.h. 1056 watch->StopWatching(); 1057 } 1058 } 1059 1060 dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) { 1061 AssertOnDBusThread(); 1062 1063 // timeout will be deleted when raw_timeout is removed in 1064 // OnRemoveTimeoutThunk(). 1065 Timeout* timeout = new Timeout(raw_timeout); 1066 if (timeout->IsReadyToBeMonitored()) { 1067 timeout->StartMonitoring(this); 1068 } 1069 ++num_pending_timeouts_; 1070 return true; 1071 } 1072 1073 void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) { 1074 AssertOnDBusThread(); 1075 1076 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); 1077 timeout->Complete(); 1078 --num_pending_timeouts_; 1079 } 1080 1081 void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) { 1082 AssertOnDBusThread(); 1083 1084 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); 1085 if (timeout->IsReadyToBeMonitored()) { 1086 timeout->StartMonitoring(this); 1087 } else { 1088 timeout->StopMonitoring(); 1089 } 1090 } 1091 1092 void Bus::OnDispatchStatusChanged(DBusConnection* connection, 1093 DBusDispatchStatus status) { 1094 DCHECK_EQ(connection, connection_); 1095 AssertOnDBusThread(); 1096 1097 // We cannot call ProcessAllIncomingDataIfAny() here, as calling 1098 // dbus_connection_dispatch() inside DBusDispatchStatusFunction is 1099 // prohibited by the D-Bus library. Hence, we post a task here instead. 1100 // See comments for dbus_connection_set_dispatch_status_function(). 1101 GetDBusTaskRunner()->PostTask(FROM_HERE, 1102 base::Bind(&Bus::ProcessAllIncomingDataIfAny, 1103 this)); 1104 } 1105 1106 void Bus::OnServiceOwnerChanged(DBusMessage* message) { 1107 DCHECK(message); 1108 AssertOnDBusThread(); 1109 1110 // |message| will be unrefed on exit of the function. Increment the 1111 // reference so we can use it in Signal::FromRawMessage() below. 1112 dbus_message_ref(message); 1113 std::unique_ptr<Signal> signal(Signal::FromRawMessage(message)); 1114 1115 // Confirm the validity of the NameOwnerChanged signal. 1116 if (signal->GetMember() != kNameOwnerChangedSignal || 1117 signal->GetInterface() != DBUS_INTERFACE_DBUS || 1118 signal->GetSender() != DBUS_SERVICE_DBUS) { 1119 return; 1120 } 1121 1122 MessageReader reader(signal.get()); 1123 std::string service_name; 1124 std::string old_owner; 1125 std::string new_owner; 1126 if (!reader.PopString(&service_name) || 1127 !reader.PopString(&old_owner) || 1128 !reader.PopString(&new_owner)) { 1129 return; 1130 } 1131 1132 ServiceOwnerChangedListenerMap::const_iterator it = 1133 service_owner_changed_listener_map_.find(service_name); 1134 if (it == service_owner_changed_listener_map_.end()) 1135 return; 1136 1137 const std::vector<GetServiceOwnerCallback>& callbacks = it->second; 1138 for (size_t i = 0; i < callbacks.size(); ++i) { 1139 GetOriginTaskRunner()->PostTask(FROM_HERE, 1140 base::Bind(callbacks[i], new_owner)); 1141 } 1142 } 1143 1144 // static 1145 dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) { 1146 Bus* self = static_cast<Bus*>(data); 1147 return self->OnAddWatch(raw_watch); 1148 } 1149 1150 // static 1151 void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) { 1152 Bus* self = static_cast<Bus*>(data); 1153 self->OnRemoveWatch(raw_watch); 1154 } 1155 1156 // static 1157 void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) { 1158 Bus* self = static_cast<Bus*>(data); 1159 self->OnToggleWatch(raw_watch); 1160 } 1161 1162 // static 1163 dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) { 1164 Bus* self = static_cast<Bus*>(data); 1165 return self->OnAddTimeout(raw_timeout); 1166 } 1167 1168 // static 1169 void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) { 1170 Bus* self = static_cast<Bus*>(data); 1171 self->OnRemoveTimeout(raw_timeout); 1172 } 1173 1174 // static 1175 void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) { 1176 Bus* self = static_cast<Bus*>(data); 1177 self->OnToggleTimeout(raw_timeout); 1178 } 1179 1180 // static 1181 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection, 1182 DBusDispatchStatus status, 1183 void* data) { 1184 Bus* self = static_cast<Bus*>(data); 1185 self->OnDispatchStatusChanged(connection, status); 1186 } 1187 1188 // static 1189 DBusHandlerResult Bus::OnConnectionDisconnectedFilter( 1190 DBusConnection* connection, 1191 DBusMessage* message, 1192 void* data) { 1193 if (dbus_message_is_signal(message, 1194 DBUS_INTERFACE_LOCAL, 1195 kDisconnectedSignal)) { 1196 // Abort when the connection is lost. 1197 LOG(FATAL) << "D-Bus connection was disconnected. Aborting."; 1198 } 1199 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; 1200 } 1201 1202 // static 1203 DBusHandlerResult Bus::OnServiceOwnerChangedFilter( 1204 DBusConnection* connection, 1205 DBusMessage* message, 1206 void* data) { 1207 if (dbus_message_is_signal(message, 1208 DBUS_INTERFACE_DBUS, 1209 kNameOwnerChangedSignal)) { 1210 Bus* self = static_cast<Bus*>(data); 1211 self->OnServiceOwnerChanged(message); 1212 } 1213 // Always return unhandled to let others, e.g. ObjectProxies, handle the same 1214 // signal. 1215 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; 1216 } 1217 1218 } // namespace dbus 1219