1 // Copyright 2016 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "mojo/public/cpp/system/watcher.h" 6 7 #include "base/bind.h" 8 #include "base/location.h" 9 #include "base/macros.h" 10 #include "base/message_loop/message_loop.h" 11 #include "mojo/public/c/system/functions.h" 12 13 namespace mojo { 14 15 class Watcher::MessageLoopObserver 16 : public base::MessageLoop::DestructionObserver { 17 public: 18 explicit MessageLoopObserver(Watcher* watcher) : watcher_(watcher) { 19 base::MessageLoop::current()->AddDestructionObserver(this); 20 } 21 22 ~MessageLoopObserver() override { 23 StopObservingIfNecessary(); 24 } 25 26 private: 27 // base::MessageLoop::DestructionObserver: 28 void WillDestroyCurrentMessageLoop() override { 29 StopObservingIfNecessary(); 30 if (watcher_->IsWatching()) { 31 // TODO(yzshen): Remove this notification. crbug.com/604762 32 watcher_->OnHandleReady(MOJO_RESULT_ABORTED); 33 } 34 } 35 36 void StopObservingIfNecessary() { 37 if (is_observing_) { 38 is_observing_ = false; 39 base::MessageLoop::current()->RemoveDestructionObserver(this); 40 } 41 } 42 43 bool is_observing_ = true; 44 Watcher* watcher_; 45 46 DISALLOW_COPY_AND_ASSIGN(MessageLoopObserver); 47 }; 48 49 Watcher::Watcher(scoped_refptr<base::SingleThreadTaskRunner> runner) 50 : task_runner_(std::move(runner)), 51 is_default_task_runner_(task_runner_ == 52 base::ThreadTaskRunnerHandle::Get()), 53 weak_factory_(this) { 54 DCHECK(task_runner_->BelongsToCurrentThread()); 55 weak_self_ = weak_factory_.GetWeakPtr(); 56 } 57 58 Watcher::~Watcher() { 59 if(IsWatching()) 60 Cancel(); 61 } 62 63 bool Watcher::IsWatching() const { 64 DCHECK(thread_checker_.CalledOnValidThread()); 65 return handle_.is_valid(); 66 } 67 68 MojoResult Watcher::Start(Handle handle, 69 MojoHandleSignals signals, 70 const ReadyCallback& callback) { 71 DCHECK(thread_checker_.CalledOnValidThread()); 72 DCHECK(!IsWatching()); 73 DCHECK(!callback.is_null()); 74 75 message_loop_observer_.reset(new MessageLoopObserver(this)); 76 callback_ = callback; 77 handle_ = handle; 78 MojoResult result = MojoWatch(handle_.value(), signals, 79 &Watcher::CallOnHandleReady, 80 reinterpret_cast<uintptr_t>(this)); 81 if (result != MOJO_RESULT_OK) { 82 handle_.set_value(kInvalidHandleValue); 83 callback_.Reset(); 84 message_loop_observer_.reset(); 85 DCHECK(result == MOJO_RESULT_FAILED_PRECONDITION || 86 result == MOJO_RESULT_INVALID_ARGUMENT); 87 return result; 88 } 89 90 return MOJO_RESULT_OK; 91 } 92 93 void Watcher::Cancel() { 94 DCHECK(thread_checker_.CalledOnValidThread()); 95 96 // The watch may have already been cancelled if the handle was closed. 97 if (!handle_.is_valid()) 98 return; 99 100 MojoResult result = 101 MojoCancelWatch(handle_.value(), reinterpret_cast<uintptr_t>(this)); 102 message_loop_observer_.reset(); 103 // |result| may be MOJO_RESULT_INVALID_ARGUMENT if |handle_| has closed, but 104 // OnHandleReady has not yet been called. 105 DCHECK(result == MOJO_RESULT_INVALID_ARGUMENT || result == MOJO_RESULT_OK); 106 handle_.set_value(kInvalidHandleValue); 107 callback_.Reset(); 108 } 109 110 void Watcher::OnHandleReady(MojoResult result) { 111 DCHECK(thread_checker_.CalledOnValidThread()); 112 113 ReadyCallback callback = callback_; 114 if (result == MOJO_RESULT_CANCELLED) { 115 message_loop_observer_.reset(); 116 handle_.set_value(kInvalidHandleValue); 117 callback_.Reset(); 118 } 119 120 // NOTE: It's legal for |callback| to delete |this|. 121 if (!callback.is_null()) 122 callback.Run(result); 123 } 124 125 // static 126 void Watcher::CallOnHandleReady(uintptr_t context, 127 MojoResult result, 128 MojoHandleSignalsState signals_state, 129 MojoWatchNotificationFlags flags) { 130 // NOTE: It is safe to assume the Watcher still exists because this callback 131 // will never be run after the Watcher's destructor. 132 // 133 // TODO: Maybe we should also expose |signals_state| through the Watcher API. 134 // Current HandleWatcher users have no need for it, so it's omitted here. 135 Watcher* watcher = reinterpret_cast<Watcher*>(context); 136 if ((flags & MOJO_WATCH_NOTIFICATION_FLAG_FROM_SYSTEM) && 137 watcher->task_runner_->RunsTasksOnCurrentThread() && 138 watcher->is_default_task_runner_) { 139 // System notifications will trigger from the task runner passed to 140 // mojo::edk::InitIPCSupport(). In Chrome this happens to always be the 141 // default task runner for the IO thread. 142 watcher->OnHandleReady(result); 143 } else { 144 watcher->task_runner_->PostTask( 145 FROM_HERE, 146 base::Bind(&Watcher::OnHandleReady, watcher->weak_self_, result)); 147 } 148 } 149 150 } // namespace mojo 151