Home | History | Annotate | Download | only in impl
      1 //
      2 // detail/impl/task_io_service.ipp
      3 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
      4 //
      5 // Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
      6 //
      7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
      8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
      9 //
     10 
     11 #ifndef ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP
     12 #define ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP
     13 
     14 
     15 #include "asio/detail/config.hpp"
     16 
     17 
     18 #include "asio/detail/event.hpp"
     19 #include "asio/detail/limits.hpp"
     20 #include "asio/detail/reactor.hpp"
     21 #include "asio/detail/task_io_service.hpp"
     22 #include "asio/detail/task_io_service_thread_info.hpp"
     23 
     24 #include "asio/detail/push_options.hpp"
     25 
     26 namespace asio {
     27 namespace detail {
     28 
// Scope guard used while the reactor task runs outside the lock. Its
// destructor (which also runs during exception unwinding) publishes the
// thread-private work count, pushes privately queued completions onto the
// shared queue, and reinserts the task sentinel so the reactor will run again.
struct task_io_service::task_cleanup
{
  ~task_cleanup()
  {
    // Fold the thread-private outstanding-work tally into the shared counter.
    if (this_thread_->private_outstanding_work > 0)
    {
      asio::detail::increment(
          task_io_service_->outstanding_work_,
          this_thread_->private_outstanding_work);
    }
    this_thread_->private_outstanding_work = 0;

    // Enqueue the completed operations and reinsert the task at the end of
    // the operation queue. The lock must be reacquired first; the caller
    // released it before running the task.
    lock_->lock();
    task_io_service_->task_interrupted_ = true;
    task_io_service_->op_queue_.push(this_thread_->private_op_queue);
    task_io_service_->op_queue_.push(&task_io_service_->task_operation_);
  }

  task_io_service* task_io_service_; // Owning service (non-owning pointer).
  mutex::scoped_lock* lock_;         // Lock to reacquire before touching shared state.
  thread_info* this_thread_;         // Per-thread private queue and work count.
};
     53 
// Scope guard used while a completion handler runs outside the lock. Its
// destructor settles the work accounting: the handler just executed consumes
// exactly one unit of outstanding work, so only the surplus (count - 1) is
// published, and a deficit (count == 0) means net work finished.
struct task_io_service::work_cleanup
{
  ~work_cleanup()
  {
    if (this_thread_->private_outstanding_work > 1)
    {
      // More work was started than consumed: publish the surplus.
      asio::detail::increment(
          task_io_service_->outstanding_work_,
          this_thread_->private_outstanding_work - 1);
    }
    else if (this_thread_->private_outstanding_work < 1)
    {
      // No replacement work was started: one unit of work has finished.
      task_io_service_->work_finished();
    }
    // Exactly 1 means the handler's consumed unit was replaced one-for-one,
    // so the shared counter needs no adjustment at all.
    this_thread_->private_outstanding_work = 0;

    // Hand any privately queued completions back to the shared queue.
    if (!this_thread_->private_op_queue.empty())
    {
      lock_->lock();
      task_io_service_->op_queue_.push(this_thread_->private_op_queue);
    }
  }

  task_io_service* task_io_service_; // Owning service (non-owning pointer).
  mutex::scoped_lock* lock_;         // Lock to reacquire before touching shared queue.
  thread_info* this_thread_;         // Per-thread private queue and work count.
};
     81 
// Construct the scheduler. A concurrency_hint of 1 enables single-threaded
// optimisations (thread-private queues are used more aggressively and
// cross-thread wakeups are skipped).
task_io_service::task_io_service(
    asio::io_service& io_service, std::size_t concurrency_hint)
  : asio::detail::service_base<task_io_service>(io_service),
    one_thread_(concurrency_hint == 1),
    mutex_(),
    task_(0),                // Reactor is created lazily by init_task().
    task_interrupted_(true), // Task is not currently blocked in the reactor.
    outstanding_work_(0),
    stopped_(false),
    shutdown_(false)
{
  ASIO_HANDLER_TRACKING_INIT;
}
     95 
     96 void task_io_service::shutdown_service()
     97 {
     98   mutex::scoped_lock lock(mutex_);
     99   shutdown_ = true;
    100   lock.unlock();
    101 
    102   // Destroy handler objects.
    103   while (!op_queue_.empty())
    104   {
    105     operation* o = op_queue_.front();
    106     op_queue_.pop();
    107     if (o != &task_operation_)
    108       o->destroy();
    109   }
    110 
    111   // Reset to initial state.
    112   task_ = 0;
    113 }
    114 
    115 void task_io_service::init_task()
    116 {
    117   mutex::scoped_lock lock(mutex_);
    118   if (!shutdown_ && !task_)
    119   {
    120     task_ = &use_service<reactor>(this->get_io_service());
    121     op_queue_.push(&task_operation_);
    122     wake_one_thread_and_unlock(lock);
    123   }
    124 }
    125 
    126 std::size_t task_io_service::run(asio::error_code& ec)
    127 {
    128   ec = asio::error_code();
    129   if (outstanding_work_ == 0)
    130   {
    131     stop();
    132     return 0;
    133   }
    134 
    135   thread_info this_thread;
    136   this_thread.private_outstanding_work = 0;
    137   thread_call_stack::context ctx(this, this_thread);
    138 
    139   mutex::scoped_lock lock(mutex_);
    140 
    141   std::size_t n = 0;
    142   for (; do_run_one(lock, this_thread, ec); lock.lock())
    143     if (n != (std::numeric_limits<std::size_t>::max)())
    144       ++n;
    145   return n;
    146 }
    147 
    148 std::size_t task_io_service::run_one(asio::error_code& ec)
    149 {
    150   ec = asio::error_code();
    151   if (outstanding_work_ == 0)
    152   {
    153     stop();
    154     return 0;
    155   }
    156 
    157   thread_info this_thread;
    158   this_thread.private_outstanding_work = 0;
    159   thread_call_stack::context ctx(this, this_thread);
    160 
    161   mutex::scoped_lock lock(mutex_);
    162 
    163   return do_run_one(lock, this_thread, ec);
    164 }
    165 
    166 std::size_t task_io_service::poll(asio::error_code& ec)
    167 {
    168   ec = asio::error_code();
    169   if (outstanding_work_ == 0)
    170   {
    171     stop();
    172     return 0;
    173   }
    174 
    175   thread_info this_thread;
    176   this_thread.private_outstanding_work = 0;
    177   thread_call_stack::context ctx(this, this_thread);
    178 
    179   mutex::scoped_lock lock(mutex_);
    180 
    181   // We want to support nested calls to poll() and poll_one(), so any handlers
    182   // that are already on a thread-private queue need to be put on to the main
    183   // queue now.
    184   if (one_thread_)
    185     if (thread_info* outer_thread_info = ctx.next_by_key())
    186       op_queue_.push(outer_thread_info->private_op_queue);
    187 
    188   std::size_t n = 0;
    189   for (; do_poll_one(lock, this_thread, ec); lock.lock())
    190     if (n != (std::numeric_limits<std::size_t>::max)())
    191       ++n;
    192   return n;
    193 }
    194 
    195 std::size_t task_io_service::poll_one(asio::error_code& ec)
    196 {
    197   ec = asio::error_code();
    198   if (outstanding_work_ == 0)
    199   {
    200     stop();
    201     return 0;
    202   }
    203 
    204   thread_info this_thread;
    205   this_thread.private_outstanding_work = 0;
    206   thread_call_stack::context ctx(this, this_thread);
    207 
    208   mutex::scoped_lock lock(mutex_);
    209 
    210   // We want to support nested calls to poll() and poll_one(), so any handlers
    211   // that are already on a thread-private queue need to be put on to the main
    212   // queue now.
    213   if (one_thread_)
    214     if (thread_info* outer_thread_info = ctx.next_by_key())
    215       op_queue_.push(outer_thread_info->private_op_queue);
    216 
    217   return do_poll_one(lock, this_thread, ec);
    218 }
    219 
// Stop the service: set the stopped flag and wake every thread blocked in
// run()/run_one() so they can observe it and return.
void task_io_service::stop()
{
  mutex::scoped_lock lock(mutex_);
  stop_all_threads(lock);
}
    225 
// Return whether the service is currently in the stopped state. Reads the
// flag under the mutex for a consistent view across threads.
bool task_io_service::stopped() const
{
  mutex::scoped_lock lock(mutex_);
  return stopped_;
}
    231 
// Clear the stopped flag so that subsequent calls to run()/poll() may
// process work again.
void task_io_service::reset()
{
  mutex::scoped_lock lock(mutex_);
  stopped_ = false;
}
    237 
    238 void task_io_service::post_immediate_completion(
    239     task_io_service::operation* op, bool is_continuation)
    240 {
    241   if (one_thread_ || is_continuation)
    242   {
    243     if (thread_info* this_thread = thread_call_stack::contains(this))
    244     {
    245       ++this_thread->private_outstanding_work;
    246       this_thread->private_op_queue.push(op);
    247       return;
    248     }
    249   }
    250 
    251   work_started();
    252   mutex::scoped_lock lock(mutex_);
    253   op_queue_.push(op);
    254   wake_one_thread_and_unlock(lock);
    255 }
    256 
    257 void task_io_service::post_deferred_completion(task_io_service::operation* op)
    258 {
    259   if (one_thread_)
    260   {
    261     if (thread_info* this_thread = thread_call_stack::contains(this))
    262     {
    263       this_thread->private_op_queue.push(op);
    264       return;
    265     }
    266   }
    267 
    268   mutex::scoped_lock lock(mutex_);
    269   op_queue_.push(op);
    270   wake_one_thread_and_unlock(lock);
    271 }
    272 
    273 void task_io_service::post_deferred_completions(
    274     op_queue<task_io_service::operation>& ops)
    275 {
    276   if (!ops.empty())
    277   {
    278     if (one_thread_)
    279     {
    280       if (thread_info* this_thread = thread_call_stack::contains(this))
    281       {
    282         this_thread->private_op_queue.push(ops);
    283         return;
    284       }
    285     }
    286 
    287     mutex::scoped_lock lock(mutex_);
    288     op_queue_.push(ops);
    289     wake_one_thread_and_unlock(lock);
    290   }
    291 }
    292 
// Queue an operation for execution on some run-loop thread, counting it as
// new work and waking a thread to process it. Unlike the post_* variants,
// this always goes through the shared queue.
void task_io_service::do_dispatch(
    task_io_service::operation* op)
{
  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
    301 
    302 void task_io_service::abandon_operations(
    303     op_queue<task_io_service::operation>& ops)
    304 {
    305   op_queue<task_io_service::operation> ops2;
    306   ops2.push(ops);
    307 }
    308 
// Core of run()/run_one(). Called with the mutex locked; runs either the
// reactor task or exactly one completion handler. Returns 1 if a handler
// was executed, 0 if the service was stopped. In every case the function
// returns with the mutex unlocked.
std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
    task_io_service::thread_info& this_thread,
    const asio::error_code& ec)
{
  while (!stopped_)
  {
    if (!op_queue_.empty())
    {
      // Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_)
      {
        // The dequeued item is the task sentinel: run the reactor. Record
        // whether it must return promptly because handlers are waiting.
        task_interrupted_ = more_handlers;

        // If other handlers remain and other threads may exist, wake one to
        // process them concurrently; otherwise just release the lock.
        if (more_handlers && !one_thread_)
          wakeup_event_.unlock_and_signal_one(lock);
        else
          lock.unlock();

        // task_cleanup's destructor (even on exception) re-queues the task
        // sentinel and publishes privately queued completions/work.
        task_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Run the task. May throw an exception. Only block if the operation
        // queue is empty and we're not polling, otherwise we want to return
        // as soon as possible.
        task_->run(!more_handlers, this_thread.private_op_queue);
      }
      else
      {
        // A real completion handler. Capture its result before releasing
        // the lock.
        std::size_t task_result = o->task_result_;

        if (more_handlers && !one_thread_)
          wake_one_thread_and_unlock(lock);
        else
          lock.unlock();

        // Ensure the count of outstanding work is decremented on block exit.
        work_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Complete the operation. May throw an exception. Deletes the object.
        o->complete(*this, ec, task_result);

        return 1;
      }
    }
    else
    {
      // Queue is empty: sleep until another thread posts work or stops us.
      wakeup_event_.clear(lock);
      wakeup_event_.wait(lock);
    }
  }

  return 0;
}
    367 
// Core of poll()/poll_one(). Called with the mutex locked; runs at most one
// ready completion handler without blocking (the reactor, if it is at the
// front of the queue, is run in non-blocking mode first to harvest ready
// operations). Returns 1 if a handler was executed, 0 otherwise, and
// returns with the mutex unlocked in every case.
std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock,
    task_io_service::thread_info& this_thread,
    const asio::error_code& ec)
{
  if (stopped_)
    return 0;

  // Note: front() yields the task sentinel, a handler, or 0 when the queue
  // is empty (the o == 0 check below relies on this).
  operation* o = op_queue_.front();
  if (o == &task_operation_)
  {
    op_queue_.pop();
    lock.unlock();

    {
      // task_cleanup's destructor reacquires the lock, re-queues the task
      // sentinel and publishes privately queued completions.
      task_cleanup c = { this, &lock, &this_thread };
      (void)c;

      // Run the task. May throw an exception. Only block if the operation
      // queue is empty and we're not polling, otherwise we want to return
      // as soon as possible.
      task_->run(false, this_thread.private_op_queue);
    }

    // If the task produced no handlers, the sentinel is back at the front:
    // nothing for us to run, but signal another thread in case it can help.
    o = op_queue_.front();
    if (o == &task_operation_)
    {
      wakeup_event_.maybe_unlock_and_signal_one(lock);
      return 0;
    }
  }

  // Empty queue: nothing ready right now.
  if (o == 0)
    return 0;

  op_queue_.pop();
  bool more_handlers = (!op_queue_.empty());

  // Capture the handler's result before releasing the lock.
  std::size_t task_result = o->task_result_;

  if (more_handlers && !one_thread_)
    wake_one_thread_and_unlock(lock);
  else
    lock.unlock();

  // Ensure the count of outstanding work is decremented on block exit.
  work_cleanup on_exit = { this, &lock, &this_thread };
  (void)on_exit;

  // Complete the operation. May throw an exception. Deletes the object.
  o->complete(*this, ec, task_result);

  return 1;
}
    421 
// Called with the mutex locked. Sets the stopped flag, wakes all threads
// waiting on the event, and interrupts the reactor if it may be blocked.
void task_io_service::stop_all_threads(
    mutex::scoped_lock& lock)
{
  // Flag must be set before signalling so woken threads observe it.
  stopped_ = true;
  wakeup_event_.signal_all(lock);

  // If the task is currently (possibly) blocked inside the reactor, force
  // it to return; task_interrupted_ prevents redundant interrupts.
  if (!task_interrupted_ && task_)
  {
    task_interrupted_ = true;
    task_->interrupt();
  }
}
    434 
// Called with the mutex locked. Wake exactly one thread that can process
// newly queued work, then release the lock. If no thread is waiting on the
// event, interrupt the reactor instead so the thread running it returns to
// the queue.
void task_io_service::wake_one_thread_and_unlock(
    mutex::scoped_lock& lock)
{
  // maybe_unlock_and_signal_one() signals (and unlocks) only when a thread
  // is actually waiting; otherwise it leaves the lock held and returns false.
  if (!wakeup_event_.maybe_unlock_and_signal_one(lock))
  {
    if (!task_interrupted_ && task_)
    {
      task_interrupted_ = true;
      task_->interrupt();
    }
    lock.unlock();
  }
}
    448 
    449 } // namespace detail
    450 } // namespace asio
    451 
    452 #include "asio/detail/pop_options.hpp"
    453 
    454 
    455 #endif // ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP
    456