Home | History | Annotate | Download | only in native_messaging
      1 // Copyright 2013 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "remoting/host/native_messaging/native_messaging_reader.h"
      6 
      7 #include <string>
      8 
      9 #include "base/bind.h"
     10 #include "base/files/file.h"
     11 #include "base/json/json_reader.h"
     12 #include "base/location.h"
     13 #include "base/sequenced_task_runner.h"
     14 #include "base/single_thread_task_runner.h"
     15 #include "base/stl_util.h"
     16 #include "base/thread_task_runner_handle.h"
     17 #include "base/threading/sequenced_worker_pool.h"
     18 #include "base/values.h"
     19 
     20 namespace {
     21 
     22 // uint32 is specified in the protocol as the type for the message header.
     23 typedef uint32 MessageLengthType;
     24 
     25 const int kMessageHeaderSize = sizeof(MessageLengthType);
     26 
     27 // Limit the size of received messages, to avoid excessive memory-allocation in
     28 // this process, and potential overflow issues when casting to a signed 32-bit
     29 // int.
     30 const MessageLengthType kMaximumMessageSize = 1024 * 1024;
     31 
     32 }  // namespace
     33 
     34 namespace remoting {
     35 
     36 class NativeMessagingReader::Core {
     37  public:
     38   Core(base::File file,
     39        scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner,
     40        scoped_refptr<base::SequencedTaskRunner> read_task_runner,
     41        base::WeakPtr<NativeMessagingReader> reader_);
     42   ~Core();
     43 
     44   // Reads a message from the Native Messaging client and passes it to
     45   // |message_callback_| on the originating thread. Called on the reader thread.
     46   void ReadMessage();
     47 
     48  private:
     49   // Notify the reader's EOF callback when an error occurs or EOF is reached.
     50   void NotifyEof();
     51 
     52   base::File read_stream_;
     53 
     54   base::WeakPtr<NativeMessagingReader> reader_;
     55 
     56   // Used to post the caller-supplied reader callbacks on the caller thread.
     57   scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_;
     58 
     59   // Used to DCHECK that the reader code executes on the correct thread.
     60   scoped_refptr<base::SequencedTaskRunner> read_task_runner_;
     61 
     62   DISALLOW_COPY_AND_ASSIGN(Core);
     63 };
     64 
     65 NativeMessagingReader::Core::Core(
     66     base::File file,
     67     scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner,
     68     scoped_refptr<base::SequencedTaskRunner> read_task_runner,
     69     base::WeakPtr<NativeMessagingReader> reader)
     70     : read_stream_(file.Pass()),
     71       reader_(reader),
     72       caller_task_runner_(caller_task_runner),
     73       read_task_runner_(read_task_runner) {
     74 }
     75 
     76 NativeMessagingReader::Core::~Core() {}
     77 
     78 void NativeMessagingReader::Core::ReadMessage() {
     79   DCHECK(read_task_runner_->RunsTasksOnCurrentThread());
     80 
     81   // Keep reading messages until the stream is closed or an error occurs.
     82   while (true) {
     83     MessageLengthType message_length;
     84     int read_result = read_stream_.ReadAtCurrentPos(
     85         reinterpret_cast<char*>(&message_length), kMessageHeaderSize);
     86     if (read_result != kMessageHeaderSize) {
     87       // 0 means EOF which is normal and should not be logged as an error.
     88       if (read_result != 0) {
     89         LOG(ERROR) << "Failed to read message header, read returned "
     90                    << read_result;
     91       }
     92       NotifyEof();
     93       return;
     94     }
     95 
     96     if (message_length > kMaximumMessageSize) {
     97       LOG(ERROR) << "Message size too large: " << message_length;
     98       NotifyEof();
     99       return;
    100     }
    101 
    102     std::string message_json(message_length, '\0');
    103     read_result = read_stream_.ReadAtCurrentPos(string_as_array(&message_json),
    104                                                 message_length);
    105     if (read_result != static_cast<int>(message_length)) {
    106       LOG(ERROR) << "Failed to read message body, read returned "
    107                  << read_result;
    108       NotifyEof();
    109       return;
    110     }
    111 
    112     scoped_ptr<base::Value> message(base::JSONReader::Read(message_json));
    113     if (!message) {
    114       LOG(ERROR) << "Failed to parse JSON message: " << message;
    115       NotifyEof();
    116       return;
    117     }
    118 
    119     // Notify callback of new message.
    120     caller_task_runner_->PostTask(
    121         FROM_HERE, base::Bind(&NativeMessagingReader::InvokeMessageCallback,
    122                               reader_, base::Passed(&message)));
    123   }
    124 }
    125 
    126 void NativeMessagingReader::Core::NotifyEof() {
    127   DCHECK(read_task_runner_->RunsTasksOnCurrentThread());
    128   caller_task_runner_->PostTask(
    129       FROM_HERE,
    130       base::Bind(&NativeMessagingReader::InvokeEofCallback, reader_));
    131 }
    132 
    133 NativeMessagingReader::NativeMessagingReader(base::File file)
    134     : reader_thread_("Reader"),
    135       weak_factory_(this) {
    136   reader_thread_.Start();
    137   read_task_runner_ = reader_thread_.message_loop_proxy();
    138   core_.reset(new Core(file.Pass(), base::ThreadTaskRunnerHandle::Get(),
    139                        read_task_runner_, weak_factory_.GetWeakPtr()));
    140 }
    141 
    142 NativeMessagingReader::~NativeMessagingReader() {
    143   read_task_runner_->DeleteSoon(FROM_HERE, core_.release());
    144 }
    145 
    146 void NativeMessagingReader::Start(MessageCallback message_callback,
    147                                   base::Closure eof_callback) {
    148   message_callback_ = message_callback;
    149   eof_callback_ = eof_callback;
    150 
    151   // base::Unretained is safe since |core_| is only deleted via the
    152   // DeleteSoon task which is posted from this class's dtor.
    153   read_task_runner_->PostTask(
    154       FROM_HERE, base::Bind(&NativeMessagingReader::Core::ReadMessage,
    155                             base::Unretained(core_.get())));
    156 }
    157 
    158 void NativeMessagingReader::InvokeMessageCallback(
    159     scoped_ptr<base::Value> message) {
    160   message_callback_.Run(message.Pass());
    161 }
    162 
    163 void NativeMessagingReader::InvokeEofCallback() {
    164   eof_callback_.Run();
    165 }
    166 
    167 }  // namespace remoting
    168