Home | History | Annotate | Download | only in batching_util
      1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
      2 
      3 Licensed under the Apache License, Version 2.0 (the "License");
      4 you may not use this file except in compliance with the License.
      5 You may obtain a copy of the License at
      6 
      7     http://www.apache.org/licenses/LICENSE-2.0
      8 
      9 Unless required by applicable law or agreed to in writing, software
     10 distributed under the License is distributed on an "AS IS" BASIS,
     11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 See the License for the specific language governing permissions and
     13 limitations under the License.
     14 ==============================================================================*/
     15 
     16 #include "tensorflow/core/kernels/batching_util/periodic_function.h"
     17 
     18 #include <algorithm>
     19 
     20 #include "tensorflow/core/lib/strings/strcat.h"
     21 #include "tensorflow/core/platform/logging.h"
     22 
     23 namespace tensorflow {
     24 namespace serving {
     25 
     26 PeriodicFunction::PeriodicFunction(const std::function<void()>& function,
     27                                    const int64 interval_micros,
     28                                    const Options& options)
     29     : function_(function),
     30       interval_micros_([interval_micros]() -> int64 {
     31         if (interval_micros < 0) {
     32           const string error = strings::StrCat(
     33               " The value of 'interval_micros' should be >= 0: ",
     34               interval_micros, ". ");
     35           DCHECK(false) << error;
     36           LOG(WARNING) << error << "Resetting it to 0.";
     37           return 0;
     38         }
     39         return interval_micros;
     40       }()),
     41       options_(options) {
     42   thread_.reset(options_.env->StartThread(
     43       options_.thread_options, options_.thread_name_prefix, [this]() {
     44         // Record the starting time here instead of in RunLoop.  That way, if
     45         // there is a delay starting RunLoop, that does not affect the timing
     46         // of
     47         // the first function.  (Such a delay can often happen in tests where
     48         // the test simulates a large time delay immediately after calling
     49         // Start.)
     50         RunLoop(options_.env->NowMicros());
     51       }));
     52 }
     53 
     54 PeriodicFunction::~PeriodicFunction() {
     55   NotifyStop();
     56 
     57   // Waits for thread_ to complete and clean up.
     58   thread_.reset();
     59 }
     60 
     61 void PeriodicFunction::NotifyStop() {
     62   if (!stop_thread_.HasBeenNotified()) {
     63     stop_thread_.Notify();
     64   }
     65 }
     66 
     67 void PeriodicFunction::RunLoop(const int64 start) {
     68   {
     69     if (options_.startup_delay_micros > 0) {
     70       const int64 deadline = start + options_.startup_delay_micros;
     71       options_.env->SleepForMicroseconds(deadline - start);
     72     }
     73 
     74     while (!stop_thread_.HasBeenNotified()) {
     75       VLOG(3) << "Running function.";
     76       const int64 begin = options_.env->NowMicros();
     77       function_();
     78 
     79       // Take the max() here to guard against time going backwards which
     80       // sometimes happens in multiproc machines.
     81       const int64 end =
     82           std::max(static_cast<int64>(options_.env->NowMicros()), begin);
     83 
     84       // The deadline is relative to when the last function started.
     85       const int64 deadline = begin + interval_micros_;
     86 
     87       // We want to sleep until 'deadline'.
     88       if (deadline > end) {
     89         if (end > begin) {
     90           VLOG(3) << "Reducing interval_micros from " << interval_micros_
     91                   << " to " << (deadline - end);
     92         }
     93         options_.env->SleepForMicroseconds(deadline - end);
     94       } else {
     95         VLOG(3) << "Function took longer than interval_micros, so not sleeping";
     96       }
     97     }
     98   }
     99 }
    100 
    101 }  // namespace serving
    102 }  // namespace tensorflow
    103