Home | History | Annotate | Download | only in system
      1 // Copyright 2016 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "mojo/public/cpp/system/watcher.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/location.h"
      9 #include "base/macros.h"
     10 #include "base/message_loop/message_loop.h"
     11 #include "mojo/public/c/system/functions.h"
     12 
     13 namespace mojo {
     14 
     15 class Watcher::MessageLoopObserver
     16     : public base::MessageLoop::DestructionObserver {
     17  public:
     18   explicit MessageLoopObserver(Watcher* watcher) : watcher_(watcher) {
     19     base::MessageLoop::current()->AddDestructionObserver(this);
     20   }
     21 
     22   ~MessageLoopObserver() override {
     23     StopObservingIfNecessary();
     24   }
     25 
     26  private:
     27   // base::MessageLoop::DestructionObserver:
     28   void WillDestroyCurrentMessageLoop() override {
     29     StopObservingIfNecessary();
     30     if (watcher_->IsWatching()) {
     31       // TODO(yzshen): Remove this notification. crbug.com/604762
     32       watcher_->OnHandleReady(MOJO_RESULT_ABORTED);
     33     }
     34   }
     35 
     36   void StopObservingIfNecessary() {
     37     if (is_observing_) {
     38       is_observing_ = false;
     39       base::MessageLoop::current()->RemoveDestructionObserver(this);
     40     }
     41   }
     42 
     43   bool is_observing_ = true;
     44   Watcher* watcher_;
     45 
     46   DISALLOW_COPY_AND_ASSIGN(MessageLoopObserver);
     47 };
     48 
     49 Watcher::Watcher(scoped_refptr<base::SingleThreadTaskRunner> runner)
     50     : task_runner_(std::move(runner)),
     51       is_default_task_runner_(task_runner_ ==
     52                               base::ThreadTaskRunnerHandle::Get()),
     53       weak_factory_(this) {
     54   DCHECK(task_runner_->BelongsToCurrentThread());
     55   weak_self_ = weak_factory_.GetWeakPtr();
     56 }
     57 
     58 Watcher::~Watcher() {
     59   if(IsWatching())
     60     Cancel();
     61 }
     62 
     63 bool Watcher::IsWatching() const {
     64   DCHECK(thread_checker_.CalledOnValidThread());
     65   return handle_.is_valid();
     66 }
     67 
     68 MojoResult Watcher::Start(Handle handle,
     69                           MojoHandleSignals signals,
     70                           const ReadyCallback& callback) {
     71   DCHECK(thread_checker_.CalledOnValidThread());
     72   DCHECK(!IsWatching());
     73   DCHECK(!callback.is_null());
     74 
     75   message_loop_observer_.reset(new MessageLoopObserver(this));
     76   callback_ = callback;
     77   handle_ = handle;
     78   MojoResult result = MojoWatch(handle_.value(), signals,
     79                                 &Watcher::CallOnHandleReady,
     80                                 reinterpret_cast<uintptr_t>(this));
     81   if (result != MOJO_RESULT_OK) {
     82     handle_.set_value(kInvalidHandleValue);
     83     callback_.Reset();
     84     message_loop_observer_.reset();
     85     DCHECK(result == MOJO_RESULT_FAILED_PRECONDITION ||
     86            result == MOJO_RESULT_INVALID_ARGUMENT);
     87     return result;
     88   }
     89 
     90   return MOJO_RESULT_OK;
     91 }
     92 
     93 void Watcher::Cancel() {
     94   DCHECK(thread_checker_.CalledOnValidThread());
     95 
     96   // The watch may have already been cancelled if the handle was closed.
     97   if (!handle_.is_valid())
     98     return;
     99 
    100   MojoResult result =
    101       MojoCancelWatch(handle_.value(), reinterpret_cast<uintptr_t>(this));
    102   message_loop_observer_.reset();
    103   // |result| may be MOJO_RESULT_INVALID_ARGUMENT if |handle_| has closed, but
    104   // OnHandleReady has not yet been called.
    105   DCHECK(result == MOJO_RESULT_INVALID_ARGUMENT || result == MOJO_RESULT_OK);
    106   handle_.set_value(kInvalidHandleValue);
    107   callback_.Reset();
    108 }
    109 
    110 void Watcher::OnHandleReady(MojoResult result) {
    111   DCHECK(thread_checker_.CalledOnValidThread());
    112 
    113   ReadyCallback callback = callback_;
    114   if (result == MOJO_RESULT_CANCELLED) {
    115     message_loop_observer_.reset();
    116     handle_.set_value(kInvalidHandleValue);
    117     callback_.Reset();
    118   }
    119 
    120   // NOTE: It's legal for |callback| to delete |this|.
    121   if (!callback.is_null())
    122     callback.Run(result);
    123 }
    124 
    125 // static
    126 void Watcher::CallOnHandleReady(uintptr_t context,
    127                                 MojoResult result,
    128                                 MojoHandleSignalsState signals_state,
    129                                 MojoWatchNotificationFlags flags) {
    130   // NOTE: It is safe to assume the Watcher still exists because this callback
    131   // will never be run after the Watcher's destructor.
    132   //
    133   // TODO: Maybe we should also expose |signals_state| through the Watcher API.
    134   // Current HandleWatcher users have no need for it, so it's omitted here.
    135   Watcher* watcher = reinterpret_cast<Watcher*>(context);
    136   if ((flags & MOJO_WATCH_NOTIFICATION_FLAG_FROM_SYSTEM) &&
    137       watcher->task_runner_->RunsTasksOnCurrentThread() &&
    138       watcher->is_default_task_runner_) {
    139     // System notifications will trigger from the task runner passed to
    140     // mojo::edk::InitIPCSupport(). In Chrome this happens to always be the
    141     // default task runner for the IO thread.
    142     watcher->OnHandleReady(result);
    143   } else {
    144     watcher->task_runner_->PostTask(
    145         FROM_HERE,
    146         base::Bind(&Watcher::OnHandleReady, watcher->weak_self_, result));
    147   }
    148 }
    149 
    150 }  // namespace mojo
    151