Home | History | Annotate | Download | only in common_runtime
      1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
      2 
      3 Licensed under the Apache License, Version 2.0 (the "License");
      4 you may not use this file except in compliance with the License.
      5 You may obtain a copy of the License at
      6 
      7     http://www.apache.org/licenses/LICENSE-2.0
      8 
      9 Unless required by applicable law or agreed to in writing, software
     10 distributed under the License is distributed on an "AS IS" BASIS,
     11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 See the License for the specific language governing permissions and
     13 limitations under the License.
     14 ==============================================================================*/
     15 
     16 #include "tensorflow/core/common_runtime/step_stats_collector.h"
     17 #include "tensorflow/core/common_runtime/costmodel_manager.h"
     18 #include "tensorflow/core/framework/allocation_description.pb.h"
     19 #include "tensorflow/core/framework/tensor_description.pb.h"
     20 #include "tensorflow/core/framework/tracking_allocator.h"
     21 #include "tensorflow/core/graph/costmodel.h"
     22 #include "tensorflow/core/lib/core/stringpiece.h"
     23 #include "tensorflow/core/lib/strings/numbers.h"
     24 #include "tensorflow/core/lib/strings/scanner.h"
     25 #include "tensorflow/core/platform/logging.h"
     26 
     27 namespace tensorflow {
namespace {
// Limits for the report built by ReportAllocsOnResourceExhausted(): at most
// kMaxAllocReportNodes entries are printed per <device, allocator> pair, and
// printing stops early once the reported bytes reach kMaxAllocReportFraction
// of that pair's total live bytes.
const int kMaxAllocReportNodes = 100;
const float kMaxAllocReportFraction = 0.99;

// Aggregated live-allocation statistics for one <device, allocator> pair.
struct AllocStats {
  // Maps a live-bytes count to the names of all nodes currently holding
  // exactly that many bytes; iterated in reverse so the largest appear first.
  std::map<int64, std::vector<string>> nodes_by_size;
  int64 total_bytes = 0;
  int64 total_nodes = 0;
};
}  // namespace
     38 
// Default-constructs with a fresh NodeExecStats proto.
// NOTE(review): the raw pointer is handed to stats_, which appears to take
// ownership — confirm against the member declaration in the header.
NodeExecStatsWrapper::NodeExecStatsWrapper()
    : NodeExecStatsWrapper(new NodeExecStats) {}
// Wraps an existing NodeExecStats allocated by the caller.
NodeExecStatsWrapper::NodeExecStatsWrapper(NodeExecStats* stats)
    : stats_(stats) {}
     43 
     44 void NodeExecStatsWrapper::AddAllocation(
     45     Allocator* allocator, TrackingAllocator* tracking_allocator) {
     46   AllocatorMemoryUsed* memory = stats_->add_memory();
     47   memory->set_allocator_name(allocator->Name());
     48   auto sizes = tracking_allocator->GetSizes();
     49   memory->set_total_bytes(std::get<0>(sizes));
     50   memory->set_peak_bytes(std::get<1>(sizes));
     51   memory->set_live_bytes(std::get<2>(sizes));
     52 
     53   AllocatorStats stats;
     54   allocator->GetStats(&stats);
     55   memory->set_allocator_bytes_in_use(stats.bytes_in_use);
     56   allocations_.push_back(std::make_pair(memory, tracking_allocator));
     57 }
     58 
     59 void NodeExecStatsWrapper::Finalize() {
     60   for (auto& alloc : allocations_) {
     61     AllocatorMemoryUsed* memory = alloc.first;
     62     for (auto& record : alloc.second->GetRecordsAndUnRef()) {
     63       auto* r = memory->add_allocation_records();
     64       r->set_alloc_bytes(record.alloc_bytes);
     65       r->set_alloc_micros(record.alloc_micros);
     66     }
     67   }
     68   allocations_.clear();
     69 }
     70 
// Collects per-node execution stats into `ss`.
// NOTE(review): `ss` is never deleted in this file, so the caller presumably
// retains ownership — confirm against the header.
StepStatsCollector::StepStatsCollector(StepStats* ss)
    : finalized_(false), step_stats_(ss) {}
     73 
     74 static int ExtractGpuWithStreamAll(string device_name) {
     75   // Check if the device name matches the ".*gpu:(\\d+)/stream:all$" regexp,
     76   // and if it does return the stream index (always positive). If it doesn't
     77   // return -1.
     78 
     79   // The best way to parse this regexp using a scanner is to parse it in
     80   // reverse starting from the end.
     81   std::reverse(device_name.begin(), device_name.end());
     82   strings::Scanner scanner(device_name);
     83   // Check that the string end with '/stream:all'
     84   scanner.OneLiteral("lla:maerts/");
     85   // Capture the digits if present
     86   scanner.RestartCapture().Many(strings::Scanner::DIGIT).StopCapture();
     87   // Check that the digits are preceded by the 'device:GPU:' string
     88   scanner.OneLiteral(":UPG:ecived");
     89   StringPiece capture;
     90   bool matched = scanner.GetResult(nullptr, &capture);
     91 
     92   if (!matched) {
     93     return -1;
     94   } else {
     95     // Convert the captured string into an integer. But first we need to put
     96     // the digits back in order
     97     string ordered_capture = capture.ToString();
     98     std::reverse(ordered_capture.begin(), ordered_capture.end());
     99     int gpu_id;
    100     CHECK(strings::safe_strto32(ordered_capture, &gpu_id));
    101     return gpu_id;
    102   }
    103 }
    104 
    105 static int ExtractGpuWithoutStream(string device_name) {
    106   // Check if the device name matches the ".*gpu:(\\d+)$" regexp,
    107   // and if it does return the stream index (always positive). If it doesn't
    108   // return -1.
    109 
    110   // The best way to parse this regexp using a scanner is to parse it in
    111   // reverse starting from the end.
    112   std::reverse(device_name.begin(), device_name.end());
    113   strings::Scanner scanner(device_name);
    114   // Capture the trailing digits if present
    115   scanner.RestartCapture().Many(strings::Scanner::DIGIT).StopCapture();
    116   // Check that the digits are preceded by the 'device:GPU:' string
    117   scanner.OneLiteral(":UPG:ecived");
    118   StringPiece capture;
    119   bool matched = scanner.GetResult(nullptr, &capture);
    120 
    121   if (!matched) {
    122     return -1;
    123   } else {
    124     // Convert the captured string into an integer. But first we need to put
    125     // the digits back in order
    126     string ordered_capture = capture.ToString();
    127     std::reverse(ordered_capture.begin(), ordered_capture.end());
    128     int gpu_id;
    129     CHECK(strings::safe_strto32(ordered_capture, &gpu_id));
    130     return gpu_id;
    131   }
    132 }
    133 
    134 void StepStatsCollector::BuildCostModel(
    135     CostModelManager* cost_model_manager,
    136     const std::unordered_map<string, const Graph*>& device_map) {
    137   mutex_lock lock(mu_);
    138 
    139   if (!finalized_) {
    140     FinalizeInternal();
    141   }
    142   // Hardware stats for gpu are available under a fake device named
    143   // "gpu:<id>/stream::all.
    144   // Use them instead of regular stats whenever they're available to extract
    145   // the execution stats of a particular node since they're more accurate.
    146   // However hardware traces don't record memory usage, so we still have to
    147   // rely on regular traces to track memory usage.
    148   struct DeviceStats {
    149     const DeviceStepStats* regular_stats;
    150     const DeviceStepStats* hardware_stats;
    151   };
    152 
    153   std::unordered_map<StringPiece, DeviceStats, StringPieceHasher>
    154       per_device_stats;
    155   std::unordered_map<int, const DeviceStepStats*> gpu_hardware_stats;
    156 
    157   for (int i = 0; i < step_stats_->dev_stats_size(); ++i) {
    158     const DeviceStepStats& device_stats = step_stats_->dev_stats(i);
    159     const string& device_name = device_stats.device();
    160     const int gpu_id = ExtractGpuWithStreamAll(device_name);
    161     if (gpu_id >= 0) {
    162       // These are gpu hardware stats
    163       gpu_hardware_stats.emplace(gpu_id, &device_stats);
    164     } else {
    165       // These are regular stats.
    166       per_device_stats.emplace(device_name,
    167                                DeviceStats{&device_stats, nullptr});
    168     }
    169   }
    170 
    171   for (auto& itr : per_device_stats) {
    172     const StringPiece device_name = itr.first;
    173     const int gpu_id = ExtractGpuWithoutStream(device_name.ToString());
    174     if (gpu_id >= 0) {
    175       // Reference the gpu hardware stats in addition to the regular stats
    176       // for this gpu device if they're available.
    177       if (gpu_hardware_stats.find(gpu_id) != gpu_hardware_stats.end()) {
    178         itr.second.hardware_stats = gpu_hardware_stats.find(gpu_id)->second;
    179       }
    180     }
    181   }
    182 
    183   for (auto itr : device_map) {
    184     const StringPiece device = itr.first;
    185     if (per_device_stats.find(device) == per_device_stats.end()) {
    186       continue;
    187     }
    188 
    189     const Graph* graph = itr.second;
    190     CostModel* cm = cost_model_manager->FindOrCreateCostModel(graph);
    191     cm->IncrementUpdateTimes();
    192 
    193     std::unordered_map<StringPiece, Node*, StringPieceHasher> name_to_node;
    194     for (Node* n : graph->nodes()) {
    195       name_to_node.emplace(n->name(), n);
    196     }
    197 
    198     const DeviceStats& dev_stats = per_device_stats.find(device)->second;
    199 
    200     std::unordered_map<string, NodeExecStats> name_to_hw_node_stats;
    201     if (dev_stats.hardware_stats) {
    202       for (const auto& node_stats : dev_stats.hardware_stats->node_stats()) {
    203         string node_name = node_stats.node_name();
    204         // Remove the part of op name (e.g. :Conv2D) in the end of a node name.
    205         size_t pos = node_name.find_first_of(":");
    206         if (pos != std::string::npos) {
    207           node_name = node_name.substr(0, pos);
    208         }
    209         // Certain ops (e.g. Conv2D) are implemented with multiple GPU kernels,
    210         // which results in multiple NodeExecStats with the same node name. For
    211         // such ops, we sum up the time for all its GPU kernels.
    212         if (name_to_hw_node_stats.find(node_name) !=
    213             name_to_hw_node_stats.end()) {
    214           int64 time = name_to_hw_node_stats[node_name].op_end_rel_micros();
    215           name_to_hw_node_stats[node_name].set_op_end_rel_micros(
    216               time + node_stats.op_end_rel_micros());
    217         } else {
    218           name_to_hw_node_stats.emplace(node_name, node_stats);
    219         }
    220       }
    221     }
    222 
    223     for (int i = 0; i < dev_stats.regular_stats->node_stats_size(); ++i) {
    224       const NodeExecStats& stats = dev_stats.regular_stats->node_stats(i);
    225       const Node* node = name_to_node[stats.node_name()];
    226       if (node) {
    227         for (int i = 0; i < stats.output_size(); ++i) {
    228           const auto& output = stats.output(i);
    229           cm->RecordMaxMemorySize(node, i,
    230                                   Bytes(output.tensor_description()
    231                                             .allocation_description()
    232                                             .allocated_bytes()),
    233                                   stats.output(i).tensor_description().shape(),
    234                                   node->output_types()[i]);
    235           cm->RecordAllocationId(node, i,
    236                                  output.tensor_description()
    237                                      .allocation_description()
    238                                      .allocation_id());
    239         }
    240         cm->RecordMemoryStats(node, stats.memory_stats());
    241         // Use hardware stats to record the execution time if they're available,
    242         // otherwise use the regular (less accurate) stats
    243         string node_name = dev_stats.regular_stats->node_stats(i).node_name();
    244         if (dev_stats.hardware_stats && name_to_hw_node_stats.find(node_name) !=
    245                                             name_to_hw_node_stats.end()) {
    246           const NodeExecStats& hw_stats = name_to_hw_node_stats[node_name];
    247           cm->RecordMaxExecutionTime(
    248               node, Microseconds(hw_stats.op_end_rel_micros()));
    249         } else {
    250           cm->RecordMaxExecutionTime(node,
    251                                      Microseconds(stats.op_end_rel_micros()));
    252         }
    253       }
    254     }
    255   }
    256 }
    257 
// Convenience overload: wraps `nt` and forwards to the wrapper overload,
// which takes ownership of the wrapper (and deletes it when collection is
// disabled or the node budget is exhausted).
void StepStatsCollector::Save(const string& device, NodeExecStats* nt) {
  Save(device, new NodeExecStatsWrapper(nt));
}
    261 
    262 void StepStatsCollector::Save(const string& device,
    263                               NodeExecStatsWrapper* stats) {
    264   if (!stats) return;
    265   VLOG(1) << "Save dev " << device << " nt " << stats->stats();
    266   {
    267     mutex_lock l(mu_);
    268     if (finalized_) {
    269       LOG(WARNING) << "stats saved after finalize will not be collected.";
    270     }
    271     if (!step_stats_ || collectedNodes >= kMaxCollectedNodes) {
    272       VLOG(1) << "step_stats_ nullptr or already collected too many nodes.";
    273       delete stats;
    274       return;
    275     }
    276     auto& dss = dev_stats_[device];
    277     dss.push_back(std::unique_ptr<NodeExecStatsWrapper>(stats));
    278     collectedNodes++;
    279   }
    280 }
    281 
// Builds a human-readable report of live allocations to append to an OOM
// error message. Returns "" unless `err` contains "OOM"; otherwise, for every
// <device, allocator> pair whose name appears in `err`, lists the nodes
// holding the most live bytes (largest first), capped by
// kMaxAllocReportNodes / kMaxAllocReportFraction, followed by a one-line
// summary of whatever was not itemized.
string StepStatsCollector::ReportAllocsOnResourceExhausted(const string& err) {
  mutex_lock l(mu_);
  // Only OOM (resource-exhausted) errors get an allocation report.
  if (err.find("OOM") == err.npos) {
    return "";
  }
  // <device, allocator> -> AllocStats
  std::map<std::pair<string, string>, AllocStats> allocs_map;
  string report = "\n";
  // Phase 1: aggregate live bytes per <device, allocator> pair.
  for (const auto& dev_stat : dev_stats_) {
    const string& device = dev_stat.first;
    // Only print the device that has OOM.
    // TODO(xpan): Extract device from err first to speed it up.
    if (err.find(device) == err.npos) {
      continue;
    }
    // NodeExecStatsWrapper*
    for (const auto& stats : dev_stat.second) {
      // std::pair<AllocatorMemoryUsed*, TrackingAllocator*>
      for (const auto& alloc : stats->allocations_) {
        // Only print the allocator that has OOM.
        // TODO(xpan): Extract device from err first to speed it up.
        if (err.find(alloc.first->allocator_name()) == err.npos) {
          continue;
        }
        auto dev_allocator =
            std::make_pair(dev_stat.first, alloc.first->allocator_name());
        AllocStats& dev_allocs_stats = allocs_map[dev_allocator];
        TrackingAllocator* tracking_alloc = alloc.second;
        // Sum the bytes of this node's still-live allocations.
        gtl::InlinedVector<AllocRecord, 4> cur_records =
            tracking_alloc->GetCurrentRecords();
        int64 cur_bytes = 0;
        for (const auto& r : cur_records) {
          cur_bytes += r.alloc_bytes;
        }
        if (cur_bytes > 0) {
          dev_allocs_stats.total_bytes += cur_bytes;
          dev_allocs_stats.total_nodes++;
          dev_allocs_stats.nodes_by_size[cur_bytes].push_back(
              stats->stats()->node_name());
        }
      }
    }
  }

  // Phase 2: format the aggregated stats, largest holders first.
  for (const auto& dev_allocs_it : allocs_map) {
    const auto& dev = dev_allocs_it.first;
    const AllocStats& dev_allocs_stats = dev_allocs_it.second;
    int64 reported_bytes = 0;
    int64 reported_nodes = 0;
    bool done = false;
    strings::StrAppend(&report, "\nCurrent usage from device: ", dev.first,
                       ", allocator: ", dev.second, "\n");
    // Print allocations stats of the <device, allocator> pair.
    // Reverse iteration visits the biggest byte counts first.
    for (auto it = dev_allocs_stats.nodes_by_size.rbegin();
         it != dev_allocs_stats.nodes_by_size.rend(); ++it) {
      for (const string& node_name : it->second) {
        reported_bytes += it->first;
        strings::StrAppend(&report, "  ",
                           strings::HumanReadableNumBytes(it->first), " from ",
                           node_name, "\n");
        // Stop once the report is long enough or covers nearly all bytes.
        if (++reported_nodes > kMaxAllocReportNodes ||
            reported_bytes >=
                dev_allocs_stats.total_bytes * kMaxAllocReportFraction) {
          done = true;
          break;
        }
      }
      if (done) break;
    }
    // Summarize whatever was cut off by the caps above.
    int64 remain_nodes = dev_allocs_stats.total_nodes - reported_nodes;
    int64 remain_bytes = dev_allocs_stats.total_bytes - reported_bytes;
    if (remain_nodes > 0) {
      strings::StrAppend(&report, "  Remaining ", remain_nodes, " nodes with ",
                         strings::HumanReadableNumBytes(remain_bytes), "\n");
    }
  }
  return report;
}
    360 
// Thread-safe entry point: moves all saved per-node stats into the StepStats
// proto. Subsequent Save() calls will warn and not be collected.
void StepStatsCollector::Finalize() {
  mutex_lock l(mu_);
  FinalizeInternal();
}
    365 
// Finalizes the collected stats, then swaps the internal StepStats contents
// into `ss` (leaving step_stats_ with ss's previous contents) and resets the
// collected-node budget. Requires a non-null step_stats_.
void StepStatsCollector::FinalizeAndSwap(StepStats* ss) {
  mutex_lock l(mu_);
  CHECK(step_stats_);
  FinalizeInternal();
  ss->Swap(step_stats_);
  collectedNodes = 0;
}
    373 
    374 void StepStatsCollector::FinalizeInternal() {
    375   if (!step_stats_ || finalized_) {
    376     return;
    377   }
    378   finalized_ = true;
    379   std::map<string, DeviceStepStats*> dev_stats_pb;
    380   for (auto& ds : *step_stats_->mutable_dev_stats()) {
    381     dev_stats_pb[ds.device()] = &ds;
    382   }
    383   for (const auto& dev_stat : dev_stats_) {
    384     if (dev_stats_pb.find(dev_stat.first) == dev_stats_pb.end()) {
    385       DeviceStepStats* ndev_stat = step_stats_->add_dev_stats();
    386       ndev_stat->set_device(dev_stat.first);
    387       dev_stats_pb[dev_stat.first] = ndev_stat;
    388     }
    389     DeviceStepStats* dss = dev_stats_pb.at(dev_stat.first);
    390     for (auto& stats : dev_stat.second) {
    391       stats->Finalize();
    392       stats->stats()->Swap(dss->add_node_stats());
    393     }
    394   }
    395 }
    396 }  // namespace tensorflow
    397