Home | History | Annotate | Download | only in files
      1 // Copyright 2016 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "base/files/file_descriptor_watcher_posix.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/lazy_instance.h"
      9 #include "base/logging.h"
     10 #include "base/memory/ptr_util.h"
     11 #include "base/message_loop/message_loop_current.h"
     12 #include "base/message_loop/message_pump_for_io.h"
     13 #include "base/sequenced_task_runner.h"
     14 #include "base/single_thread_task_runner.h"
     15 #include "base/threading/sequenced_task_runner_handle.h"
     16 #include "base/threading/thread_checker.h"
     17 #include "base/threading/thread_local.h"
     18 
     19 namespace base {
     20 
     21 namespace {
     22 
     23 // MessageLoopForIO used to watch file descriptors for which callbacks are
     24 // registered from a given thread.
     25 LazyInstance<ThreadLocalPointer<MessageLoopForIO>>::Leaky
     26     tls_message_loop_for_io = LAZY_INSTANCE_INITIALIZER;
     27 
     28 }  // namespace
     29 
     30 FileDescriptorWatcher::Controller::~Controller() {
     31   DCHECK(sequence_checker_.CalledOnValidSequence());
     32 
     33   // Delete |watcher_| on the MessageLoopForIO.
     34   //
     35   // If the MessageLoopForIO is deleted before Watcher::StartWatching() runs,
     36   // |watcher_| is leaked. If the MessageLoopForIO is deleted after
     37   // Watcher::StartWatching() runs but before the DeleteSoon task runs,
     38   // |watcher_| is deleted from Watcher::WillDestroyCurrentMessageLoop().
     39   message_loop_for_io_task_runner_->DeleteSoon(FROM_HERE, watcher_.release());
     40 
     41   // Since WeakPtrs are invalidated by the destructor, RunCallback() won't be
     42   // invoked after this returns.
     43 }
     44 
     45 class FileDescriptorWatcher::Controller::Watcher
     46     : public MessagePumpForIO::FdWatcher,
     47       public MessageLoopCurrent::DestructionObserver {
     48  public:
     49   Watcher(WeakPtr<Controller> controller, MessagePumpForIO::Mode mode, int fd);
     50   ~Watcher() override;
     51 
     52   void StartWatching();
     53 
     54  private:
     55   friend class FileDescriptorWatcher;
     56 
     57   // MessagePumpForIO::FdWatcher:
     58   void OnFileCanReadWithoutBlocking(int fd) override;
     59   void OnFileCanWriteWithoutBlocking(int fd) override;
     60 
     61   // MessageLoopCurrent::DestructionObserver:
     62   void WillDestroyCurrentMessageLoop() override;
     63 
     64   // The MessageLoopForIO's watch handle (stops the watch when destroyed).
     65   MessagePumpForIO::FdWatchController fd_watch_controller_;
     66 
     67   // Runs tasks on the sequence on which this was instantiated (i.e. the
     68   // sequence on which the callback must run).
     69   const scoped_refptr<SequencedTaskRunner> callback_task_runner_ =
     70       SequencedTaskRunnerHandle::Get();
     71 
     72   // The Controller that created this Watcher.
     73   WeakPtr<Controller> controller_;
     74 
     75   // Whether this Watcher is notified when |fd_| becomes readable or writable
     76   // without blocking.
     77   const MessagePumpForIO::Mode mode_;
     78 
     79   // The watched file descriptor.
     80   const int fd_;
     81 
     82   // Except for the constructor, every method of this class must run on the same
     83   // MessageLoopForIO thread.
     84   ThreadChecker thread_checker_;
     85 
     86   // Whether this Watcher was registered as a DestructionObserver on the
     87   // MessageLoopForIO thread.
     88   bool registered_as_destruction_observer_ = false;
     89 
     90   DISALLOW_COPY_AND_ASSIGN(Watcher);
     91 };
     92 
     93 FileDescriptorWatcher::Controller::Watcher::Watcher(
     94     WeakPtr<Controller> controller,
     95     MessagePumpForIO::Mode mode,
     96     int fd)
     97     : fd_watch_controller_(FROM_HERE),
     98       controller_(controller),
     99       mode_(mode),
    100       fd_(fd) {
    101   DCHECK(callback_task_runner_);
    102   thread_checker_.DetachFromThread();
    103 }
    104 
    105 FileDescriptorWatcher::Controller::Watcher::~Watcher() {
    106   DCHECK(thread_checker_.CalledOnValidThread());
    107   MessageLoopCurrentForIO::Get()->RemoveDestructionObserver(this);
    108 }
    109 
    110 void FileDescriptorWatcher::Controller::Watcher::StartWatching() {
    111   DCHECK(thread_checker_.CalledOnValidThread());
    112 
    113   if (!MessageLoopCurrentForIO::Get()->WatchFileDescriptor(
    114           fd_, false, mode_, &fd_watch_controller_, this)) {
    115     // TODO(wez): Ideally we would [D]CHECK here, or propagate the failure back
    116     // to the caller, but there is no guarantee that they haven't already
    117     // closed |fd_| on another thread, so the best we can do is Debug-log.
    118     DLOG(ERROR) << "Failed to watch fd=" << fd_;
    119   }
    120 
    121   if (!registered_as_destruction_observer_) {
    122     MessageLoopCurrentForIO::Get()->AddDestructionObserver(this);
    123     registered_as_destruction_observer_ = true;
    124   }
    125 }
    126 
    127 void FileDescriptorWatcher::Controller::Watcher::OnFileCanReadWithoutBlocking(
    128     int fd) {
    129   DCHECK_EQ(fd_, fd);
    130   DCHECK_EQ(MessagePumpForIO::WATCH_READ, mode_);
    131   DCHECK(thread_checker_.CalledOnValidThread());
    132 
    133   // Run the callback on the sequence on which the watch was initiated.
    134   callback_task_runner_->PostTask(
    135       FROM_HERE, BindOnce(&Controller::RunCallback, controller_));
    136 }
    137 
    138 void FileDescriptorWatcher::Controller::Watcher::OnFileCanWriteWithoutBlocking(
    139     int fd) {
    140   DCHECK_EQ(fd_, fd);
    141   DCHECK_EQ(MessagePumpForIO::WATCH_WRITE, mode_);
    142   DCHECK(thread_checker_.CalledOnValidThread());
    143 
    144   // Run the callback on the sequence on which the watch was initiated.
    145   callback_task_runner_->PostTask(
    146       FROM_HERE, BindOnce(&Controller::RunCallback, controller_));
    147 }
    148 
    149 void FileDescriptorWatcher::Controller::Watcher::
    150     WillDestroyCurrentMessageLoop() {
    151   DCHECK(thread_checker_.CalledOnValidThread());
    152 
    153   // A Watcher is owned by a Controller. When the Controller is deleted, it
    154   // transfers ownership of the Watcher to a delete task posted to the
    155   // MessageLoopForIO. If the MessageLoopForIO is deleted before the delete task
    156   // runs, the following line takes care of deleting the Watcher.
    157   delete this;
    158 }
    159 
    160 FileDescriptorWatcher::Controller::Controller(MessagePumpForIO::Mode mode,
    161                                               int fd,
    162                                               const Closure& callback)
    163     : callback_(callback),
    164       message_loop_for_io_task_runner_(
    165           tls_message_loop_for_io.Get().Get()->task_runner()),
    166       weak_factory_(this) {
    167   DCHECK(!callback_.is_null());
    168   DCHECK(message_loop_for_io_task_runner_);
    169   watcher_ = std::make_unique<Watcher>(weak_factory_.GetWeakPtr(), mode, fd);
    170   StartWatching();
    171 }
    172 
    173 void FileDescriptorWatcher::Controller::StartWatching() {
    174   DCHECK(sequence_checker_.CalledOnValidSequence());
    175   // It is safe to use Unretained() below because |watcher_| can only be deleted
    176   // by a delete task posted to |message_loop_for_io_task_runner_| by this
    177   // Controller's destructor. Since this delete task hasn't been posted yet, it
    178   // can't run before the task posted below.
    179   message_loop_for_io_task_runner_->PostTask(
    180       FROM_HERE, BindOnce(&Watcher::StartWatching, Unretained(watcher_.get())));
    181 }
    182 
    183 void FileDescriptorWatcher::Controller::RunCallback() {
    184   DCHECK(sequence_checker_.CalledOnValidSequence());
    185 
    186   WeakPtr<Controller> weak_this = weak_factory_.GetWeakPtr();
    187 
    188   callback_.Run();
    189 
    190   // If |this| wasn't deleted, re-enable the watch.
    191   if (weak_this)
    192     StartWatching();
    193 }
    194 
    195 FileDescriptorWatcher::FileDescriptorWatcher(
    196     MessageLoopForIO* message_loop_for_io) {
    197   DCHECK(message_loop_for_io);
    198   DCHECK(!tls_message_loop_for_io.Get().Get());
    199   tls_message_loop_for_io.Get().Set(message_loop_for_io);
    200 }
    201 
    202 FileDescriptorWatcher::~FileDescriptorWatcher() {
    203   tls_message_loop_for_io.Get().Set(nullptr);
    204 }
    205 
    206 std::unique_ptr<FileDescriptorWatcher::Controller>
    207 FileDescriptorWatcher::WatchReadable(int fd, const Closure& callback) {
    208   return WrapUnique(new Controller(MessagePumpForIO::WATCH_READ, fd, callback));
    209 }
    210 
    211 std::unique_ptr<FileDescriptorWatcher::Controller>
    212 FileDescriptorWatcher::WatchWritable(int fd, const Closure& callback) {
    213   return WrapUnique(
    214       new Controller(MessagePumpForIO::WATCH_WRITE, fd, callback));
    215 }
    216 
    217 }  // namespace base
    218