Home | History | Annotate | Download | only in child
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "content/child/threaded_data_provider.h"
      6 
      7 #include "content/child/child_process.h"
      8 #include "content/child/child_thread.h"
      9 #include "content/child/resource_dispatcher.h"
     10 #include "content/child/thread_safe_sender.h"
     11 #include "content/child/webthread_impl.h"
     12 #include "content/common/resource_messages.h"
     13 #include "ipc/ipc_sync_channel.h"
     14 #include "third_party/WebKit/public/platform/WebThread.h"
     15 #include "third_party/WebKit/public/platform/WebThreadedDataReceiver.h"
     16 
     17 namespace content {
     18 
     19 namespace {
     20 
     21 class DataProviderMessageFilter : public IPC::MessageFilter {
     22  public:
     23   DataProviderMessageFilter(
     24       const scoped_refptr<base::MessageLoopProxy>& io_message_loop,
     25       base::MessageLoop* main_thread_message_loop,
     26       const WebThreadImpl& background_thread,
     27       const base::WeakPtr<ThreadedDataProvider>&
     28           background_thread_resource_provider,
     29       const base::WeakPtr<ThreadedDataProvider>&
     30           main_thread_resource_provider,
     31       int request_id);
     32 
     33   // IPC::ChannelProxy::MessageFilter
     34   virtual void OnFilterAdded(IPC::Sender* sender) OVERRIDE FINAL;
     35   virtual bool OnMessageReceived(const IPC::Message& message) OVERRIDE FINAL;
     36 
     37  private:
     38   virtual ~DataProviderMessageFilter() { }
     39 
     40   void OnReceivedData(int request_id, int data_offset, int data_length,
     41                       int encoded_data_length);
     42 
     43   const scoped_refptr<base::MessageLoopProxy> io_message_loop_;
     44   base::MessageLoop* main_thread_message_loop_;
     45   const WebThreadImpl& background_thread_;
     46   // This weakptr can only be dereferenced on the background thread.
     47   base::WeakPtr<ThreadedDataProvider>
     48       background_thread_resource_provider_;
     49   // This weakptr can only be dereferenced on the main thread.
     50   base::WeakPtr<ThreadedDataProvider>
     51       main_thread_resource_provider_;
     52   int request_id_;
     53 };
     54 
     55 DataProviderMessageFilter::DataProviderMessageFilter(
     56     const scoped_refptr<base::MessageLoopProxy>& io_message_loop,
     57     base::MessageLoop* main_thread_message_loop,
     58     const WebThreadImpl& background_thread,
     59     const base::WeakPtr<ThreadedDataProvider>&
     60         background_thread_resource_provider,
     61     const base::WeakPtr<ThreadedDataProvider>&
     62         main_thread_resource_provider,
     63     int request_id)
     64     : io_message_loop_(io_message_loop),
     65       main_thread_message_loop_(main_thread_message_loop),
     66       background_thread_(background_thread),
     67       background_thread_resource_provider_(background_thread_resource_provider),
     68       main_thread_resource_provider_(main_thread_resource_provider),
     69       request_id_(request_id) {
     70   DCHECK(main_thread_message_loop != NULL);
     71 }
     72 
     73 void DataProviderMessageFilter::OnFilterAdded(IPC::Sender* sender) {
     74   DCHECK(io_message_loop_->BelongsToCurrentThread());
     75 
     76   main_thread_message_loop_->PostTask(FROM_HERE,
     77       base::Bind(
     78           &ThreadedDataProvider::OnResourceMessageFilterAddedMainThread,
     79           main_thread_resource_provider_));
     80 }
     81 
     82 bool DataProviderMessageFilter::OnMessageReceived(
     83     const IPC::Message& message) {
     84   DCHECK(io_message_loop_->BelongsToCurrentThread());
     85 
     86   if (message.type() != ResourceMsg_DataReceived::ID)
     87     return false;
     88 
     89   int request_id;
     90 
     91   PickleIterator iter(message);
     92   if (!message.ReadInt(&iter, &request_id)) {
     93     NOTREACHED() << "malformed resource message";
     94     return true;
     95   }
     96 
     97   if (request_id == request_id_) {
     98     ResourceMsg_DataReceived::Schema::Param arg;
     99     if (ResourceMsg_DataReceived::Read(&message, &arg)) {
    100       OnReceivedData(arg.a, arg.b, arg.c, arg.d);
    101       return true;
    102     }
    103   }
    104 
    105   return false;
    106 }
    107 
    108 void DataProviderMessageFilter::OnReceivedData(int request_id,
    109                                                int data_offset,
    110                                                int data_length,
    111                                                int encoded_data_length) {
    112   DCHECK(io_message_loop_->BelongsToCurrentThread());
    113   background_thread_.message_loop()->PostTask(FROM_HERE, base::Bind(
    114       &ThreadedDataProvider::OnReceivedDataOnBackgroundThread,
    115       background_thread_resource_provider_,
    116       data_offset, data_length, encoded_data_length));
    117 }
    118 
    119 }  // anonymous namespace
    120 
    121 ThreadedDataProvider::ThreadedDataProvider(
    122     int request_id, blink::WebThreadedDataReceiver* threaded_data_receiver,
    123     linked_ptr<base::SharedMemory> shm_buffer, int shm_size)
    124     : request_id_(request_id),
    125       shm_buffer_(shm_buffer),
    126       shm_size_(shm_size),
    127       background_thread_(
    128           static_cast<WebThreadImpl&>(
    129               *threaded_data_receiver->backgroundThread())),
    130       ipc_channel_(ChildThread::current()->channel()),
    131       threaded_data_receiver_(threaded_data_receiver),
    132       resource_filter_active_(false),
    133       main_thread_message_loop_(ChildThread::current()->message_loop()),
    134       main_thread_weak_factory_(this) {
    135   DCHECK(ChildThread::current());
    136   DCHECK(ipc_channel_);
    137   DCHECK(threaded_data_receiver_);
    138   DCHECK(main_thread_message_loop_);
    139 
    140   background_thread_weak_factory_.reset(
    141       new base::WeakPtrFactory<ThreadedDataProvider>(this));
    142 
    143   filter_ = new DataProviderMessageFilter(
    144       ChildProcess::current()->io_message_loop_proxy(),
    145       main_thread_message_loop_,
    146       background_thread_,
    147       background_thread_weak_factory_->GetWeakPtr(),
    148       main_thread_weak_factory_.GetWeakPtr(),
    149       request_id);
    150 
    151   ChildThread::current()->channel()->AddFilter(filter_.get());
    152 }
    153 
    154 ThreadedDataProvider::~ThreadedDataProvider() {
    155   DCHECK(ChildThread::current());
    156 
    157   ChildThread::current()->channel()->RemoveFilter(filter_.get());
    158 
    159   delete threaded_data_receiver_;
    160 }
    161 
    162 void DestructOnMainThread(ThreadedDataProvider* data_provider) {
    163   DCHECK(ChildThread::current());
    164 
    165   // The ThreadedDataProvider must be destructed on the main thread to
    166   // be threadsafe when removing the message filter and releasing the shared
    167   // memory buffer.
    168   delete data_provider;
    169 }
    170 
    171 void ThreadedDataProvider::Stop() {
    172   DCHECK(ChildThread::current());
    173 
    174   // Make sure we don't get called by on the main thread anymore via weak
    175   // pointers we've passed to the filter.
    176   main_thread_weak_factory_.InvalidateWeakPtrs();
    177 
    178   blink::WebThread* current_background_thread =
    179       threaded_data_receiver_->backgroundThread();
    180 
    181   // We can't destroy this instance directly; we need to bounce a message over
    182   // to the background thread and back to make sure nothing else will access it
    183   // there, before we can destruct it. We also need to make sure the background
    184   // thread is still alive, since Blink could have shut down at this point
    185   // and freed the thread.
    186   if (current_background_thread) {
    187     // We should never end up with a different parser thread than from when the
    188     // ThreadedDataProvider gets created.
    189     DCHECK(current_background_thread ==
    190         static_cast<WebThreadImpl*>(&background_thread_));
    191     background_thread_.message_loop()->PostTask(FROM_HERE,
    192         base::Bind(&ThreadedDataProvider::StopOnBackgroundThread,
    193                    base::Unretained(this)));
    194   }
    195 }
    196 
    197 void ThreadedDataProvider::StopOnBackgroundThread() {
    198   DCHECK(background_thread_.isCurrentThread());
    199   DCHECK(background_thread_weak_factory_);
    200 
    201   // When this happens, the provider should no longer be called on the
    202   // background thread as it's about to be destroyed on the main thread.
    203   // Destructing the weak pointer factory means invalidating the weak pointers
    204   // which means no callbacks from the filter will happen and nothing else will
    205   // use this instance on the background thread.
    206   background_thread_weak_factory_.reset(NULL);
    207   main_thread_message_loop_->PostTask(FROM_HERE,
    208       base::Bind(&DestructOnMainThread, this));
    209 }
    210 
    211 void ThreadedDataProvider::OnResourceMessageFilterAddedMainThread() {
    212   DCHECK(ChildThread::current());
    213   DCHECK(background_thread_weak_factory_);
    214 
    215   // We bounce this message from the I/O thread via the main thread and then
    216   // to our background thread, following the same path as incoming data before
    217   // our filter gets added, to make sure there's nothing still incoming.
    218   background_thread_.message_loop()->PostTask(FROM_HERE,
    219       base::Bind(
    220           &ThreadedDataProvider::OnResourceMessageFilterAddedBackgroundThread,
    221           background_thread_weak_factory_->GetWeakPtr()));
    222 }
    223 
    224 void ThreadedDataProvider::OnResourceMessageFilterAddedBackgroundThread() {
    225   DCHECK(background_thread_.isCurrentThread());
    226   resource_filter_active_ = true;
    227 
    228   // At this point we know no more data is going to arrive from the main thread,
    229   // so we can process any data we've received directly from the I/O thread
    230   // in the meantime.
    231   if (!queued_data_.empty()) {
    232     std::vector<QueuedSharedMemoryData>::iterator iter = queued_data_.begin();
    233     for (; iter != queued_data_.end(); ++iter) {
    234       ForwardAndACKData(iter->data, iter->length);
    235     }
    236 
    237     queued_data_.clear();
    238   }
    239 }
    240 
    241 void ThreadedDataProvider::OnReceivedDataOnBackgroundThread(
    242     int data_offset, int data_length, int encoded_data_length) {
    243   DCHECK(background_thread_.isCurrentThread());
    244   DCHECK(shm_buffer_ != NULL);
    245 
    246   CHECK_GE(shm_size_, data_offset + data_length);
    247   const char* data_ptr = static_cast<char*>(shm_buffer_->memory());
    248   CHECK(data_ptr);
    249   CHECK(data_ptr + data_offset);
    250 
    251   if (resource_filter_active_) {
    252     ForwardAndACKData(data_ptr + data_offset, data_length);
    253   } else {
    254     // There's a brief interval between the point where we know the filter
    255     // has been installed on the I/O thread, and when we know for sure there's
    256     // no more data coming in from the main thread (from before the filter
    257     // got added). If we get any data during that interval, we need to queue
    258     // it until we're certain we've processed all the main thread data to make
    259     // sure we forward (and ACK) everything in the right order.
    260     QueuedSharedMemoryData queued_data;
    261     queued_data.data = data_ptr + data_offset;
    262     queued_data.length = data_length;
    263     queued_data_.push_back(queued_data);
    264   }
    265 }
    266 
    267 void ThreadedDataProvider::OnReceivedDataOnForegroundThread(
    268     const char* data, int data_length, int encoded_data_length) {
    269   DCHECK(ChildThread::current());
    270 
    271   background_thread_.message_loop()->PostTask(FROM_HERE,
    272       base::Bind(&ThreadedDataProvider::ForwardAndACKData,
    273                  base::Unretained(this),
    274                  data, data_length));
    275 }
    276 
    277 void ThreadedDataProvider::ForwardAndACKData(const char* data,
    278                                              int data_length) {
    279   DCHECK(background_thread_.isCurrentThread());
    280 
    281   // TODO(oysteine): SiteIsolationPolicy needs to be be checked
    282   // here before we pass the data to the data provider
    283   // (or earlier on the I/O thread), otherwise once SiteIsolationPolicy does
    284   // actual blocking as opposed to just UMA logging this will bypass it.
    285   threaded_data_receiver_->acceptData(data, data_length);
    286   ipc_channel_->Send(new ResourceHostMsg_DataReceived_ACK(request_id_));
    287 }
    288 
    289 }  // namespace content
    290